Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc

## Unreleased

- Fix: `df.instance_nodes` no longer shows losing-branch nodes as `running` or `pending` after a `df.race` completes. The orchestrator now marks all non-terminal nodes in the losing branch as `cancelled` once the winning branch is decided. Schema change: `nodes_status_chk` is widened to include `'cancelled'`; upgrade DDL is in `sql/pg_durable--0.2.2--0.2.3.sql`.
- Fix: `df.signal()` now propagates the event to running sub-orchestrations spawned by `df.race` / `df.join` / `df.join3`, so `df.wait_for_signal` inside a parallel branch wakes as expected. Known limitation: signals raised before the target sub-orchestration is in the `Running` state are not yet redelivered when it starts; a proper fix requires unmatched-event forwarding in duroxide (#154).
- Fix: `df.start()`, RLS policies on `df.instances` / `df.nodes` / `df.vars`, and `df.vars` reads/writes no longer fail with `role "..." does not exist` when `current_user` requires quoting (mixed case, spaces, embedded quotes). All `current_user::regrole` casts are now wrapped with `quote_ident()` so the role lookup preserves the original identifier casing (#161, #162). Schema upgrade DDL is in `sql/pg_durable--0.2.1--0.2.2.sql`.

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pg_durable"
version = "0.2.2"
version = "0.2.3"
edition = "2021"
license = "MIT"
repository = "https://github.com/Azure/pg_durable"
Expand Down
8 changes: 8 additions & 0 deletions docs/upgrade-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ No additional fixture is needed for subsequent minors — intermediate versions
Each schema-changing PR should add a section here documenting what changed,
what the upgrade script handles, and any backward compatibility considerations.

### v0.2.2 → v0.2.3

#### Race loser node cancellation (fixes df.instance_nodes ghost nodes)
- **DDL change:** `nodes_status_chk` on `df.nodes` is dropped and recreated to include `'cancelled'` in the allowed status set. The existing `nodes_result_status_chk` (`result IS NULL OR status IN ('completed', 'failed')`) is unchanged — cancelled nodes carry no result. Upgrade DDL is in `sql/pg_durable--0.2.2--0.2.3.sql`.
- **Scenario A considerations:** Schema comparison must verify that `nodes_status_chk` now allows `'cancelled'` on both fresh installs and upgraded databases.
- **Scenario B1 considerations:** The new `.so` continues to work against v0.2.2 schemas (the constraint is `NOT VALID`, so pre-existing rows are unaffected). The new `cancel_subtree_nodes` activity that writes `'cancelled'` will fail the constraint only on schemas that have not been upgraded; for those deployments the activity will error and the orchestrator will log a warning, but existing workflows are otherwise unaffected. The ghost-node symptom remains until the schema is upgraded.
- **Scenario B2 considerations:** No data migration needed. All existing `df.nodes` rows have status in `('pending', 'running', 'completed', 'failed')` and continue to satisfy the widened constraint.

### v0.2.1 → v0.2.2

#### #162 quote_ident-wrapped `current_user::regrole` (fixes #161)
Expand Down
16 changes: 16 additions & 0 deletions sql/pg_durable--0.2.2--0.2.3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- pg_durable upgrade: 0.2.2 → 0.2.3
--
-- Fix: df.instance_nodes leaves race-loser nodes as 'running' or 'pending' after
-- a race completes. The orchestrator now marks all non-terminal nodes in the
-- losing branch of a RACE as 'cancelled' once the winning branch finishes.
--
-- To support the new 'cancelled' node status, the nodes_status_chk constraint is
-- widened to include it. The nodes_result_status_chk constraint is unchanged:
-- cancelled nodes carry no result, so result IS NULL already satisfies it.

ALTER TABLE df.nodes
DROP CONSTRAINT IF EXISTS nodes_status_chk;

ALTER TABLE df.nodes
ADD CONSTRAINT nodes_status_chk
CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) NOT VALID;
60 changes: 60 additions & 0 deletions src/activities/cancel_subtree_nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! CancelSubtreeNodes activity - marks all non-terminal nodes in a list as 'cancelled'
//!
//! Used after a RACE winner is determined to clean up the losing branch's node records.

use duroxide::ActivityContext;
use sqlx::PgPool;
use std::sync::Arc;

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::cancel-subtree-nodes";

/// Mark all non-terminal nodes in `node_ids` as 'cancelled'.
///
/// Nodes that are already in a terminal state (`completed`, `failed`, `cancelled`)
/// are left untouched; only `pending` and `running` nodes are updated.
pub async fn execute(
ctx: ActivityContext,
pool: Arc<PgPool>,
input_json: String,
) -> Result<String, String> {
let input: serde_json::Value = serde_json::from_str(&input_json)
.map_err(|e| format!("Failed to parse cancel-subtree-nodes input: {e}"))?;

let node_ids: Vec<String> = input["node_ids"]
.as_array()
.ok_or("Missing node_ids array")?
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();

if node_ids.is_empty() {
return Ok("No nodes to cancel".to_string());
}

ctx.trace_info(format!("Cancelling {} losing-branch nodes", node_ids.len()));

// Bulk-update only nodes that are still in a non-terminal state so we never
// overwrite a 'completed', 'failed', or already-'cancelled' node — any of these
// are terminal and must not be disturbed.
let result = sqlx::query(
"UPDATE df.nodes
SET status = 'cancelled', updated_at = now()
WHERE id = ANY($1) AND status NOT IN ('completed', 'failed', 'cancelled')",
)
.bind(&node_ids[..])
.execute(pool.as_ref())
.await;

match result {
Ok(r) => {
ctx.trace_info(format!("Cancelled {} node(s)", r.rows_affected()));
Ok(format!("Cancelled {} node(s)", r.rows_affected()))
}
Err(e) => {
let err_msg = format!("Failed to cancel subtree nodes: {e}");
ctx.trace_info(&err_msg);
Err(err_msg)
}
}
}
1 change: 1 addition & 0 deletions src/activities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Each activity is in its own file with a co-located NAME constant.
//! This enables IDE navigation (F12 jumps to implementation).

pub mod cancel_subtree_nodes;
pub mod execute_http;
pub mod execute_sql;
pub mod load_function_graph;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ ALTER TABLE df.nodes
ADD CONSTRAINT nodes_result_name_chk
CHECK (result_name IS NULL OR result_name ~ '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID,
ADD CONSTRAINT nodes_status_chk
CHECK (status IN ('pending', 'running', 'completed', 'failed')) NOT VALID,
CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')) NOT VALID,
ADD CONSTRAINT nodes_result_status_chk
CHECK (result IS NULL OR status IN ('completed', 'failed')) NOT VALID,
ADD CONSTRAINT nodes_structure_chk
Expand Down
93 changes: 87 additions & 6 deletions src/orchestrations/execute_function_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,57 @@ async fn execute_join_node(
Ok(result)
}

/// Collect all node IDs in the subtree rooted at `root_id` by depth-first traversal.
///
/// Handles the full node structure:
/// - `left_node` / `right_node` for THEN, IF, JOIN, RACE, LOOP
/// - `condition_node` embedded in `query` JSON for IF and LOOP nodes
/// - `extra_nodes` array embedded in `query` JSON for JOIN nodes (join3, etc.)
fn collect_subtree_node_ids(graph: &FunctionGraph, root_id: &str) -> Vec<String> {
let mut ids: Vec<String> = Vec::new();
let mut stack = vec![root_id.to_string()];

while let Some(node_id) = stack.pop() {
// Guard against visiting the same node twice (not expected in valid graphs,
// but prevents any potential infinite loop).
if ids.contains(&node_id) {
continue;
}
let Some(node) = graph.nodes.get(&node_id) else {
continue;
};
ids.push(node_id.clone());

// Follow structural children
if let Some(left) = &node.left_node {
stack.push(left.clone());
}
if let Some(right) = &node.right_node {
stack.push(right.clone());
}

// Follow children embedded in the query config JSON
if let Some(config_str) = &node.query {
if let Ok(config) = serde_json::from_str::<serde_json::Value>(config_str) {
// IF and LOOP: condition_node is a plain node ID string
if let Some(cond_id) = config["condition_node"].as_str() {
stack.push(cond_id.to_string());
}
// JOIN (join3, etc.): extra_nodes is an array of node ID strings
if let Some(extras) = config["extra_nodes"].as_array() {
for extra in extras {
if let Some(id) = extra.as_str() {
stack.push(id.to_string());
}
}
}
}
}
}

ids
}

async fn execute_race_node(
ctx: &OrchestrationContext,
graph: &FunctionGraph,
Expand Down Expand Up @@ -856,18 +907,48 @@ async fn execute_race_node(

// Use ctx.select2() - first to complete wins
// select2 now returns Either2<Left, Right> instead of (winner_idx, DurableOutput)
let raw = match ctx.select2(left_fut, right_fut).await {
let (raw, loser_root_id) = match ctx.select2(left_fut, right_fut).await {
duroxide::Either2::First(Ok(r)) => {
ctx.trace_info("RACE completed - left branch won");
Ok(r)
(Ok(r), right_id.clone())
}
duroxide::Either2::First(Err(e)) => Err(format!("RACE left branch failed: {e}")),
duroxide::Either2::First(Err(e)) => (Err(format!("RACE left branch failed: {e}")), right_id.clone()),
duroxide::Either2::Second(Ok(r)) => {
ctx.trace_info("RACE completed - right branch won");
Ok(r)
(Ok(r), left_id.clone())
}
duroxide::Either2::Second(Err(e)) => Err(format!("RACE right branch failed: {e}")),
}?;
duroxide::Either2::Second(Err(e)) => (Err(format!("RACE right branch failed: {e}")), left_id.clone()),
};

// Cancel all non-terminal nodes in the losing branch so that df.instance_nodes
// does not show ghost running/pending work after the race has been decided.
let loser_node_ids = collect_subtree_node_ids(graph, &loser_root_id);
if !loser_node_ids.is_empty() {
ctx.trace_info(format!(
"Cancelling {} losing-branch node(s) (root: {})",
loser_node_ids.len(),
loser_root_id
));
let cancel_input = serde_json::json!({ "node_ids": loser_node_ids });
// Best-effort: a failure here does not affect the race result but will
// leave losing-branch nodes in a non-terminal state. Log so operators
// can observe the problem without failing the workflow.
match ctx
.schedule_activity(
activities::cancel_subtree_nodes::NAME,
cancel_input.to_string(),
)
.await
{
Ok(_) => {}
Err(e) => ctx.trace_info(format!(
"Warning: failed to cancel losing-branch nodes (root: {}): {e}",
loser_root_id
)),
}
}

let raw = raw?;

// Parse the subtree output envelope produced by execute_subtree and merge any named
// results from the winning branch into the parent results map.
Expand Down
5 changes: 5 additions & 0 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
let graph_pool = pool.clone();
let status_pool = pool.clone();
let node_status_pool = pool.clone();
let cancel_subtree_pool = pool.clone();
let http_pool = pool.clone();

ActivityRegistry::builder()
Expand All @@ -34,6 +35,10 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
let pool = node_status_pool.clone();
async move { activities::update_node_status::execute(ctx, pool, input_json).await }
})
.register(activities::cancel_subtree_nodes::NAME, move |ctx: ActivityContext, input_json: String| {
let pool = cancel_subtree_pool.clone();
async move { activities::cancel_subtree_nodes::execute(ctx, pool, input_json).await }
})
.register(activities::execute_http::NAME, move |ctx: ActivityContext, config_json: String| {
let pool = http_pool.clone();
async move { activities::execute_http::execute(ctx, pool, config_json).await }
Expand Down
130 changes: 130 additions & 0 deletions tests/e2e/sql/24_race_loser_cancelled.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
-- Tests: df.instance_nodes marks losing-branch nodes as 'cancelled' after race completes
--
-- Repro for: df.instance_nodes leaves race-loser nodes running or pending after
-- race completion.
--
-- Setup: a race where one branch completes instantly and the other waits with a
-- long sleep. After the race completes, every losing-branch node must have
-- status = 'cancelled' in df.nodes; none should remain 'running' or 'pending'.

SET SESSION AUTHORIZATION df_e2e_user;

-- === Scenario 1: fast SQL wins, long sleep loses ===

CREATE TEMP TABLE _race_loser_cancelled_state (instance_id TEXT);

INSERT INTO _race_loser_cancelled_state
SELECT df.start(
df.race(
'SELECT ''fast'' AS winner',
df.sleep(60)
),
'test-race-loser-cancelled'
);

DO $$
DECLARE
inst_id TEXT;
v_status TEXT;
ghost_count INT;
attempts INT := 0;
BEGIN
SELECT instance_id INTO inst_id FROM _race_loser_cancelled_state;
RAISE NOTICE 'Testing race loser cancellation for instance: %', inst_id;

SELECT df.wait_for_completion(inst_id, 30) INTO v_status;

IF v_status != 'completed' THEN
RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: expected completed, got %', v_status;
END IF;

-- Poll until all nodes reach a terminal state (or time out after ~10 s).
-- The cancel activity is scheduled asynchronously so there may be a short
-- lag between the instance completing and the losing-branch nodes being
-- written to 'cancelled'.
LOOP
SELECT COUNT(*) INTO ghost_count
FROM df.instance_nodes(inst_id)
WHERE status IN ('running', 'pending');

EXIT WHEN ghost_count = 0 OR attempts >= 50;
PERFORM pg_sleep(0.2);
attempts := attempts + 1;
END LOOP;

IF ghost_count > 0 THEN
RAISE EXCEPTION
'TEST FAILED [race-loser-cancelled]: % node(s) still running/pending after race completed',
ghost_count;
END IF;

-- Verify the losing branch root node (the sleep) is specifically 'cancelled'
IF NOT EXISTS (
SELECT 1
FROM df.instance_nodes(inst_id)
WHERE node_type = 'SLEEP' AND status = 'cancelled'
) THEN
RAISE EXCEPTION 'TEST FAILED [race-loser-cancelled]: SLEEP node not marked as cancelled';
END IF;

RAISE NOTICE 'TEST PASSED: race_loser_cancelled (scenario 1)';
END $$;

DROP TABLE _race_loser_cancelled_state;

-- === Scenario 2: losing branch is a multi-node sequence (THEN + SQL) ===

CREATE TEMP TABLE _race_loser_seq_state (instance_id TEXT);

INSERT INTO _race_loser_seq_state
SELECT df.start(
df.race(
'SELECT ''fast'' AS winner',
df.seq(
df.sleep(60),
'SELECT ''slow-follow-up'''
)
),
'test-race-loser-seq'
);

DO $$
DECLARE
inst_id TEXT;
v_status TEXT;
ghost_count INT;
attempts INT := 0;
BEGIN
SELECT instance_id INTO inst_id FROM _race_loser_seq_state;
RAISE NOTICE 'Testing race loser cancellation (multi-node) for instance: %', inst_id;

SELECT df.wait_for_completion(inst_id, 30) INTO v_status;

IF v_status != 'completed' THEN
RAISE EXCEPTION 'TEST FAILED [race-loser-seq]: expected completed, got %', v_status;
END IF;

-- Poll until all nodes reach a terminal state (or time out after ~10 s).
LOOP
SELECT COUNT(*) INTO ghost_count
FROM df.instance_nodes(inst_id)
WHERE status IN ('running', 'pending');

EXIT WHEN ghost_count = 0 OR attempts >= 50;
PERFORM pg_sleep(0.2);
attempts := attempts + 1;
END LOOP;

IF ghost_count > 0 THEN
RAISE EXCEPTION
'TEST FAILED [race-loser-seq]: % node(s) still running/pending after race completed',
ghost_count;
END IF;

RAISE NOTICE 'TEST PASSED: race_loser_cancelled (scenario 2: multi-node loser)';
END $$;

DROP TABLE _race_loser_seq_state;

RESET SESSION AUTHORIZATION;
SELECT 'TEST PASSED: race loser nodes cancelled' AS result;