Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/src/typed_socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
}

impl<T: ChannelMessage> TypedSocket<T> {
pub fn send(&mut self, message: T) -> Result<(), PlaneClientError> {

Check warning on line 70 in common/src/typed_socket/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

the `Err`-variant returned from this function is very large

warning: the `Err`-variant returned from this function is very large --> common/src/typed_socket/mod.rs:70:43 | 70 | pub fn send(&mut self, message: T) -> Result<(), PlaneClientError> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ::: common/src/lib.rs:52:5 | 52 | Tungstenite(#[from] tokio_tungstenite::tungstenite::Error), | ---------------------------------------------------------- the largest variant contains at least 136 bytes | = help: try reducing the size of `PlaneClientError`, for example by boxing large elements or replacing it with `Box<PlaneClientError>` = help: for further information visit https://rust-lang.github.io/rust-clippy/rust-1.92.0/index.html#result_large_err
self.send
.try_send(SocketAction::Send(message))
.map_err(|_| PlaneClientError::SendFailed)?;
Expand Down Expand Up @@ -99,6 +99,13 @@
pub async fn close(&mut self) {
let _ = self.send.send(SocketAction::Close).await;
}

/// Returns (pending_outgoing, pending_incoming) message counts
pub fn channel_depths(&self) -> (usize, usize) {
let outgoing = self.send.max_capacity() - self.send.capacity();
let incoming = self.recv.len();
(outgoing, incoming)
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
37 changes: 37 additions & 0 deletions plane/src/controller/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use plane_common::{
};
use serde::Deserialize;
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
time::Duration,
};
Expand Down Expand Up @@ -204,11 +205,30 @@ pub async fn drone_socket_inner(
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut log_interval = tokio::time::interval(Duration::from_secs(60));
log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut message_counts: HashMap<&'static str, u64> = HashMap::new();

loop {
tokio::select! {
_ = interval.tick() => {
process_pending_actions(&controller.db, &mut socket, &drone_id).await?;
}
_ = log_interval.tick() => {
let (outgoing, incoming) = socket.channel_depths();
tracing::info!(
drone_id = drone_id.as_i32(),
outgoing_pending = outgoing,
incoming_pending = incoming,
heartbeat = message_counts.get("heartbeat").copied().unwrap_or(0),
backend_event = message_counts.get("backend_event").copied().unwrap_or(0),
ack_action = message_counts.get("ack_action").copied().unwrap_or(0),
renew_key = message_counts.get("renew_key").copied().unwrap_or(0),
backend_metrics = message_counts.get("backend_metrics").copied().unwrap_or(0),
"Drone channel stats (last 60s)"
);
message_counts.clear();
}
backend_action_result = backend_actions.next() => {
match backend_action_result {
Some(backend_action) => {
Expand All @@ -226,6 +246,23 @@ pub async fn drone_socket_inner(
message_from_drone_result = socket.recv() => {
match message_from_drone_result {
Some(message_from_drone) => {
match &message_from_drone {
MessageFromDrone::Heartbeat(_) => {
*message_counts.entry("heartbeat").or_insert(0) += 1;
}
MessageFromDrone::BackendEvent(_) => {
*message_counts.entry("backend_event").or_insert(0) += 1;
}
MessageFromDrone::AckAction { .. } => {
*message_counts.entry("ack_action").or_insert(0) += 1;
}
MessageFromDrone::RenewKey(_) => {
*message_counts.entry("renew_key").or_insert(0) += 1;
}
MessageFromDrone::BackendMetrics(_) => {
*message_counts.entry("backend_metrics").or_insert(0) += 1;
}
}
if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, &mut socket).await {
tracing::error!(?err, "Error handling message from drone");
}
Expand Down
38 changes: 36 additions & 2 deletions plane/src/controller/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use plane_common::{
typed_socket::{server::new_server, TypedSocket},
types::{BackendState, BearerToken, ClusterName, NodeId},
};
use std::net::{IpAddr, SocketAddr};
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
time::Duration,
};
use tokio::select;
use valuable::Valuable;

Expand Down Expand Up @@ -264,11 +268,41 @@ pub async fn proxy_socket_inner(

let mut event_subscription: Subscription<BackendState> = controller.db.subscribe();

let mut log_interval = tokio::time::interval(Duration::from_secs(60));
log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut message_counts: HashMap<&'static str, u64> = HashMap::new();

loop {
select! {
_ = log_interval.tick() => {
let (outgoing, incoming) = socket.channel_depths();
tracing::info!(
node_id = node_guard.id.as_i32(),
outgoing_pending = outgoing,
incoming_pending = incoming,
route_info_request = message_counts.get("route_info_request").copied().unwrap_or(0),
keep_alive = message_counts.get("keep_alive").copied().unwrap_or(0),
cert_manager_request = message_counts.get("cert_manager_request").copied().unwrap_or(0),
"Proxy channel stats (last 60s)"
);
message_counts.clear();
}
message_from_proxy_result = socket.recv() => {
match message_from_proxy_result {
Some(message) => handle_message_from_proxy(message, &controller, &mut socket, &cluster, node_guard.id).await?,
Some(message) => {
match &message {
MessageFromProxy::RouteInfoRequest(_) => {
*message_counts.entry("route_info_request").or_insert(0) += 1;
}
MessageFromProxy::KeepAlive(_) => {
*message_counts.entry("keep_alive").or_insert(0) += 1;
}
MessageFromProxy::CertManagerRequest(_) => {
*message_counts.entry("cert_manager_request").or_insert(0) += 1;
}
}
handle_message_from_proxy(message, &controller, &mut socket, &cluster, node_guard.id).await?
}
None => {
tracing::info!("Proxy socket closed");
break;
Expand Down
55 changes: 44 additions & 11 deletions plane/src/drone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use runtime::docker::DockerRuntime;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fs::{set_permissions, File, Permissions},
net::IpAddr,
os::unix::fs::PermissionsExt,
Expand Down Expand Up @@ -93,19 +94,51 @@ pub async fn drone_loop(
}
}

let mut log_interval = tokio::time::interval(std::time::Duration::from_secs(60));
log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut message_counts: HashMap<&'static str, u64> = HashMap::new();

loop {
let Some(message) = socket.recv().await else {
tracing::warn!("Connection closed.");
break;
};
tokio::select! {
_ = log_interval.tick() => {
let (outgoing, incoming) = socket.channel_depths();
tracing::info!(
outgoing_pending = outgoing,
incoming_pending = incoming,
action = message_counts.get("action").copied().unwrap_or(0),
ack_event = message_counts.get("ack_event").copied().unwrap_or(0),
renew_key_response = message_counts.get("renew_key_response").copied().unwrap_or(0),
"Drone channel stats (last 60s)"
);
message_counts.clear();
}
message_result = socket.recv() => {
let Some(message) = message_result else {
tracing::warn!("Connection closed.");
break;
};

match &message {
MessageToDrone::Action(_) => {
*message_counts.entry("action").or_insert(0) += 1;
}
MessageToDrone::AckEvent { .. } => {
*message_counts.entry("ack_event").or_insert(0) += 1;
}
MessageToDrone::RenewKeyResponse(_) => {
*message_counts.entry("renew_key_response").or_insert(0) += 1;
}
}

let key_manager = key_manager.clone();
tokio::spawn(handle_message(
message,
key_manager,
socket.sender(|x| x),
executor.clone(),
));
let key_manager = key_manager.clone();
tokio::spawn(handle_message(
message,
key_manager,
socket.sender(|x| x),
executor.clone(),
));
}
}
}
}

Expand Down
62 changes: 49 additions & 13 deletions plane/src/proxy/proxy_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use plane_common::{
types::ClusterName,
PlaneClient,
};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::task::JoinHandle;
use valuable::Valuable;

Expand Down Expand Up @@ -58,21 +58,57 @@ impl ProxyConnection {
}
});

while let Some(message) = conn.recv().await {
match message {
MessageToProxy::RouteInfoResponse(response) => {
state.inner.route_map.receive(response);
}
MessageToProxy::CertManagerResponse(response) => {
let mut log_interval = tokio::time::interval(Duration::from_secs(60));
log_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut message_counts: HashMap<&'static str, u64> = HashMap::new();

loop {
tokio::select! {
_ = log_interval.tick() => {
let (outgoing, incoming) = conn.channel_depths();
tracing::info!(
response = response.as_value(),
"Received cert manager response"
outgoing_pending = outgoing,
incoming_pending = incoming,
route_info_response = message_counts.get("route_info_response").copied().unwrap_or(0),
cert_manager_response = message_counts.get("cert_manager_response").copied().unwrap_or(0),
backend_removed = message_counts.get("backend_removed").copied().unwrap_or(0),
"Proxy channel stats (last 60s)"
);
cert_manager.receive(response);
message_counts.clear();
}
MessageToProxy::BackendRemoved { backend } => {
state.inner.route_map.remove_backend(&backend);
state.inner.monitor.remove_backend(&backend);
message_result = conn.recv() => {
let Some(message) = message_result else {
break;
};

match &message {
MessageToProxy::RouteInfoResponse(_) => {
*message_counts.entry("route_info_response").or_insert(0) += 1;
}
MessageToProxy::CertManagerResponse(_) => {
*message_counts.entry("cert_manager_response").or_insert(0) += 1;
}
MessageToProxy::BackendRemoved { .. } => {
*message_counts.entry("backend_removed").or_insert(0) += 1;
}
}

match message {
MessageToProxy::RouteInfoResponse(response) => {
state.inner.route_map.receive(response);
}
MessageToProxy::CertManagerResponse(response) => {
tracing::info!(
response = response.as_value(),
"Received cert manager response"
);
cert_manager.receive(response);
}
MessageToProxy::BackendRemoved { backend } => {
state.inner.route_map.remove_backend(&backend);
state.inner.monitor.remove_backend(&backend);
}
}
}
}
}
Expand Down
Loading