-
Notifications
You must be signed in to change notification settings - Fork 50
Run message processing tasks in parallel #878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,7 +9,7 @@ use plane_common::{ | |||||||||||||||||
| ApiErrorKind, BackendAction, BackendActionMessage, Heartbeat, KeyDeadlines, | ||||||||||||||||||
| MessageFromDrone, MessageToDrone, RenewKeyResponse, | ||||||||||||||||||
| }, | ||||||||||||||||||
| typed_socket::{server::new_server, TypedSocket}, | ||||||||||||||||||
| typed_socket::{server::new_server, TypedSocketSender}, | ||||||||||||||||||
| types::{ | ||||||||||||||||||
| backend_state::TerminationReason, ClusterName, DronePoolName, NodeId, TerminationKind, | ||||||||||||||||||
| }, | ||||||||||||||||||
|
|
@@ -41,7 +41,7 @@ pub async fn handle_message_from_drone( | |||||||||||||||||
| msg: MessageFromDrone, | ||||||||||||||||||
| drone_id: NodeId, | ||||||||||||||||||
| controller: &Controller, | ||||||||||||||||||
| sender: &mut TypedSocket<MessageToDrone>, | ||||||||||||||||||
| sender: TypedSocketSender<MessageToDrone>, | ||||||||||||||||||
| ) -> anyhow::Result<()> { | ||||||||||||||||||
| match msg { | ||||||||||||||||||
| MessageFromDrone::BackendMetrics(metrics_msg) => { | ||||||||||||||||||
|
|
@@ -146,7 +146,7 @@ pub async fn sweep_loop(db: PlaneDatabase, drone_id: NodeId) { | |||||||||||||||||
|
|
||||||||||||||||||
| pub async fn process_pending_actions( | ||||||||||||||||||
| db: &PlaneDatabase, | ||||||||||||||||||
| socket: &mut TypedSocket<MessageToDrone>, | ||||||||||||||||||
| socket: TypedSocketSender<MessageToDrone>, | ||||||||||||||||||
| drone_id: &NodeId, | ||||||||||||||||||
| ) -> Result<(), anyhow::Error> { | ||||||||||||||||||
| let mut count = 0; | ||||||||||||||||||
|
|
@@ -200,7 +200,7 @@ pub async fn drone_socket_inner( | |||||||||||||||||
| let mut backend_actions: Subscription<BackendActionMessage> = | ||||||||||||||||||
| controller.db.subscribe_with_key(&drone_id.to_string()); | ||||||||||||||||||
|
|
||||||||||||||||||
| process_pending_actions(&controller.db, &mut socket, &drone_id).await?; | ||||||||||||||||||
| process_pending_actions(&controller.db, socket.sender(), &drone_id).await?; | ||||||||||||||||||
|
|
||||||||||||||||||
| let mut interval = tokio::time::interval(Duration::from_secs(5)); | ||||||||||||||||||
| interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); | ||||||||||||||||||
|
|
@@ -212,7 +212,13 @@ pub async fn drone_socket_inner( | |||||||||||||||||
| loop { | ||||||||||||||||||
| tokio::select! { | ||||||||||||||||||
| _ = interval.tick() => { | ||||||||||||||||||
| process_pending_actions(&controller.db, &mut socket, &drone_id).await?; | ||||||||||||||||||
| let sender = socket.sender(); | ||||||||||||||||||
| let db = controller.db.clone(); | ||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||
| if let Err(err) = process_pending_actions(&db, sender, &drone_id).await { | ||||||||||||||||||
| tracing::error!(?err, "Error processing pending actions"); | ||||||||||||||||||
| } | ||||||||||||||||||
| }); | ||||||||||||||||||
| } | ||||||||||||||||||
| _ = log_interval.tick() => { | ||||||||||||||||||
| let (outgoing, incoming) = socket.channel_depths(); | ||||||||||||||||||
|
|
@@ -263,9 +269,14 @@ pub async fn drone_socket_inner( | |||||||||||||||||
| *message_counts.entry("backend_metrics").or_insert(0) += 1; | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, &mut socket).await { | ||||||||||||||||||
| tracing::error!(?err, "Error handling message from drone"); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| let sender = socket.sender(); | ||||||||||||||||||
| let controller = controller.clone(); | ||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||
| if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await { | ||||||||||||||||||
| tracing::error!(?err, "Error handling message from drone"); | ||||||||||||||||||
| } | ||||||||||||||||||
| }); | ||||||||||||||||||
|
Comment on lines
+275
to
+279
|
||||||||||||||||||
| tokio::spawn(async move { | |
| if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await { | |
| tracing::error!(?err, "Error handling message from drone"); | |
| } | |
| }); | |
| if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await { | |
| tracing::error!(?err, "Error handling message from drone"); | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,7 +14,7 @@ use plane_common::{ | |||||||||||||||||
| ApiErrorKind, CertManagerRequest, CertManagerResponse, MessageFromProxy, MessageToProxy, | ||||||||||||||||||
| RouteInfoRequest, RouteInfoResponse, | ||||||||||||||||||
| }, | ||||||||||||||||||
| typed_socket::{server::new_server, TypedSocket}, | ||||||||||||||||||
| typed_socket::{server::new_server, TypedSocketSender}, | ||||||||||||||||||
| types::{BackendState, BearerToken, ClusterName, NodeId}, | ||||||||||||||||||
| }; | ||||||||||||||||||
| use std::{ | ||||||||||||||||||
|
|
@@ -28,7 +28,7 @@ use valuable::Valuable; | |||||||||||||||||
| pub async fn handle_route_info_request( | ||||||||||||||||||
| token: BearerToken, | ||||||||||||||||||
| controller: &Controller, | ||||||||||||||||||
| socket: &mut TypedSocket<MessageToProxy>, | ||||||||||||||||||
| socket: TypedSocketSender<MessageToProxy>, | ||||||||||||||||||
| ) -> anyhow::Result<()> { | ||||||||||||||||||
| match controller.db.backend().route_info_for_token(&token).await { | ||||||||||||||||||
| // When a proxy requests a route, either: | ||||||||||||||||||
|
|
@@ -79,65 +79,58 @@ pub async fn handle_route_info_request( | |||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| let socket = socket.sender(MessageToProxy::RouteInfoResponse); | ||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||
| loop { | ||||||||||||||||||
| // Note: this timeout is arbitrary to avoid a memory leak. Under normal system operation, the critical | ||||||||||||||||||
| // timeout will be that of the backend failing to start. We use a large timeout to avoid it becoming | ||||||||||||||||||
| // the critical timeout when the system is functioning. | ||||||||||||||||||
| let result = match tokio::time::timeout( | ||||||||||||||||||
| std::time::Duration::from_secs(30 * 60 /* 30 minutes */), | ||||||||||||||||||
| sub.next(), | ||||||||||||||||||
| ) | ||||||||||||||||||
| .await | ||||||||||||||||||
| { | ||||||||||||||||||
| Ok(Some(result)) => result, | ||||||||||||||||||
| Ok(None) => { | ||||||||||||||||||
| tracing::error!("Event subscription closed!"); | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
| Err(_) => { | ||||||||||||||||||
| tracing::error!("Timeout waiting for backend state"); | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
| }; | ||||||||||||||||||
| let socket = socket.wrap(MessageToProxy::RouteInfoResponse); | ||||||||||||||||||
|
|
||||||||||||||||||
| let Notification { payload, .. } = result; | ||||||||||||||||||
| loop { | ||||||||||||||||||
| // Note: this timeout is arbitrary to avoid a memory leak. Under normal system operation, the critical | ||||||||||||||||||
| // timeout will be that of the backend failing to start. We use a large timeout to avoid it becoming | ||||||||||||||||||
| // the critical timeout when the system is functioning. | ||||||||||||||||||
| let result = match tokio::time::timeout( | ||||||||||||||||||
| std::time::Duration::from_secs(30 * 60 /* 30 minutes */), | ||||||||||||||||||
| sub.next(), | ||||||||||||||||||
| ) | ||||||||||||||||||
| .await | ||||||||||||||||||
| { | ||||||||||||||||||
| Ok(Some(result)) => result, | ||||||||||||||||||
| Ok(None) => { | ||||||||||||||||||
| tracing::error!("Event subscription closed!"); | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
| Err(_) => { | ||||||||||||||||||
| tracing::error!("Timeout waiting for backend state"); | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
| }; | ||||||||||||||||||
|
|
||||||||||||||||||
| match payload { | ||||||||||||||||||
| BackendState::Ready { address } => { | ||||||||||||||||||
| let route_info = partial_route_info.set_address(address); | ||||||||||||||||||
| let response = RouteInfoResponse { | ||||||||||||||||||
| token, | ||||||||||||||||||
| route_info: Some(route_info), | ||||||||||||||||||
| }; | ||||||||||||||||||
| if let Err(err) = socket.send(response) { | ||||||||||||||||||
| tracing::error!( | ||||||||||||||||||
| ?err, | ||||||||||||||||||
| "Error sending route info response to proxy." | ||||||||||||||||||
| ); | ||||||||||||||||||
| } | ||||||||||||||||||
| break; | ||||||||||||||||||
| let Notification { payload, .. } = result; | ||||||||||||||||||
|
|
||||||||||||||||||
| match payload { | ||||||||||||||||||
| BackendState::Ready { address } => { | ||||||||||||||||||
| let route_info = partial_route_info.set_address(address); | ||||||||||||||||||
| let response = RouteInfoResponse { | ||||||||||||||||||
| token, | ||||||||||||||||||
| route_info: Some(route_info), | ||||||||||||||||||
| }; | ||||||||||||||||||
| if let Err(err) = socket.send(response) { | ||||||||||||||||||
| tracing::error!(?err, "Error sending route info response to proxy."); | ||||||||||||||||||
| } | ||||||||||||||||||
| BackendState::Terminated { .. } | ||||||||||||||||||
| | BackendState::Terminating { .. } | ||||||||||||||||||
| | BackendState::HardTerminating { .. } => { | ||||||||||||||||||
| let response = RouteInfoResponse { | ||||||||||||||||||
| token, | ||||||||||||||||||
| route_info: None, | ||||||||||||||||||
| }; | ||||||||||||||||||
| if let Err(err) = socket.send(response) { | ||||||||||||||||||
| tracing::error!( | ||||||||||||||||||
| ?err, | ||||||||||||||||||
| "Error sending route info response to proxy." | ||||||||||||||||||
| ); | ||||||||||||||||||
| } | ||||||||||||||||||
| break; | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
| BackendState::Terminated { .. } | ||||||||||||||||||
| | BackendState::Terminating { .. } | ||||||||||||||||||
| | BackendState::HardTerminating { .. } => { | ||||||||||||||||||
| let response = RouteInfoResponse { | ||||||||||||||||||
| token, | ||||||||||||||||||
| route_info: None, | ||||||||||||||||||
| }; | ||||||||||||||||||
| if let Err(err) = socket.send(response) { | ||||||||||||||||||
| tracing::error!(?err, "Error sending route info response to proxy."); | ||||||||||||||||||
| } | ||||||||||||||||||
| _ => {} | ||||||||||||||||||
| break; | ||||||||||||||||||
| } | ||||||||||||||||||
| _ => {} | ||||||||||||||||||
| } | ||||||||||||||||||
| }); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| Ok(RouteInfoResult::NotFound) => { | ||||||||||||||||||
| let response = RouteInfoResponse { | ||||||||||||||||||
|
|
@@ -159,7 +152,7 @@ pub async fn handle_route_info_request( | |||||||||||||||||
| pub async fn handle_message_from_proxy( | ||||||||||||||||||
| message: MessageFromProxy, | ||||||||||||||||||
| controller: &Controller, | ||||||||||||||||||
| socket: &mut TypedSocket<MessageToProxy>, | ||||||||||||||||||
| socket: TypedSocketSender<MessageToProxy>, | ||||||||||||||||||
| cluster: &ClusterName, | ||||||||||||||||||
| node_id: NodeId, | ||||||||||||||||||
| ) -> anyhow::Result<()> { | ||||||||||||||||||
|
|
@@ -301,7 +294,15 @@ pub async fn proxy_socket_inner( | |||||||||||||||||
| *message_counts.entry("cert_manager_request").or_insert(0) += 1; | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| handle_message_from_proxy(message, &controller, &mut socket, &cluster, node_guard.id).await? | ||||||||||||||||||
|
|
||||||||||||||||||
| let sender = socket.sender(); | ||||||||||||||||||
| let controller = controller.clone(); | ||||||||||||||||||
| let cluster = cluster.clone(); | ||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||
| if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await { | ||||||||||||||||||
| tracing::error!(?err, "Error handling message from proxy"); | ||||||||||||||||||
| } | ||||||||||||||||||
| }); | ||||||||||||||||||
|
Comment on lines
+301
to
+305
|
||||||||||||||||||
| tokio::spawn(async move { | |
| if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await { | |
| tracing::error!(?err, "Error handling message from proxy"); | |
| } | |
| }); | |
| if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await { | |
| tracing::error!(?err, "Error handling message from proxy"); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spawning
process_pending_actionsin a task every 5 seconds could lead to multiple concurrent executions if a previous call takes longer than 5 seconds to complete. This could result in duplicate action messages being sent to the drone or increased database load. Consider checking if a previous execution is still running before spawning a new task, or use a mechanism to ensure only one instance runs at a time.