From c6f348ba04a9b834ec9a1c47145da3db63ee81f7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:02:41 +0000 Subject: [PATCH 1/3] Initial plan From 02cb1d20b329a6b33067771e5cc1bcdda48c1f4b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:21:39 +0000 Subject: [PATCH 2/3] fix: mark losing-branch nodes as cancelled after df.race completes After a df.race() workflow completes, the losing branch's nodes in df.instance_nodes were left in 'running' or 'pending' status, causing ghost in-flight work to appear in diagnostics and dashboards. Changes: - Add 'cancelled' to nodes_status_chk constraint (src/lib.rs) - Create migration sql/pg_durable--0.2.2--0.2.3.sql to widen constraint - Bump version to 0.2.3 in Cargo.toml - Add cancel_subtree_nodes activity (bulk-cancels non-terminal nodes) - Register new activity in activities/mod.rs and registry.rs - Add collect_subtree_node_ids() helper (DFS traversal of node graph) - Modify execute_race_node() to cancel losing-branch nodes after winner - Add e2e test 24_race_loser_cancelled.sql - Update CHANGELOG.md and docs/upgrade-testing.md Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- CHANGELOG.md | 1 + Cargo.lock | 2 +- Cargo.toml | 2 +- docs/upgrade-testing.md | 8 ++ sql/pg_durable--0.2.2--0.2.3.sql | 16 +++ src/activities/cancel_subtree_nodes.rs | 60 ++++++++++ src/activities/mod.rs | 1 + src/lib.rs | 2 +- src/orchestrations/execute_function_graph.rs | 83 ++++++++++++- src/registry.rs | 5 + tests/e2e/sql/24_race_loser_cancelled.sql | 118 +++++++++++++++++++ 11 files changed, 289 insertions(+), 9 deletions(-) create mode 100644 sql/pg_durable--0.2.2--0.2.3.sql create mode 100644 src/activities/cancel_subtree_nodes.rs create mode 100644 tests/e2e/sql/24_race_loser_cancelled.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index cdbef3ec..6866b7b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc ## Unreleased +- Fix: `df.instance_nodes` no longer shows losing-branch nodes as `running` or `pending` after a `df.race` completes. The orchestrator now marks all non-terminal nodes in the losing branch as `cancelled` once the winning branch is decided. Schema change: `nodes_status_chk` is widened to include `'cancelled'`; upgrade DDL is in `sql/pg_durable--0.2.2--0.2.3.sql`. - Fix: `df.signal()` now propagates the event to running sub-orchestrations spawned by `df.race` / `df.join` / `df.join3`, so `df.wait_for_signal` inside a parallel branch wakes as expected. Known limitation: signals raised before the target sub-orchestration is in the `Running` state are not yet redelivered when it starts; a proper fix requires unmatched-event forwarding in duroxide (#154). - Fix: `df.start()`, RLS policies on `df.instances` / `df.nodes` / `df.vars`, and `df.vars` reads/writes no longer fail with `role "..." does not exist` when `current_user` requires quoting (mixed case, spaces, embedded quotes). All `current_user::regrole` casts are now wrapped with `quote_ident()` so the role lookup preserves the original identifier casing (#161, #162). Schema upgrade DDL is in `sql/pg_durable--0.2.1--0.2.2.sql`. diff --git a/Cargo.lock b/Cargo.lock index ca1c17ac..55e3e3d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,7 +1892,7 @@ dependencies = [ [[package]] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" dependencies = [ "bigdecimal", "chrono", diff --git a/Cargo.toml b/Cargo.toml index cb769e6c..317e864a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" edition = "2021" license = "MIT" repository = "https://github.com/Azure/pg_durable" diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 567b831d..c48d2e6c 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -192,6 +192,14 @@ No additional fixture is needed for subsequent minors — intermediate versions Each schema-changing PR should add a section here documenting what changed, what the upgrade script handles, and any backward compatibility considerations. +### v0.2.2 → v0.2.3 + +#### Race loser node cancellation (fixes df.instance_nodes ghost nodes) +- **DDL change:** `nodes_status_chk` on `df.nodes` is dropped and recreated to include `'cancelled'` in the allowed status set. The existing `nodes_result_status_chk` (`result IS NULL OR status IN ('completed', 'failed')`) is unchanged — cancelled nodes carry no result. Upgrade DDL is in `sql/pg_durable--0.2.2--0.2.3.sql`. +- **Scenario A considerations:** Schema comparison must verify that `nodes_status_chk` now allows `'cancelled'` on both fresh installs and upgraded databases. +- **Scenario B1 considerations:** The new `.so` continues to work against v0.2.2 schemas (the constraint is `NOT VALID`, so pre-existing rows are unaffected). The new `cancel_subtree_nodes` activity that writes `'cancelled'` will fail the constraint only on schemas that have not been upgraded; for those deployments the activity will error and the orchestrator will log a warning, but existing workflows are otherwise unaffected. The ghost-node symptom remains until the schema is upgraded. +- **Scenario B2 considerations:** No data migration needed. All existing `df.nodes` rows have status in `('pending', 'running', 'completed', 'failed')` and continue to satisfy the widened constraint. + ### v0.2.1 → v0.2.2 #### #162 quote_ident-wrapped `current_user::regrole` (fixes #161) diff --git a/sql/pg_durable--0.2.2--0.2.3.sql b/sql/pg_durable--0.2.2--0.2.3.sql new file mode 100644 index 00000000..2005e93b --- /dev/null +++ b/sql/pg_durable--0.2.2--0.2.3.sql @@ -0,0 +1,16 @@ +-- pg_durable upgrade: 0.2.2 → 0.2.3 +-- +-- Fix: df.instance_nodes leaves race-loser nodes as 'running' or 'pending' after +-- a race completes. The orchestrator now marks all non-terminal nodes in the +-- losing branch of a RACE as 'cancelled' once the winning branch finishes. +-- +-- To support the new 'cancelled' node status, the nodes_status_chk constraint is +-- widened to include it. The nodes_result_status_chk constraint is unchanged: +-- cancelled nodes carry no result, so result IS NULL already satisfies it. + +ALTER TABLE df.nodes + DROP CONSTRAINT IF EXISTS nodes_status_chk; + +ALTER TABLE df.nodes + ADD CONSTRAINT nodes_status_chk + CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) NOT VALID; diff --git a/src/activities/cancel_subtree_nodes.rs b/src/activities/cancel_subtree_nodes.rs new file mode 100644 index 00000000..80adc47c --- /dev/null +++ b/src/activities/cancel_subtree_nodes.rs @@ -0,0 +1,60 @@ +//! CancelSubtreeNodes activity - marks all non-terminal nodes in a list as 'cancelled' +//! +//! Used after a RACE winner is determined to clean up the losing branch's node records. + +use duroxide::ActivityContext; +use sqlx::PgPool; +use std::sync::Arc; + +/// Activity name for registration and scheduling +pub const NAME: &str = "pg_durable::activity::cancel-subtree-nodes"; + +/// Mark all non-terminal nodes in `node_ids` as 'cancelled'. +/// +/// Nodes that are already in a terminal state (`completed`, `failed`, `cancelled`) +/// are left untouched; only `pending` and `running` nodes are updated. +pub async fn execute( + ctx: ActivityContext, + pool: Arc, + input_json: String, +) -> Result { + let input: serde_json::Value = serde_json::from_str(&input_json) + .map_err(|e| format!("Failed to parse cancel-subtree-nodes input: {e}"))?; + + let node_ids: Vec = input["node_ids"] + .as_array() + .ok_or("Missing node_ids array")? + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + + if node_ids.is_empty() { + return Ok("No nodes to cancel".to_string()); + } + + ctx.trace_info(format!("Cancelling {} losing-branch nodes", node_ids.len())); + + // Bulk-update only nodes that are still in a non-terminal state so we + // never overwrite a 'completed' or 'failed' result from a branch that + // happened to finish before the race was decided. + let result = sqlx::query( + "UPDATE df.nodes + SET status = 'cancelled', updated_at = now() + WHERE id = ANY($1) AND status NOT IN ('completed', 'failed', 'cancelled')", + ) + .bind(&node_ids[..]) + .execute(pool.as_ref()) + .await; + + match result { + Ok(r) => { + ctx.trace_info(format!("Cancelled {} node(s)", r.rows_affected())); + Ok(format!("Cancelled {} node(s)", r.rows_affected())) + } + Err(e) => { + let err_msg = format!("Failed to cancel subtree nodes: {e}"); + ctx.trace_info(&err_msg); + Err(err_msg) + } + } +} diff --git a/src/activities/mod.rs b/src/activities/mod.rs index ae1a7db0..41d1a70e 100644 --- a/src/activities/mod.rs +++ b/src/activities/mod.rs @@ -3,6 +3,7 @@ //! Each activity is in its own file with a co-located NAME constant. //! This enables IDE navigation (F12 jumps to implementation). +pub mod cancel_subtree_nodes; pub mod execute_http; pub mod execute_sql; pub mod load_function_graph; diff --git a/src/lib.rs b/src/lib.rs index be84c0d4..d4a8b97f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -240,7 +240,7 @@ ALTER TABLE df.nodes ADD CONSTRAINT nodes_result_name_chk CHECK (result_name IS NULL OR result_name ~ '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID, ADD CONSTRAINT nodes_status_chk - CHECK (status IN ('pending', 'running', 'completed', 'failed')) NOT VALID, + CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) NOT VALID, ADD CONSTRAINT nodes_result_status_chk CHECK (result IS NULL OR status IN ('completed', 'failed')) NOT VALID, ADD CONSTRAINT nodes_structure_chk diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index 67183d67..d17a960b 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -806,6 +806,57 @@ async fn execute_join_node( Ok(result) } +/// Collect all node IDs in the subtree rooted at `root_id` by depth-first traversal. +/// +/// Handles the full node structure: +/// - `left_node` / `right_node` for THEN, IF, JOIN, RACE, LOOP +/// - `condition_node` embedded in `query` JSON for IF and LOOP nodes +/// - `extra_nodes` array embedded in `query` JSON for JOIN nodes (join3, etc.) +fn collect_subtree_node_ids(graph: &FunctionGraph, root_id: &str) -> Vec { + let mut ids: Vec = Vec::new(); + let mut stack = vec![root_id.to_string()]; + + while let Some(node_id) = stack.pop() { + // Guard against visiting the same node twice (not expected in valid graphs, + // but prevents any potential infinite loop). + if ids.contains(&node_id) { + continue; + } + let Some(node) = graph.nodes.get(&node_id) else { + continue; + }; + ids.push(node_id.clone()); + + // Follow structural children + if let Some(left) = &node.left_node { + stack.push(left.clone()); + } + if let Some(right) = &node.right_node { + stack.push(right.clone()); + } + + // Follow children embedded in the query config JSON + if let Some(config_str) = &node.query { + if let Ok(config) = serde_json::from_str::(config_str) { + // IF and LOOP: condition_node is a plain node ID string + if let Some(cond_id) = config["condition_node"].as_str() { + stack.push(cond_id.to_string()); + } + // JOIN (join3, etc.): extra_nodes is an array of node ID strings + if let Some(extras) = config["extra_nodes"].as_array() { + for extra in extras { + if let Some(id) = extra.as_str() { + stack.push(id.to_string()); + } + } + } + } + } + } + + ids +} + async fn execute_race_node( ctx: &OrchestrationContext, graph: &FunctionGraph, @@ -856,18 +907,38 @@ async fn execute_race_node( // Use ctx.select2() - first to complete wins // select2 now returns Either2 instead of (winner_idx, DurableOutput) - let raw = match ctx.select2(left_fut, right_fut).await { + let (raw, loser_root_id) = match ctx.select2(left_fut, right_fut).await { duroxide::Either2::First(Ok(r)) => { ctx.trace_info("RACE completed - left branch won"); - Ok(r) + (Ok(r), right_id.clone()) } - duroxide::Either2::First(Err(e)) => Err(format!("RACE left branch failed: {e}")), + duroxide::Either2::First(Err(e)) => (Err(format!("RACE left branch failed: {e}")), right_id.clone()), duroxide::Either2::Second(Ok(r)) => { ctx.trace_info("RACE completed - right branch won"); - Ok(r) + (Ok(r), left_id.clone()) } - duroxide::Either2::Second(Err(e)) => Err(format!("RACE right branch failed: {e}")), - }?; + duroxide::Either2::Second(Err(e)) => (Err(format!("RACE right branch failed: {e}")), left_id.clone()), + }; + + // Cancel all non-terminal nodes in the losing branch so that df.instance_nodes + // does not show ghost running/pending work after the race has been decided. + let loser_node_ids = collect_subtree_node_ids(graph, &loser_root_id); + if !loser_node_ids.is_empty() { + ctx.trace_info(format!( + "Cancelling {} losing-branch node(s) (root: {})", + loser_node_ids.len(), + loser_root_id + )); + let cancel_input = serde_json::json!({ "node_ids": loser_node_ids }); + let _ = ctx + .schedule_activity( + activities::cancel_subtree_nodes::NAME, + cancel_input.to_string(), + ) + .await; + } + + let raw = raw?; // Parse the subtree output envelope produced by execute_subtree and merge any named // results from the winning branch into the parent results map. diff --git a/src/registry.rs b/src/registry.rs index bb0a57ee..7ddccc32 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -15,6 +15,7 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let graph_pool = pool.clone(); let status_pool = pool.clone(); let node_status_pool = pool.clone(); + let cancel_subtree_pool = pool.clone(); let http_pool = pool.clone(); ActivityRegistry::builder() @@ -34,6 +35,10 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let pool = node_status_pool.clone(); async move { activities::update_node_status::execute(ctx, pool, input_json).await } }) + .register(activities::cancel_subtree_nodes::NAME, move |ctx: ActivityContext, input_json: String| { + let pool = cancel_subtree_pool.clone(); + async move { activities::cancel_subtree_nodes::execute(ctx, pool, input_json).await } + }) .register(activities::execute_http::NAME, move |ctx: ActivityContext, config_json: String| { let pool = http_pool.clone(); async move { activities::execute_http::execute(ctx, pool, config_json).await } diff --git a/tests/e2e/sql/24_race_loser_cancelled.sql b/tests/e2e/sql/24_race_loser_cancelled.sql new file mode 100644 index 00000000..478ca3d2 --- /dev/null +++ b/tests/e2e/sql/24_race_loser_cancelled.sql @@ -0,0 +1,118 @@ +-- Tests: df.instance_nodes marks losing-branch nodes as 'cancelled' after race completes +-- +-- Repro for: df.instance_nodes leaves race-loser nodes running or pending after +-- race completion. +-- +-- Setup: a race where one branch completes instantly and the other waits with a +-- long sleep. After the race completes, every losing-branch node must have +-- status = 'cancelled' in df.nodes; none should remain 'running' or 'pending'. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Scenario 1: fast SQL wins, long sleep loses === + +CREATE TEMP TABLE _race_loser_cancelled_state (instance_id TEXT); + +INSERT INTO _race_loser_cancelled_state +SELECT df.start( + df.race( + 'SELECT ''fast'' AS winner', + df.sleep(60) + ), + 'test-race-loser-cancelled' +); + +DO $$ +DECLARE + inst_id TEXT; + v_status TEXT; + ghost_count INT; +BEGIN + SELECT instance_id INTO inst_id FROM _race_loser_cancelled_state; + RAISE NOTICE 'Testing race loser cancellation for instance: %', inst_id; + + SELECT df.wait_for_completion(inst_id, 30) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: expected completed, got %', v_status; + END IF; + + -- Allow a short settle window for the cancel activity to write through + PERFORM pg_sleep(2); + + -- No node in the instance should remain 'running' or 'pending' after the + -- race has been decided. + SELECT COUNT(*) INTO ghost_count + FROM df.instance_nodes(inst_id) + WHERE status IN ('running', 'pending'); + + IF ghost_count > 0 THEN + RAISE EXCEPTION + 'TEST FAILED [race-loser-cancelled]: % node(s) still running/pending after race completed', + ghost_count; + END IF; + + -- Verify the losing branch root node (the sleep) is specifically 'cancelled' + IF NOT EXISTS ( + SELECT 1 + FROM df.instance_nodes(inst_id) + WHERE node_type = 'SLEEP' AND status = 'cancelled' + ) THEN + RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: SLEEP node not marked as cancelled'; + END IF; + + RAISE NOTICE 'TEST PASSED: race_loser_cancelled (scenario 1)'; +END $$; + +DROP TABLE _race_loser_cancelled_state; + +-- === Scenario 2: losing branch is a multi-node sequence (THEN + SQL) === + +CREATE TEMP TABLE _race_loser_seq_state (instance_id TEXT); + +INSERT INTO _race_loser_seq_state +SELECT df.start( + df.race( + 'SELECT ''fast'' AS winner', + df.seq( + df.sleep(60), + 'SELECT ''slow-follow-up''' + ) + ), + 'test-race-loser-seq' +); + +DO $$ +DECLARE + inst_id TEXT; + v_status TEXT; + ghost_count INT; +BEGIN + SELECT instance_id INTO inst_id FROM _race_loser_seq_state; + RAISE NOTICE 'Testing race loser cancellation (multi-node) for instance: %', inst_id; + + SELECT df.wait_for_completion(inst_id, 30) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [race-loser-seq]: expected completed, got %', v_status; + END IF; + + PERFORM pg_sleep(2); + + SELECT COUNT(*) INTO ghost_count + FROM df.instance_nodes(inst_id) + WHERE status IN ('running', 'pending'); + + IF ghost_count > 0 THEN + RAISE EXCEPTION + 'TEST FAILED [race-loser-seq]: % node(s) still running/pending after race completed', + ghost_count; + END IF; + + RAISE NOTICE 'TEST PASSED: race_loser_cancelled (scenario 2: multi-node loser)'; +END $$; + +DROP TABLE _race_loser_seq_state; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED: race loser nodes cancelled' AS result; From f744a7771fd695668704f602ab6aa6fb8bfa1b2d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:26:12 +0000 Subject: [PATCH 3/3] fix: address code review feedback - Update comment in cancel_subtree_nodes.rs to list all three terminal statuses ('completed', 'failed', 'cancelled') that are guarded - Add trace logging when cancel-subtree-nodes activity fails (best-effort, does not fail the workflow) instead of silently discarding the error - Replace hardcoded pg_sleep(2) in e2e test with poll loop (50 x 200ms) to avoid flakiness while still detecting the settled state quickly Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- src/activities/cancel_subtree_nodes.rs | 6 ++-- src/orchestrations/execute_function_graph.rs | 14 ++++++-- tests/e2e/sql/24_race_loser_cancelled.sql | 34 +++++++++++++------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/activities/cancel_subtree_nodes.rs b/src/activities/cancel_subtree_nodes.rs index 80adc47c..c8d3a414 100644 --- a/src/activities/cancel_subtree_nodes.rs +++ b/src/activities/cancel_subtree_nodes.rs @@ -34,9 +34,9 @@ pub async fn execute( ctx.trace_info(format!("Cancelling {} losing-branch nodes", node_ids.len())); - // Bulk-update only nodes that are still in a non-terminal state so we - // never overwrite a 'completed' or 'failed' result from a branch that - // happened to finish before the race was decided. + // Bulk-update only nodes that are still in a non-terminal state so we never + // overwrite a 'completed', 'failed', or already-'cancelled' node — any of these + // are terminal and must not be disturbed. let result = sqlx::query( "UPDATE df.nodes SET status = 'cancelled', updated_at = now() diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index d17a960b..f5d190d8 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -930,12 +930,22 @@ async fn execute_race_node( loser_root_id )); let cancel_input = serde_json::json!({ "node_ids": loser_node_ids }); - let _ = ctx + // Best-effort: a failure here does not affect the race result but will + // leave losing-branch nodes in a non-terminal state. Log so operators + // can observe the problem without failing the workflow. + match ctx .schedule_activity( activities::cancel_subtree_nodes::NAME, cancel_input.to_string(), ) - .await; + .await + { + Ok(_) => {} + Err(e) => ctx.trace_info(format!( + "Warning: failed to cancel losing-branch nodes (root: {}): {e}", + loser_root_id + )), + } } let raw = raw?; diff --git a/tests/e2e/sql/24_race_loser_cancelled.sql b/tests/e2e/sql/24_race_loser_cancelled.sql index 478ca3d2..34e9cb46 100644 --- a/tests/e2e/sql/24_race_loser_cancelled.sql +++ b/tests/e2e/sql/24_race_loser_cancelled.sql @@ -27,6 +27,7 @@ DECLARE inst_id TEXT; v_status TEXT; ghost_count INT; + attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _race_loser_cancelled_state; RAISE NOTICE 'Testing race loser cancellation for instance: %', inst_id; @@ -37,14 +38,19 @@ BEGIN RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: expected completed, got %', v_status; END IF; - -- Allow a short settle window for the cancel activity to write through - PERFORM pg_sleep(2); + -- Poll until all nodes reach a terminal state (or time out after ~10 s). + -- The cancel activity is scheduled asynchronously so there may be a short + -- lag between the instance completing and the losing-branch nodes being + -- written to 'cancelled'. + LOOP + SELECT COUNT(*) INTO ghost_count + FROM df.instance_nodes(inst_id) + WHERE status IN ('running', 'pending'); - -- No node in the instance should remain 'running' or 'pending' after the - -- race has been decided. - SELECT COUNT(*) INTO ghost_count - FROM df.instance_nodes(inst_id) - WHERE status IN ('running', 'pending'); + EXIT WHEN ghost_count = 0 OR attempts >= 50; + PERFORM pg_sleep(0.2); + attempts := attempts + 1; + END LOOP; IF ghost_count > 0 THEN RAISE EXCEPTION @@ -87,6 +93,7 @@ DECLARE inst_id TEXT; v_status TEXT; ghost_count INT; + attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _race_loser_seq_state; RAISE NOTICE 'Testing race loser cancellation (multi-node) for instance: %', inst_id; @@ -97,11 +104,16 @@ BEGIN RAISE EXCEPTION 'TEST FAILED [race-loser-seq]: expected completed, got %', v_status; END IF; - PERFORM pg_sleep(2); + -- Poll until all nodes reach a terminal state (or time out after ~10 s). + LOOP + SELECT COUNT(*) INTO ghost_count + FROM df.instance_nodes(inst_id) + WHERE status IN ('running', 'pending'); - SELECT COUNT(*) INTO ghost_count - FROM df.instance_nodes(inst_id) - WHERE status IN ('running', 'pending'); + EXIT WHEN ghost_count = 0 OR attempts >= 50; + PERFORM pg_sleep(0.2); + attempts := attempts + 1; + END LOOP; IF ghost_count > 0 THEN RAISE EXCEPTION