Skip to content

Run message processing tasks in parallel#878

Merged
paulgb merged 4 commits into
mainfrom
paulgb/parallel-process-messages
Jan 11, 2026
Merged

Run message processing tasks in parallel#878
paulgb merged 4 commits into
mainfrom
paulgb/parallel-process-messages

Conversation

@paulgb
Copy link
Copy Markdown
Member

@paulgb paulgb commented Jan 11, 2026

No description provided.

@vercel
Copy link
Copy Markdown

vercel Bot commented Jan 11, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Review Updated (UTC)
plane Error Error Jan 11, 2026 11:48pm

@paulgb paulgb changed the title Non-wrapping sender Run message processing tasks in parallel Jan 11, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces parallel message processing for drone and proxy socket handlers, refactors the TypedSocket API to support better composability, and modernizes format string syntax throughout the codebase.

Changes:

  • Refactors message handling in controller to spawn tasks for concurrent processing instead of sequential handling
  • Modernizes TypedSocket API with separate TypedSocketSender and WrappedTypedSocketSender types
  • Updates format strings to use captured identifiers (format!("{var}") instead of format!("{}", var))

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
common/src/typed_socket/mod.rs Refactors sender API with new TypedSocketSender and WrappedTypedSocketSender types, replacing the previous closure-based approach
plane/src/controller/drone.rs Implements parallel message processing by spawning tasks for handle_message_from_drone and process_pending_actions
plane/src/controller/proxy.rs Implements parallel message processing by spawning tasks for handle_message_from_proxy
plane/src/drone/mod.rs Updates to use new sender().wrap() API pattern
plane/src/drone/heartbeat.rs Updates type signatures to use WrappedTypedSocketSender
plane/src/drone/key_manager.rs Updates type signatures to use WrappedTypedSocketSender
plane/src/proxy/proxy_connection.rs Updates to use new sender().wrap() API pattern
plane/plane-tests/tests/proxy_cors.rs Fixes test to match actual CORS header implementation
plane/plane-tests/tests/cert_manager.rs Adds #[ignore] attributes to two failing tests
plane/src/util.rs Modernizes format strings
plane/src/typed_unix_socket/mod.rs Modernizes format strings
plane/src/proxy/request.rs Modernizes format strings
plane/src/proxy/cert_manager.rs Modernizes format strings
plane/src/drone/runtime/docker/commands.rs Modernizes format strings
plane/src/drone/executor.rs Modernizes format strings in panic messages
plane/src/dns/mod.rs Modernizes format strings
plane/src/dns/error.rs Modernizes format strings
plane/src/controller/drain.rs Modernizes format strings
plane/src/bin/db-cli.rs Modernizes format strings
plane/src/admin.rs Modernizes format strings
dynamic-proxy/tests/*.rs Modernizes format strings in tests
plane/plane-tests/tests/*.rs Modernizes format strings in tests

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +215 to +221
let sender = socket.sender();
let db = controller.db.clone();
tokio::spawn(async move {
if let Err(err) = process_pending_actions(&db, sender, &drone_id).await {
tracing::error!(?err, "Error processing pending actions");
}
});
Copy link

Copilot AI Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spawning process_pending_actions in a task every 5 seconds could lead to multiple concurrent executions if a previous call takes longer than 5 seconds to complete. This could result in duplicate action messages being sent to the drone or increased database load. Consider checking if a previous execution is still running before spawning a new task, or use a mechanism to ensure only one instance runs at a time.

Copilot uses AI. Check for mistakes.
Comment on lines +275 to +279
tokio::spawn(async move {
if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
tracing::error!(?err, "Error handling message from drone");
}
});
Copy link

Copilot AI Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parallel processing of BackendEvent messages introduces a potential race condition. When multiple backend events for the same backend are received in quick succession, they will now be processed concurrently by different tasks. This could lead to out-of-order state updates in the database if the events are meant to represent a sequential state transition (e.g., Scheduling -> Ready -> Terminating).

Consider whether the database's update_state method handles concurrent updates correctly, or if there needs to be ordering guarantees for events from the same backend. If state transitions must be processed sequentially per backend, you may need to use a per-backend task queue or locking mechanism.

Suggested change
tokio::spawn(async move {
if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
tracing::error!(?err, "Error handling message from drone");
}
});
if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
tracing::error!(?err, "Error handling message from drone");
}

Copilot uses AI. Check for mistakes.
Comment on lines +308 to +312
tokio::spawn(async move {
if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
tracing::error!(?err, "Error handling message from proxy");
}
});
Copy link

Copilot AI Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parallel processing of messages from the proxy introduces potential concurrency issues. Multiple RouteInfoRequest or KeepAlive messages could now be processed simultaneously, which might lead to race conditions in the database operations or unexpected ordering of responses.

For example, if multiple KeepAlive messages for the same backend arrive in quick succession, the concurrent update_keepalive calls could potentially interfere with each other. Consider whether these operations need to be serialized per backend or if the database layer already handles this correctly.

Suggested change
tokio::spawn(async move {
if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
tracing::error!(?err, "Error handling message from proxy");
}
});
if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
tracing::error!(?err, "Error handling message from proxy");
}

Copilot uses AI. Check for mistakes.
@paulgb paulgb merged commit bb8a64f into main Jan 11, 2026
4 of 6 checks passed
@paulgb paulgb deleted the paulgb/parallel-process-messages branch January 11, 2026 23:49
rolyatmax added a commit that referenced this pull request Jan 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants