Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: test

on:
push:
branches: [main]
pull_request:

env:
CARGO_TERM_COLOR: always

jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 30

steps:
- uses: actions/checkout@v4

- name: Setup Rust cache
uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly-2025-06-01

- name: Install buf
uses: bufbuild/buf-action@v1
with:
setup_only: true

- name: Run tests
run: cargo test
48 changes: 48 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: lint

on:
push:
branches: [main]
pull_request:

env:
CARGO_TERM_COLOR: always

jobs:
lint:
runs-on: ubuntu-latest
timeout-minutes: 30

steps:
- uses: actions/checkout@v4

- name: Setup Rust cache
uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly-2025-06-01
components: clippy, rustfmt

- name: Install buf
uses: bufbuild/buf-action@v1
with:
setup_only: true

- name: Setup just
uses: extractions/setup-just@v2
with:
just-version: 1.5.0

- name: Check compilation
run: cargo check

- name: Check formatting
run: just fmt-check

- name: Check clippy
run: just clippy
199 changes: 104 additions & 95 deletions crates/network/benches/quic_basic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicUsize, Ordering}, Arc
}, time::Duration,
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};

use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main};
Expand All @@ -28,91 +30,101 @@ pub fn broadcast(c: &mut Criterion) {
for i in 1..=3 {
let criterion_batch_size = BatchSize::PerIteration;
let throughput = Throughput::Elements((total * i) as u64);

group.throughput(throughput.clone()).bench_function(format!("quic_basic_{BATCH_SIZE}_{i}"), |x| {
x.iter_batched(
|| {
let recv_counter = Arc::new(AtomicUsize::default());

let (mut server_tile, server_id) = {
let secret = k256::ecdsa::SigningKey::random(&mut rng);
let key_bytes: [u8; 32] = secret.to_bytes().into();
let keypair = Keypair::from_secret(&key_bytes).unwrap();
let server_id = keypair.peer_id();
let server_config = silver_network::create_server_config(&keypair).unwrap();
let server_endpoint = Endpoint::new(
Arc::new(EndpointConfig::default()),
Some(Arc::new(server_config)),
false,
None,
);
(
NetworkTile::new(
keypair,
server_endpoint,
"0.0.0.0:20001".parse().unwrap(),
ServerHandler(recv_counter.clone()),

group.throughput(throughput.clone()).bench_function(
format!("quic_basic_{BATCH_SIZE}_{i}"),
|x| {
x.iter_batched(
|| {
let recv_counter = Arc::new(AtomicUsize::default());

let (mut server_tile, server_id) = {
let secret = k256::ecdsa::SigningKey::random(&mut rng);
let key_bytes: [u8; 32] = secret.to_bytes().into();
let keypair = Keypair::from_secret(&key_bytes).unwrap();
let server_id = keypair.peer_id();
let server_config =
silver_network::create_server_config(&keypair).unwrap();
let server_endpoint = Endpoint::new(
Arc::new(EndpointConfig::default()),
Some(Arc::new(server_config)),
false,
None,
);
(
NetworkTile::new(
keypair,
server_endpoint,
"0.0.0.0:20001".parse().unwrap(),
ServerHandler(recv_counter.clone()),
)
.unwrap(),
server_id,
)
.unwrap(),
server_id,
)
};

let server_handle = std::thread::spawn(move || {
loop {
server_tile.spin();
if recv_counter.load(Ordering::Relaxed) == (total * i) {
tracing::info!("server completed");
break;
};

let server_handle = std::thread::spawn(move || {
loop {
server_tile.spin();
if recv_counter.load(Ordering::Relaxed) == (total * i) {
tracing::info!("server completed");
break;
}
}
});

let mut clients = vec![];
for n in 0..i {
let data = data.clone();

let secret = k256::ecdsa::SigningKey::random(&mut rng);
let key_bytes: [u8; 32] = secret.to_bytes().into();
let keypair = Keypair::from_secret(&key_bytes).unwrap();
let client_endpoint = Endpoint::new(
Arc::new(EndpointConfig::default()),
None,
false,
None,
);

let client_data = ClientData {
server_id: Some(server_id.clone()),
server_addr: "127.0.0.1:20001".parse().unwrap(),
remote_peer: None,
remote_stream: None,
data,
offset: 0,
did_stream: false,
};

let addr = format!("127.0.0.1:{}", 20002 + n);

clients.push(
NetworkTile::new(
keypair,
client_endpoint,
addr.parse().unwrap(),
client_data,
)
.unwrap(),
);
}
});

let mut clients = vec![];
for n in 0..i {
let data = data.clone();

let secret = k256::ecdsa::SigningKey::random(&mut rng);
let key_bytes: [u8; 32] = secret.to_bytes().into();
let keypair = Keypair::from_secret(&key_bytes).unwrap();
let client_endpoint =
Endpoint::new(Arc::new(EndpointConfig::default()), None, false, None);

let client_data = ClientData {
server_id: Some(server_id.clone()),
server_addr: "127.0.0.1:20001".parse().unwrap(),
remote_peer: None,
remote_stream: None,
data,
offset: 0,
did_stream: false,
};

let addr = format!("127.0.0.1:{}", 20002 + n);

clients.push(NetworkTile::new(
keypair,
client_endpoint,
addr.parse().unwrap(),
client_data,
)
.unwrap());
}

std::thread::sleep(Duration::from_millis(200));
(server_handle, clients)
},
|(handle, mut clients)| {
while !handle.is_finished() {
for client in &mut clients {
client.spin();
std::thread::sleep(Duration::from_millis(200));
(server_handle, clients)
},
|(handle, mut clients)| {
while !handle.is_finished() {
for client in &mut clients {
client.spin();
}
}
}
handle.join().unwrap();
},
criterion_batch_size,
);
});
handle.join().unwrap();
},
criterion_batch_size,
);
},
);
}
}

Expand All @@ -139,14 +151,12 @@ impl silver_network::NetworkSend for ServerHandler {
fn to_send(&mut self) -> Option<(silver_network::RemotePeer, quinn_proto::StreamId, &[u8])> {
None
}

fn new_streams(&mut self) -> Option<(RemotePeer, quinn_proto::Dir)> {
None
}

fn sent(&mut self, _peer: &RemotePeer, _stream: &quinn_proto::StreamId, _sent: usize) {

}

fn sent(&mut self, _peer: &RemotePeer, _stream: &quinn_proto::StreamId, _sent: usize) {}
}

impl silver_network::NetworkRecv for ServerHandler {
Expand Down Expand Up @@ -193,24 +203,23 @@ impl silver_network::NetworkSend for ClientData {
let Some(remote_stream) = self.remote_stream.as_ref()
{
if let Some(data) = self.data.last() {
return Some((
remote_peer.clone(),
remote_stream.clone(),
&data[self.offset..],
));
return Some((remote_peer.clone(), remote_stream.clone(), &data[self.offset..]));
}
}
None
}

fn new_streams(&mut self) -> Option<(RemotePeer, quinn_proto::Dir)> {
if let Some(remote_peer) = self.remote_peer.as_ref() && self.remote_stream.is_none() && !self.did_stream {
if let Some(remote_peer) = self.remote_peer.as_ref() &&
self.remote_stream.is_none() &&
!self.did_stream
{
self.did_stream = true;
return Some((remote_peer.clone(), quinn_proto::Dir::Bi));
}
None
}

fn sent(&mut self, _peer: &RemotePeer, _stream: &quinn_proto::StreamId, sent: usize) {
self.offset += sent;
let pop = self.data.last().map(|v| self.offset >= v.len()).unwrap_or_default();
Expand Down
2 changes: 1 addition & 1 deletion crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait NetworkSend: Send {
/// `None` is returned.
fn to_send(&mut self) -> Option<(RemotePeer, StreamId, &[u8])>;

/// Send result callback.
/// Send result callback.
fn sent(&mut self, peer: &RemotePeer, stream: &StreamId, sent: usize);
}

Expand Down
19 changes: 13 additions & 6 deletions crates/network/src/p2p/quic/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::{io::Error, time::Instant};

use bytes::Bytes;
use quinn_proto::{
Connection, ConnectionEvent, ConnectionHandle, Dir, EndpointEvent, StreamId, Transmit, VarInt
Connection, ConnectionEvent, ConnectionHandle, Dir, EndpointEvent, StreamId, Transmit, VarInt,
};
use silver_common::PeerId;

use crate::{p2p::tls::peer_id_from_certificate, NetworkRecv, NetworkSend, RemotePeer};
use crate::{NetworkRecv, NetworkSend, RemotePeer, p2p::tls::peer_id_from_certificate};

pub(crate) struct Peer {
id: RemotePeer,
Expand Down Expand Up @@ -38,7 +38,8 @@ impl Peer {
now: Instant,
ep_callback: &mut F,
handler: &mut H,
) -> Option<Instant> where
) -> Option<Instant>
where
F: FnMut(ConnectionHandle, EndpointEvent) -> Option<ConnectionEvent>,
{
while self.connection.poll_timeout().is_some_and(|t| t <= now) {
Expand Down Expand Up @@ -75,7 +76,12 @@ impl Peer {
match stream_event {
quinn_proto::StreamEvent::Opened { dir } => {
while let Some(id) = self.connection.streams().accept(dir) {
tracing::info!(?id, ?dir, spins=self.spin_count, "stream openned");
tracing::info!(
?id,
?dir,
spins = self.spin_count,
"stream openned"
);
handler.new_stream(&self.id, &id);

// try to read
Expand All @@ -100,7 +106,8 @@ impl Peer {
// conn.send_stream(id).write(data) {

// }
//tracing::info!(spins=self.spin_count, "stream writable");
//tracing::info!(spins=self.spin_count, "stream
// writable");
}
quinn_proto::StreamEvent::Finished { id } => {
tracing::info!(?id, "stream finished");
Expand All @@ -111,7 +118,7 @@ impl Peer {
quinn_proto::StreamEvent::Available { dir } => {
// Callback if it is now possible ot open a new stream (when previously
// at limits)
tracing::info!(?dir, spins=self.spin_count, "stream available");
tracing::info!(?dir, spins = self.spin_count, "stream available");
if let Some(id) = self.connection.streams().open(dir) {}
}
}
Expand Down
Loading
Loading