diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..880f753 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,36 @@ +name: test + +on: + push: + branches: [main] + pull_request: + +env: + CARGO_TERM_COLOR: always + +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - uses: actions/checkout@v4 + + - name: Setup Rust cache + uses: Swatinem/rust-cache@v2 + with: + cache-on-failure: true + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: nightly-2025-06-01 + + - name: Install buf + uses: bufbuild/buf-action@v1 + with: + setup_only: true + + - name: Run tests + run: cargo test \ No newline at end of file diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..7958c77 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,48 @@ +name: lint + +on: + push: + branches: [main] + pull_request: + +env: + CARGO_TERM_COLOR: always + +jobs: + lint: + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - uses: actions/checkout@v4 + + - name: Setup Rust cache + uses: Swatinem/rust-cache@v2 + with: + cache-on-failure: true + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: nightly-2025-06-01 + components: clippy, rustfmt + + - name: Install buf + uses: bufbuild/buf-action@v1 + with: + setup_only: true + + - name: Setup just + uses: extractions/setup-just@v2 + with: + just-version: 1.5.0 + + - name: Check compilation + run: cargo check + + - name: Check formatting + run: just fmt-check + + - name: Check clippy + run: just clippy \ No newline at end of file diff --git a/crates/network/benches/quic_basic.rs b/crates/network/benches/quic_basic.rs index 43e8802..df3439d 100644 --- a/crates/network/benches/quic_basic.rs +++ b/crates/network/benches/quic_basic.rs @@ -1,8 +1,10 @@ use std::{ net::SocketAddr, sync::{ - atomic::{AtomicUsize, Ordering}, Arc - }, time::Duration, + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, }; use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main}; @@ -28,91 +30,101 @@ pub fn broadcast(c: &mut Criterion) { for i in 1..=3 { let criterion_batch_size = BatchSize::PerIteration; let throughput = Throughput::Elements((total * i) as u64); - - group.throughput(throughput.clone()).bench_function(format!("quic_basic_{BATCH_SIZE}_{i}"), |x| { - x.iter_batched( - || { - let recv_counter = Arc::new(AtomicUsize::default()); - - let (mut server_tile, server_id) = { - let secret = k256::ecdsa::SigningKey::random(&mut rng); - let key_bytes: [u8; 32] = secret.to_bytes().into(); - let keypair = Keypair::from_secret(&key_bytes).unwrap(); - let server_id = keypair.peer_id(); - let server_config = silver_network::create_server_config(&keypair).unwrap(); - let server_endpoint = Endpoint::new( - Arc::new(EndpointConfig::default()), - Some(Arc::new(server_config)), - false, - None, - ); - ( - NetworkTile::new( - keypair, - server_endpoint, - "0.0.0.0:20001".parse().unwrap(), - ServerHandler(recv_counter.clone()), + + group.throughput(throughput.clone()).bench_function( + format!("quic_basic_{BATCH_SIZE}_{i}"), + |x| { + x.iter_batched( + || { + let recv_counter = Arc::new(AtomicUsize::default()); + + let (mut server_tile, server_id) = { + let secret = k256::ecdsa::SigningKey::random(&mut rng); + let key_bytes: [u8; 32] = secret.to_bytes().into(); + let keypair = Keypair::from_secret(&key_bytes).unwrap(); + let server_id = keypair.peer_id(); + let server_config = + silver_network::create_server_config(&keypair).unwrap(); + let server_endpoint = Endpoint::new( + Arc::new(EndpointConfig::default()), + Some(Arc::new(server_config)), + false, + None, + ); + ( + NetworkTile::new( + keypair, + server_endpoint, + "0.0.0.0:20001".parse().unwrap(), + ServerHandler(recv_counter.clone()), + ) + .unwrap(), + server_id, ) - .unwrap(), - server_id, - ) - }; - - let server_handle = std::thread::spawn(move || { - loop { - server_tile.spin(); - if recv_counter.load(Ordering::Relaxed) == (total * i) { - tracing::info!("server completed"); - break; + }; + + let server_handle = std::thread::spawn(move || { + loop { + server_tile.spin(); + if recv_counter.load(Ordering::Relaxed) == (total * i) { + tracing::info!("server completed"); + break; + } } + }); + + let mut clients = vec![]; + for n in 0..i { + let data = data.clone(); + + let secret = k256::ecdsa::SigningKey::random(&mut rng); + let key_bytes: [u8; 32] = secret.to_bytes().into(); + let keypair = Keypair::from_secret(&key_bytes).unwrap(); + let client_endpoint = Endpoint::new( + Arc::new(EndpointConfig::default()), + None, + false, + None, + ); + + let client_data = ClientData { + server_id: Some(server_id.clone()), + server_addr: "127.0.0.1:20001".parse().unwrap(), + remote_peer: None, + remote_stream: None, + data, + offset: 0, + did_stream: false, + }; + + let addr = format!("127.0.0.1:{}", 20002 + n); + + clients.push( + NetworkTile::new( + keypair, + client_endpoint, + addr.parse().unwrap(), + client_data, + ) + .unwrap(), + ); } - }); - - let mut clients = vec![]; - for n in 0..i { - let data = data.clone(); - - let secret = k256::ecdsa::SigningKey::random(&mut rng); - let key_bytes: [u8; 32] = secret.to_bytes().into(); - let keypair = Keypair::from_secret(&key_bytes).unwrap(); - let client_endpoint = - Endpoint::new(Arc::new(EndpointConfig::default()), None, false, None); - - let client_data = ClientData { - server_id: Some(server_id.clone()), - server_addr: "127.0.0.1:20001".parse().unwrap(), - remote_peer: None, - remote_stream: None, - data, - offset: 0, - did_stream: false, - }; - let addr = format!("127.0.0.1:{}", 20002 + n); - - clients.push(NetworkTile::new( - keypair, - client_endpoint, - addr.parse().unwrap(), - client_data, - ) - .unwrap()); - } - - std::thread::sleep(Duration::from_millis(200)); - (server_handle, clients) - }, - |(handle, mut clients)| { - while !handle.is_finished() { - for client in &mut clients { - client.spin(); + std::thread::sleep(Duration::from_millis(200)); + (server_handle, clients) + }, + |(handle, mut clients)| { + while !handle.is_finished() { + for client in &mut clients { + client.spin(); + } } - } - handle.join().unwrap(); - }, - criterion_batch_size, - ); - }); + handle.join().unwrap(); + }, + criterion_batch_size, + ); + }, + ); } } @@ -139,14 +151,12 @@ impl silver_network::NetworkSend for ServerHandler { fn to_send(&mut self) -> Option<(silver_network::RemotePeer, quinn_proto::StreamId, &[u8])> { None } - + fn new_streams(&mut self) -> Option<(RemotePeer, quinn_proto::Dir)> { None } - - fn sent(&mut self, _peer: &RemotePeer, _stream: &quinn_proto::StreamId, _sent: usize) { - - } + + fn sent(&mut self, _peer: &RemotePeer, _stream: &quinn_proto::StreamId, _sent: usize) {} } impl silver_network::NetworkRecv for ServerHandler { @@ -193,24 +203,23 @@ impl silver_network::NetworkSend for ClientData { let Some(remote_stream) = self.remote_stream.as_ref() { if let Some(data) = self.data.last() { - return Some(( - remote_peer.clone(), - remote_stream.clone(), - &data[self.offset..], - )); + return Some((remote_peer.clone(), remote_stream.clone(), &data[self.offset..])); } } None } - + fn new_streams(&mut self) -> Option<(RemotePeer, quinn_proto::Dir)> { - if let Some(remote_peer) = self.remote_peer.as_ref() && self.remote_stream.is_none() && !self.did_stream { + if let Some(remote_peer) = self.remote_peer.as_ref() && + self.remote_stream.is_none() && + !self.did_stream + { self.did_stream = true; return Some((remote_peer.clone(), quinn_proto::Dir::Bi)); } None } - + fn sent(&mut self, _peer: &RemotePeer, _stream: &quinn_proto::StreamId, sent: usize) { self.offset += sent; let pop = self.data.last().map(|v| self.offset >= v.len()).unwrap_or_default(); diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index 1b1f06b..14f5f94 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -30,7 +30,7 @@ pub trait NetworkSend: Send { /// `None` is returned. fn to_send(&mut self) -> Option<(RemotePeer, StreamId, &[u8])>; - /// Send result callback. + /// Send result callback. fn sent(&mut self, peer: &RemotePeer, stream: &StreamId, sent: usize); } diff --git a/crates/network/src/p2p/quic/peer.rs b/crates/network/src/p2p/quic/peer.rs index 9508bc0..3250610 100644 --- a/crates/network/src/p2p/quic/peer.rs +++ b/crates/network/src/p2p/quic/peer.rs @@ -2,11 +2,11 @@ use std::{io::Error, time::Instant}; use bytes::Bytes; use quinn_proto::{ - Connection, ConnectionEvent, ConnectionHandle, Dir, EndpointEvent, StreamId, Transmit, VarInt + Connection, ConnectionEvent, ConnectionHandle, Dir, EndpointEvent, StreamId, Transmit, VarInt, }; use silver_common::PeerId; -use crate::{p2p::tls::peer_id_from_certificate, NetworkRecv, NetworkSend, RemotePeer}; +use crate::{NetworkRecv, NetworkSend, RemotePeer, p2p::tls::peer_id_from_certificate}; pub(crate) struct Peer { id: RemotePeer, @@ -38,7 +38,8 @@ impl Peer { now: Instant, ep_callback: &mut F, handler: &mut H, - ) -> Option where + ) -> Option + where F: FnMut(ConnectionHandle, EndpointEvent) -> Option, { while self.connection.poll_timeout().is_some_and(|t| t <= now) { @@ -75,7 +76,12 @@ impl Peer { match stream_event { quinn_proto::StreamEvent::Opened { dir } => { while let Some(id) = self.connection.streams().accept(dir) { - tracing::info!(?id, ?dir, spins=self.spin_count, "stream openned"); + tracing::info!( + ?id, + ?dir, + spins = self.spin_count, + "stream openned" + ); handler.new_stream(&self.id, &id); // try to read @@ -100,7 +106,8 @@ impl Peer { // conn.send_stream(id).write(data) { // } - //tracing::info!(spins=self.spin_count, "stream writable"); + //tracing::info!(spins=self.spin_count, "stream + // writable"); } quinn_proto::StreamEvent::Finished { id } => { tracing::info!(?id, "stream finished"); @@ -111,7 +118,7 @@ impl Peer { quinn_proto::StreamEvent::Available { dir } => { // Callback if it is now possible ot open a new stream (when previously // at limits) - tracing::info!(?dir, spins=self.spin_count, "stream available"); + tracing::info!(?dir, spins = self.spin_count, "stream available"); if let Some(id) = self.connection.streams().open(dir) {} } } diff --git a/justfile b/justfile index 7febb4b..d4fa682 100644 --- a/justfile +++ b/justfile @@ -2,10 +2,12 @@ toolchain := "nightly-2025-06-01" fmt: rustup toolchain install {{toolchain}} > /dev/null 2>&1 && \ + rustup toolchain install {{toolchain}} --component rustfmt > /dev/null 2>&1 && \ cargo +{{toolchain}} fmt fmt-check: rustup toolchain install {{toolchain}} > /dev/null 2>&1 && \ + rustup toolchain install {{toolchain}} --component rustfmt > /dev/null 2>&1 && \ cargo +{{toolchain}} fmt --check clippy: diff --git a/rust-toolchain.toml b/rust-toolchain.toml index d70af36..22fd8ba 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] channel = "1.91.0" -components = ["rust-analyzer", "rustfmt"] +components = ["clippy", "rust-analyzer", "rustfmt"]