Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions crates/flux-network/src/tcp/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use flux::spine::{SpineProducerWithDCache, SpineProducers};
use flux_timing::{Duration, Instant, Nanos, Repeater};
use flux_utils::{DCachePtr, safe_panic};
use mio::{Events, Interest, Poll, Token, event::Event, net::TcpListener};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};

use crate::tcp::{
ConnState, TcpStream, TcpTelemetry,
Expand Down Expand Up @@ -61,6 +61,7 @@ struct ConnectionManager {
telemetry: TcpTelemetry,
socket_buf_size: Option<usize>,
user_timeout_ms: u32,
nodelay: bool,
dcache: Option<DCachePtr>,
/// When set, connections whose send backlog exceeds `max` messages for
/// longer than `timeout` are disconnected (outbound scheduled for
Expand All @@ -82,6 +83,7 @@ impl Default for ConnectionManager {
telemetry: TcpTelemetry::Disabled,
socket_buf_size: None,
user_timeout_ms: DEFAULT_TCP_USER_TIMEOUT_MS,
nodelay: true,
dcache: None,
max_backlog: None,
to_be_reconnected: Vec::with_capacity(10),
Expand Down Expand Up @@ -305,7 +307,7 @@ impl ConnectionManager {
return None;
};
new_stream
.set_nodelay(true)
.set_nodelay(self.nodelay)
.inspect_err(|e| {
error!("couldn't setup nodelay for tcp stream for {addr}: {e}");
})
Expand Down Expand Up @@ -385,6 +387,7 @@ impl ConnectionManager {
if let Some(size) = self.socket_buf_size {
set_socket_buf_size(&stream, size);
}
set_user_timeout(&stream, self.user_timeout_ms);
let token = Token(self.next_token);
if let Err(e) =
self.poll.registry().register(&mut stream, token, Interest::READABLE)
Expand All @@ -393,7 +396,7 @@ impl ConnectionManager {
let _ = stream.shutdown(std::net::Shutdown::Both);
continue;
};
if let Err(e) = stream.set_nodelay(true) {
if let Err(e) = stream.set_nodelay(self.nodelay) {
error!("couldn't set nodelay on stream to {addr}: {e}");
continue;
}
Expand Down Expand Up @@ -470,6 +473,7 @@ impl ConnectionManager {
if let Some(size) = self.socket_buf_size {
set_socket_buf_size(&stream, size);
}
set_user_timeout(&stream, self.user_timeout_ms);
let token = Token(self.next_token);
if let Err(e) =
self.poll.registry().register(&mut stream, token, Interest::READABLE)
Expand All @@ -478,7 +482,7 @@ impl ConnectionManager {
let _ = stream.shutdown(std::net::Shutdown::Both);
continue;
};
if let Err(e) = stream.set_nodelay(true) {
if let Err(e) = stream.set_nodelay(self.nodelay) {
error!("couldn't set nodelay on stream to {addr}: {e}");
continue;
}
Expand Down Expand Up @@ -549,10 +553,11 @@ impl ConnectionManager {
/// Non-blocking TCP connector: drives a mio `Poll` loop and dispatches
/// readiness events to the connection manager.
pub struct TcpConnector {
    // Reusable buffer that `poll()` fills with readiness events.
    events: Events,
    // Per-connection configuration and state (sockets, telemetry, backlogs).
    conn_mgr: ConnectionManager,
    // Current capacity of `events`; doubled whenever a poll fills the buffer
    // so connections that didn't fit aren't starved.
    event_capacity: usize,
}
impl Default for TcpConnector {
    fn default() -> Self {
        // Single source of truth so the buffer and its bookkeeping field
        // can never start out of sync.
        const INITIAL_EVENT_CAPACITY: usize = 128;
        Self {
            events: Events::with_capacity(INITIAL_EVENT_CAPACITY),
            conn_mgr: ConnectionManager::default(),
            event_capacity: INITIAL_EVENT_CAPACITY,
        }
    }
}
impl TcpConnector {
Expand Down Expand Up @@ -598,13 +603,22 @@ impl TcpConnector {
self
}

/// Overrides the `TCP_USER_TIMEOUT` socket option (in milliseconds) applied
/// to all connections: outbound, reconnected, and accepted inbound.
pub fn with_user_timeout(mut self, timeout_ms: u32) -> Self {
    self.conn_mgr.user_timeout_ms = timeout_ms;
    self
}

/// Sets `TCP_NODELAY` on all connections. Default: `true` (Nagle disabled).
///
/// Passing `false` lets Nagle's algorithm coalesce small writes, trading
/// latency for throughput.
pub fn with_nodelay(mut self, nodelay: bool) -> Self {
    self.conn_mgr.nodelay = nodelay;
    self
}

/// Sets the maximum send backlog (in framed messages) and how long it must
/// stay exceeded before a connection is automatically disconnected.
///
Expand All @@ -619,6 +633,29 @@ impl TcpConnector {
self
}

/// Sets the initial epoll event buffer capacity.
///
/// The buffer auto-grows (doubles) whenever a `poll()` fills the current
/// capacity, so this only affects the starting size. Default: 128.
///
/// A `capacity` of 0 is clamped to 1: a zero-capacity `Events` buffer can
/// never return events, which would leave the connector unable to make
/// progress (and `maybe_grow_events` could never double it, since
/// `0 * 2 == 0`).
pub fn with_event_capacity(mut self, capacity: usize) -> Self {
    // Clamp the degenerate zero case; 0 would render the connector inert.
    let capacity = capacity.max(1);
    self.event_capacity = capacity;
    self.events = Events::with_capacity(capacity);
    self
}

/// If the last `poll()` returned exactly `event_capacity` events, double
/// the buffer to avoid starving connections that didn't fit.
fn maybe_grow_events(&mut self, n_events: usize) {
    // mio returns at most `capacity` events; hitting the limit means
    // there were likely more fds ready than we could service.
    if n_events < self.event_capacity {
        return;
    }
    // `.max(1)` breaks the degenerate zero-capacity state: with capacity 0,
    // `n_events >= 0` is always true but `0 * 2 == 0` would never actually
    // grow the buffer, logging on every poll while the connector stays
    // unable to receive events. `saturating_mul` guards overflow at the
    // extreme end.
    let new_cap = self.event_capacity.saturating_mul(2).max(1);
    info!(old = self.event_capacity, new = new_cap, "epoll event buffer full, growing");
    self.event_capacity = new_cap;
    self.events = Events::with_capacity(new_cap);
}

/// Polls sockets once (non-blocking) and dispatches events via
/// [`PollEvent`].
///
Expand All @@ -640,14 +677,15 @@ impl TcpConnector {
safe_panic!("got error polling {e}");
return false;
}
let mut o = false;
let mut n_events = 0usize;

for e in self.events.iter() {
o = true;
n_events += 1;
self.conn_mgr.handle_event(e, &mut handler);
}
self.maybe_grow_events(n_events);
self.conn_mgr.flush_backlogs();
o
n_events > 0
}

/// Like [`poll_with`] but for dcache-backed streams. The handler receives
Expand All @@ -671,13 +709,14 @@ impl TcpConnector {
safe_panic!("got error polling {e}");
return false;
}
let mut o = false;
let mut n_events = 0usize;
for e in self.events.iter() {
o = true;
n_events += 1;
self.conn_mgr.handle_event_produce(e, produce, &mut on_msg);
}
self.maybe_grow_events(n_events);
self.conn_mgr.flush_backlogs();
o
n_events > 0
}

/// Writes immediately or enqueues bytes for later sending.
Expand Down
19 changes: 17 additions & 2 deletions crates/flux-network/src/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,14 @@ impl TcpStream {
RxState::ReadingHeader { mut buf, mut have } => {
while have < FRAME_HEADER_SIZE {
match self.stream.read(&mut buf[have..]) {
Ok(0) => return ReadOutcome::Disconnected,
Ok(0) => {
debug!(
peer = %self.peer_addr,
have,
"tcp: connection closed by peer (reading header)",
);
return ReadOutcome::Disconnected;
}

Ok(n) => {
have += n;
Expand Down Expand Up @@ -511,7 +518,15 @@ impl TcpStream {
self.stream.read(&mut buf[offset..msg_len])
};
match result {
Ok(0) => return ReadOutcome::Disconnected,
Ok(0) => {
debug!(
peer = %self.peer_addr,
msg_len,
offset,
"tcp: connection closed by peer (reading payload)",
);
return ReadOutcome::Disconnected;
}
Ok(n) => {
offset += n;
if offset == msg_len {
Expand Down
Loading