Run message processing tasks in parallel #878
0f44eb0 to 7dd9f09
Pull request overview
This PR introduces parallel message processing for drone and proxy socket handlers, refactors the TypedSocket API to support better composability, and modernizes format string syntax throughout the codebase.
Changes:
- Refactors message handling in controller to spawn tasks for concurrent processing instead of sequential handling
- Modernizes TypedSocket API with separate `TypedSocketSender` and `WrappedTypedSocketSender` types
- Updates format strings to use captured identifiers (`format!("{var}")` instead of `format!("{}", var)`)
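As a small illustration of the format string change listed above, the sketch below builds the same message both ways. The function and its parameters (`describe`, `backend`, `port`) are hypothetical names chosen for the example and do not come from the PR.

```rust
/// Builds the same message with positional arguments and with captured
/// identifiers. `backend` and `port` are hypothetical example values.
fn describe(backend: &str, port: u16) -> (String, String) {
    // Older style: arguments are listed after the format string.
    let positional = format!("backend {} listening on port {}", backend, port);
    // Captured identifiers (stable since Rust 1.58): the variable is named
    // directly inside the braces.
    let captured = format!("backend {backend} listening on port {port}");
    (positional, captured)
}
```

Only plain identifiers can be captured this way; an expression such as a field access or method call still has to be passed as an explicit argument.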
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| common/src/typed_socket/mod.rs | Refactors sender API with new TypedSocketSender and WrappedTypedSocketSender types, replacing the previous closure-based approach |
| plane/src/controller/drone.rs | Implements parallel message processing by spawning tasks for handle_message_from_drone and process_pending_actions |
| plane/src/controller/proxy.rs | Implements parallel message processing by spawning tasks for handle_message_from_proxy |
| plane/src/drone/mod.rs | Updates to use new sender().wrap() API pattern |
| plane/src/drone/heartbeat.rs | Updates type signatures to use WrappedTypedSocketSender |
| plane/src/drone/key_manager.rs | Updates type signatures to use WrappedTypedSocketSender |
| plane/src/proxy/proxy_connection.rs | Updates to use new sender().wrap() API pattern |
| plane/plane-tests/tests/proxy_cors.rs | Fixes test to match actual CORS header implementation |
| plane/plane-tests/tests/cert_manager.rs | Adds #[ignore] attributes to two failing tests |
| plane/src/util.rs | Modernizes format strings |
| plane/src/typed_unix_socket/mod.rs | Modernizes format strings |
| plane/src/proxy/request.rs | Modernizes format strings |
| plane/src/proxy/cert_manager.rs | Modernizes format strings |
| plane/src/drone/runtime/docker/commands.rs | Modernizes format strings |
| plane/src/drone/executor.rs | Modernizes format strings in panic messages |
| plane/src/dns/mod.rs | Modernizes format strings |
| plane/src/dns/error.rs | Modernizes format strings |
| plane/src/controller/drain.rs | Modernizes format strings |
| plane/src/bin/db-cli.rs | Modernizes format strings |
| plane/src/admin.rs | Modernizes format strings |
| dynamic-proxy/tests/*.rs | Modernizes format strings in tests |
| plane/plane-tests/tests/*.rs | Modernizes format strings in tests |
    let sender = socket.sender();
    let db = controller.db.clone();
    tokio::spawn(async move {
        if let Err(err) = process_pending_actions(&db, sender, &drone_id).await {
            tracing::error!(?err, "Error processing pending actions");
        }
    });
Spawning process_pending_actions in a task every 5 seconds could lead to multiple concurrent executions if a previous call takes longer than 5 seconds to complete. This could result in duplicate action messages being sent to the drone or increased database load. Consider checking if a previous execution is still running before spawning a new task, or use a mechanism to ensure only one instance runs at a time.
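One possible shape for the "only one instance runs at a time" mechanism mentioned above is an atomic in-flight flag. The sketch below uses only the standard library; `SingleFlight` and `RunToken` are hypothetical names and are not part of the PR.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Guard that allows at most one in-flight run of a periodic job.
/// Hypothetical helper, not part of the PR.
#[derive(Clone, Default)]
struct SingleFlight {
    running: Arc<AtomicBool>,
}

/// Token held while a run is in progress; dropping it clears the flag.
struct RunToken {
    running: Arc<AtomicBool>,
}

impl SingleFlight {
    /// Returns a token if no run is in progress, or `None` if one is.
    fn try_start(&self) -> Option<RunToken> {
        self.running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .ok()
            .map(|_| RunToken { running: Arc::clone(&self.running) })
    }
}

impl Drop for RunToken {
    fn drop(&mut self) {
        // Runs when the holder finishes or unwinds, so the flag cannot stay stuck.
        self.running.store(false, Ordering::Release);
    }
}
```

In the interval branch, the caller would invoke `try_start()` on each tick, move the returned token into the spawned task when it is `Some`, and skip the tick when it is `None`. Whether skipping a tick is acceptable for `process_pending_actions` is a design question for the PR author.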
    tokio::spawn(async move {
        if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
            tracing::error!(?err, "Error handling message from drone");
        }
    });
The parallel processing of BackendEvent messages introduces a potential race condition. When multiple backend events for the same backend are received in quick succession, they will now be processed concurrently by different tasks. This could lead to out-of-order state updates in the database if the events are meant to represent a sequential state transition (e.g., Scheduling -> Ready -> Terminating).
Consider whether the database's update_state method handles concurrent updates correctly, or if there needs to be ordering guarantees for events from the same backend. If state transitions must be processed sequentially per backend, you may need to use a per-backend task queue or locking mechanism.
Suggested change:

    - tokio::spawn(async move {
    -     if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
    -         tracing::error!(?err, "Error handling message from drone");
    -     }
    - });
    + if let Err(err) = handle_message_from_drone(message_from_drone, drone_id, &controller, sender).await {
    +     tracing::error!(?err, "Error handling message from drone");
    + }
    tokio::spawn(async move {
        if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
            tracing::error!(?err, "Error handling message from proxy");
        }
    });
The parallel processing of messages from the proxy introduces potential concurrency issues. Multiple RouteInfoRequest or KeepAlive messages could now be processed simultaneously, which might lead to race conditions in the database operations or unexpected ordering of responses.
For example, if multiple KeepAlive messages for the same backend arrive in quick succession, the concurrent update_keepalive calls could potentially interfere with each other. Consider whether these operations need to be serialized per backend or if the database layer already handles this correctly.
Suggested change:

    - tokio::spawn(async move {
    -     if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
    -         tracing::error!(?err, "Error handling message from proxy");
    -     }
    - });
    + if let Err(err) = handle_message_from_proxy(message, &controller, sender, &cluster, node_guard.id).await {
    +     tracing::error!(?err, "Error handling message from proxy");
    + }
This reverts commit bb8a64f.