diff --git a/common/src/typed_socket/mod.rs b/common/src/typed_socket/mod.rs index 0e8ff2fe..3b747e2b 100644 --- a/common/src/typed_socket/mod.rs +++ b/common/src/typed_socket/mod.rs @@ -99,6 +99,13 @@ impl TypedSocket { pub async fn close(&mut self) { let _ = self.send.send(SocketAction::Close).await; } + + /// Returns (pending_outgoing, pending_incoming) message counts + pub fn channel_depths(&self) -> (usize, usize) { + let outgoing = self.send.max_capacity() - self.send.capacity(); + let incoming = self.recv.len(); + (outgoing, incoming) + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/plane/src/controller/drone.rs b/plane/src/controller/drone.rs index 5ff78e24..93d1676b 100644 --- a/plane/src/controller/drone.rs +++ b/plane/src/controller/drone.rs @@ -16,6 +16,7 @@ use plane_common::{ }; use serde::Deserialize; use std::{ + collections::HashMap, net::{IpAddr, SocketAddr}, time::Duration, }; @@ -204,11 +205,30 @@ pub async fn drone_socket_inner( let mut interval = tokio::time::interval(Duration::from_secs(5)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut log_interval = tokio::time::interval(Duration::from_secs(60)); + log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut message_counts: HashMap<&'static str, u64> = HashMap::new(); + loop { tokio::select! { _ = interval.tick() => { process_pending_actions(&controller.db, &mut socket, &drone_id).await?; } + _ = log_interval.tick() => { + let (outgoing, incoming) = socket.channel_depths(); + tracing::info!( + drone_id = drone_id.as_i32(), + outgoing_pending = outgoing, + incoming_pending = incoming, + heartbeat = message_counts.get("heartbeat").copied().unwrap_or(0), + backend_event = message_counts.get("backend_event").copied().unwrap_or(0), + ack_action = message_counts.get("ack_action").copied().unwrap_or(0), + renew_key = message_counts.get("renew_key").copied().unwrap_or(0), + backend_metrics = message_counts.get("backend_metrics").copied().unwrap_or(0), + "Drone channel stats (last 60s)" + ); + message_counts.clear(); + } backend_action_result = backend_actions.next() => { match backend_action_result { Some(backend_action) => { @@ -226,6 +246,23 @@ pub async fn drone_socket_inner( message_from_drone_result = socket.recv() => { match message_from_drone_result { Some(message_from_drone) => { + match &message_from_drone { + MessageFromDrone::Heartbeat(_) => { + *message_counts.entry("heartbeat").or_insert(0) += 1; + } + MessageFromDrone::BackendEvent(_) => { + *message_counts.entry("backend_event").or_insert(0) += 1; + } + MessageFromDrone::AckAction { .. } => { + *message_counts.entry("ack_action").or_insert(0) += 1; + } + MessageFromDrone::RenewKey(_) => { + *message_counts.entry("renew_key").or_insert(0) += 1; + } + MessageFromDrone::BackendMetrics(_) => { + *message_counts.entry("backend_metrics").or_insert(0) += 1; + } + } if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, &mut socket).await { tracing::error!(?err, "Error handling message from drone"); } diff --git a/plane/src/controller/proxy.rs b/plane/src/controller/proxy.rs index b6e60479..bf4e7a3f 100644 --- a/plane/src/controller/proxy.rs +++ b/plane/src/controller/proxy.rs @@ -17,7 +17,11 @@ use plane_common::{ typed_socket::{server::new_server, TypedSocket}, types::{BackendState, BearerToken, ClusterName, NodeId}, }; -use std::net::{IpAddr, SocketAddr}; +use std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + time::Duration, +}; use tokio::select; use valuable::Valuable; @@ -264,11 +268,41 @@ pub async fn proxy_socket_inner( let mut event_subscription: Subscription = controller.db.subscribe(); + let mut log_interval = tokio::time::interval(Duration::from_secs(60)); + log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut message_counts: HashMap<&'static str, u64> = HashMap::new(); + loop { select! { + _ = log_interval.tick() => { + let (outgoing, incoming) = socket.channel_depths(); + tracing::info!( + node_id = node_guard.id.as_i32(), + outgoing_pending = outgoing, + incoming_pending = incoming, + route_info_request = message_counts.get("route_info_request").copied().unwrap_or(0), + keep_alive = message_counts.get("keep_alive").copied().unwrap_or(0), + cert_manager_request = message_counts.get("cert_manager_request").copied().unwrap_or(0), + "Proxy channel stats (last 60s)" + ); + message_counts.clear(); + } message_from_proxy_result = socket.recv() => { match message_from_proxy_result { - Some(message) => handle_message_from_proxy(message, &controller, &mut socket, &cluster, node_guard.id).await?, + Some(message) => { + match &message { + MessageFromProxy::RouteInfoRequest(_) => { + *message_counts.entry("route_info_request").or_insert(0) += 1; + } + MessageFromProxy::KeepAlive(_) => { + *message_counts.entry("keep_alive").or_insert(0) += 1; + } + MessageFromProxy::CertManagerRequest(_) => { + *message_counts.entry("cert_manager_request").or_insert(0) += 1; + } + } + handle_message_from_proxy(message, &controller, &mut socket, &cluster, node_guard.id).await? + } None => { tracing::info!("Proxy socket closed"); break; diff --git a/plane/src/drone/mod.rs b/plane/src/drone/mod.rs index faad8087..22cadbef 100644 --- a/plane/src/drone/mod.rs +++ b/plane/src/drone/mod.rs @@ -25,6 +25,7 @@ use runtime::docker::DockerRuntime; use rusqlite::Connection; use serde::{Deserialize, Serialize}; use std::{ + collections::HashMap, fs::{set_permissions, File, Permissions}, net::IpAddr, os::unix::fs::PermissionsExt, @@ -93,19 +94,51 @@ pub async fn drone_loop( } } + let mut log_interval = tokio::time::interval(std::time::Duration::from_secs(60)); + log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut message_counts: HashMap<&'static str, u64> = HashMap::new(); + loop { - let Some(message) = socket.recv().await else { - tracing::warn!("Connection closed."); - break; - }; + tokio::select! { + _ = log_interval.tick() => { + let (outgoing, incoming) = socket.channel_depths(); + tracing::info!( + outgoing_pending = outgoing, + incoming_pending = incoming, + action = message_counts.get("action").copied().unwrap_or(0), + ack_event = message_counts.get("ack_event").copied().unwrap_or(0), + renew_key_response = message_counts.get("renew_key_response").copied().unwrap_or(0), + "Drone channel stats (last 60s)" + ); + message_counts.clear(); + } + message_result = socket.recv() => { + let Some(message) = message_result else { + tracing::warn!("Connection closed."); + break; + }; + + match &message { + MessageToDrone::Action(_) => { + *message_counts.entry("action").or_insert(0) += 1; + } + MessageToDrone::AckEvent { .. } => { + *message_counts.entry("ack_event").or_insert(0) += 1; + } + MessageToDrone::RenewKeyResponse(_) => { + *message_counts.entry("renew_key_response").or_insert(0) += 1; + } + } - let key_manager = key_manager.clone(); - tokio::spawn(handle_message( - message, - key_manager, - socket.sender(|x| x), - executor.clone(), - )); + let key_manager = key_manager.clone(); + tokio::spawn(handle_message( + message, + key_manager, + socket.sender(|x| x), + executor.clone(), + )); + } + } } } diff --git a/plane/src/proxy/proxy_connection.rs b/plane/src/proxy/proxy_connection.rs index 77fc1511..80b9da75 100644 --- a/plane/src/proxy/proxy_connection.rs +++ b/plane/src/proxy/proxy_connection.rs @@ -5,7 +5,7 @@ use plane_common::{ types::ClusterName, PlaneClient, }; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::task::JoinHandle; use valuable::Valuable; @@ -58,21 +58,57 @@ impl ProxyConnection { } }); - while let Some(message) = conn.recv().await { - match message { - MessageToProxy::RouteInfoResponse(response) => { - state.inner.route_map.receive(response); - } - MessageToProxy::CertManagerResponse(response) => { + let mut log_interval = tokio::time::interval(Duration::from_secs(60)); + log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut message_counts: HashMap<&'static str, u64> = HashMap::new(); + + loop { + tokio::select! { + _ = log_interval.tick() => { + let (outgoing, incoming) = conn.channel_depths(); tracing::info!( - response = response.as_value(), - "Received cert manager response" + outgoing_pending = outgoing, + incoming_pending = incoming, + route_info_response = message_counts.get("route_info_response").copied().unwrap_or(0), + cert_manager_response = message_counts.get("cert_manager_response").copied().unwrap_or(0), + backend_removed = message_counts.get("backend_removed").copied().unwrap_or(0), + "Proxy channel stats (last 60s)" ); - cert_manager.receive(response); + message_counts.clear(); } - MessageToProxy::BackendRemoved { backend } => { - state.inner.route_map.remove_backend(&backend); - state.inner.monitor.remove_backend(&backend); + message_result = conn.recv() => { + let Some(message) = message_result else { + break; + }; + + match &message { + MessageToProxy::RouteInfoResponse(_) => { + *message_counts.entry("route_info_response").or_insert(0) += 1; + } + MessageToProxy::CertManagerResponse(_) => { + *message_counts.entry("cert_manager_response").or_insert(0) += 1; + } + MessageToProxy::BackendRemoved { .. } => { + *message_counts.entry("backend_removed").or_insert(0) += 1; + } + } + + match message { + MessageToProxy::RouteInfoResponse(response) => { + state.inner.route_map.receive(response); + } + MessageToProxy::CertManagerResponse(response) => { + tracing::info!( + response = response.as_value(), + "Received cert manager response" + ); + cert_manager.receive(response); + } + MessageToProxy::BackendRemoved { backend } => { + state.inner.route_map.remove_backend(&backend); + state.inner.monitor.remove_backend(&backend); + } + } } } }