diff --git a/Cargo.lock b/Cargo.lock index ca1c17a..55e3e3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,7 +1892,7 @@ dependencies = [ [[package]] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" dependencies = [ "bigdecimal", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 6bf0644..45a400a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" edition = "2021" license = "PostgreSQL" repository = "https://github.com/microsoft/pg_durable" diff --git a/docs/move-duroxide-schema.md b/docs/move-duroxide-schema.md new file mode 100644 index 0000000..eeb859f --- /dev/null +++ b/docs/move-duroxide-schema.md @@ -0,0 +1,230 @@ +# Move Duroxide Provider Schema + +Issue: [Move PostgresProvider's schema out of "duroxide" microsoft/pg_durable#175](https://github.com/microsoft/pg_durable/issues/175) + +## Goal + +Move pg_durable's internal duroxide provider schema away from the generic `duroxide` name for new installations, while preserving existing installations that already have an extension-owned `duroxide` schema. + +The chosen default provider schema name for new installations is: + +```text +_duroxide +``` + +Rationale for the name: + +- Bare identifier (no quoting required anywhere). `_` is a legal leading character for PostgreSQL identifiers. +- The leading underscore signals "internal / not part of the public API," matching common PostgreSQL convention for implementation-detail objects. +- Makes the relationship to duroxide-pg obvious without overloading a more generic prefix like `_df`. + +There is no GUC. The schema name is an implementation detail of pg_durable, not an operator-facing setting. + +## Current State + +The provider schema is currently hardcoded as `duroxide` in several places: + +- `src/types.rs` defines `DUROXIDE_SCHEMA = "duroxide"` and passes it to both backend and worker `duroxide_pg::ProviderConfig` values. +- `src/lib.rs` creates `CREATE SCHEMA duroxide;` as an extension-owned schema during `CREATE EXTENSION pg_durable`. +- `src/worker.rs` checks that `duroxide` exists and is owned by the `pg_durable` extension before running `MigrationPolicy::ApplyAll`. +- `src/worker.rs` writes readiness state to `duroxide._worker_ready`. +- `src/client.rs`, E2E setup SQL, upgrade tests, and helper scripts poll `duroxide._worker_ready`. + +The security model intentionally depends on two properties: + +1. `CREATE EXTENSION` creates the provider schema without `IF NOT EXISTS`, so a pre-existing schema with that name blocks installation instead of being adopted. +2. The background worker verifies the provider schema is extension-owned before applying duroxide migrations. + +Any implementation must preserve both properties. + +## Compatibility Requirement + +Already-shipped versions in Azure and open source assume the provider schema is named `duroxide`: + +- Azure-shipped: v0.1.1, v0.2.1, v0.2.2 in progress +- Open source supported baseline: v0.2.2 + +Therefore, a new binary must continue to work with existing databases where `pg_durable` already owns a `duroxide` schema. Existing instances and engine state must remain in place and must not be migrated implicitly to a different schema. + +The compatibility rules: + +- If the install records `duroxide` as its provider schema, use `duroxide`. +- If the install records `_duroxide` (or any future name) as its provider schema, use that name. +- Do not rename, copy, drop, or migrate provider state automatically. +- A fresh `CREATE EXTENSION pg_durable` under the new SQL creates and uses `_duroxide`. +- A `.so` upgrade that arrives **without** `ALTER EXTENSION pg_durable UPDATE` must continue to operate against the legacy `duroxide` schema (see "Selection algorithm" below). + +There is **no in-place migration path** from `duroxide` to `_duroxide` for an existing cluster that wants to adopt the new name while preserving engine state. The only supported "adopt the new name" path is `DROP EXTENSION pg_durable CASCADE` followed by `CREATE EXTENSION pg_durable`, which is a destructive reset of durable engine state. This is acknowledged as a deliberate non-goal of this work. + +## Design Overview + +Rather than a GUC, the selected provider schema is exposed by a small extension-owned SQL function: + +```sql +CREATE FUNCTION df.duroxide_schema() RETURNS TEXT + LANGUAGE SQL IMMUTABLE PARALLEL SAFE + AS $$ SELECT '_duroxide'::TEXT $$; +``` + +Both the install SQL and any future upgrade scripts are responsible for defining this function with the correct value for the lifecycle path being taken: + +- The **fresh install** SQL (the new version's primary install script) defines the function to return `'_duroxide'`. +- The **upgrade script** `pg_durable--0.2.2--.sql` defines the function to return `'duroxide'`. This pins existing clusters to their already-created legacy schema deterministically, regardless of any other heuristics. + +The background worker and backend sessions read the value once at startup (or whenever they need it) and use it everywhere the provider schema is referenced. + +### Why a function instead of a table? + +- Mirrors the existing pattern of [`df.target_database()`](../src/lib.rs) — a parameterless function used to expose install-time configuration to validation SQL and to Rust code. +- No row management, no `CHECK` constraints to enforce a single row, no `UPDATE` ergonomics. +- The value is baked into an extension-owned object, which makes it tamper-resistant by default (non-superusers cannot `CREATE OR REPLACE` it). +- Changing the value across versions is a straightforward `CREATE OR REPLACE FUNCTION` in the relevant upgrade script. + +### Selection algorithm (BGW + backend) + +At runtime the selected schema is computed once per connection / once at BGW startup: + +1. Try to call `df.duroxide_schema()`. If it returns a non-empty value, use that value. +2. If the function does not exist (PostgreSQL error code `42883`, `undefined_function`), fall back to `'duroxide'`. + +Rule 2 is the **only** fallback, and exists strictly for the documented operational reality that customers may receive a new `.so` through a maintenance update without running `ALTER EXTENSION pg_durable UPDATE`. In that case: + +- The cluster is still at the old extension version, so the helper function does not yet exist. +- The pre-existing extension-owned `duroxide` schema is the only possible provider schema. +- Falling back to `'duroxide'` is unambiguous and safe. + +The fallback is self-deleting: as soon as the operator runs `ALTER EXTENSION pg_durable UPDATE`, the function is defined (by the upgrade script) to return `'duroxide'`, and selection step 1 wins on every subsequent startup. + +No GUC source inspection, no `pg_depend` scan, no metadata-vs-GUC priority puzzle. + +## Compatibility Matrix + +| Scenario | Selection outcome | Provider schema actually used | +|---|---|---| +| Fresh `CREATE EXTENSION` on new version | Step 1: function returns `'_duroxide'` | `_duroxide` | +| Existing v0.2.2 cluster, new `.so` deployed, **no** `ALTER EXTENSION UPDATE` | Step 2: function missing, fallback | `duroxide` | +| Existing v0.2.2 cluster, new `.so` deployed, `ALTER EXTENSION UPDATE` run | Step 1: upgrade script defined function to return `'duroxide'` | `duroxide` | +| Future fresh install on v0.2.4+ where default changes again | Step 1: install script defines function to return the new value | New value | +| Operator manually drops `_duroxide` schema on a fresh install | Worker readiness check fails (extension-owned schema missing) | N/A — operator error, loud failure | + +## Implementation Plan + +### Phase 1: Schema-name abstraction + +- Replace the hardcoded `DUROXIDE_SCHEMA` constant in `src/types.rs` with a runtime-resolved value cached at BGW startup and per backend session. +- Introduce a small helper, e.g. `resolve_duroxide_schema(conn) -> String`, implementing the selection algorithm (call function, catch `42883`, fall back to `"duroxide"`). +- Update `backend_provider_config()` and `worker_provider_config()` to consume the resolved value. +- Update debug/log messages to display the resolved schema. + +### Phase 2: Install SQL changes + +- Define `df.duroxide_schema()` in the new version's install SQL, returning `'_duroxide'`. +- Replace the literal `CREATE SCHEMA duroxide;` with `CREATE SCHEMA _duroxide;` (still **without** `IF NOT EXISTS`, preserving the no-adoption rule). +- Both objects are extension members by virtue of being declared inside the extension install SQL. +- No additional install-time validation is needed: a pre-existing `_duroxide` schema makes `CREATE SCHEMA` fail, which fails `CREATE EXTENSION` — the same protection the current literal `duroxide` enjoys. + +### Phase 3: Upgrade script + +- `sql/pg_durable--0.2.2--.sql` defines `df.duroxide_schema()` returning `'duroxide'`. +- The script must **not** create `_duroxide`, must **not** rename `duroxide`, and must **not** touch existing provider state. +- The script is the contract that says "this cluster is staying on `duroxide` forever." + +### Phase 4: Worker ownership and migration flow + +- Generalize `check_duroxide_schema_owned()` to accept the resolved schema name. +- Generalize `has_extension_owned_duroxide_objects()` and `release_extension_owned_duroxide_objects()` to filter on the resolved schema. +- Generalize `write_worker_ready()` to write to `._worker_ready`. +- Keep `MigrationPolicy::ApplyAll` in the worker and `VerifyOnly` in backend sessions. +- Because `_duroxide` is a bare identifier, no special quoting is required for the new default. The schema-name string can be interpolated into SQL via the same code paths used today, but it is still good practice to use `quote_ident` for any dynamic-schema SQL to remain robust against future name choices. + +### Phase 5: Backend readiness checks + +- Generalize `is_worker_ready()` in `src/client.rs` to check `._worker_ready`. +- Retain the catalog-existence pre-check before querying the readiness table so missing-schema cases produce a clear "not ready" signal rather than a SQL error. +- Ensure non-superuser backend sessions have `USAGE` on the resolved schema and `SELECT` on `_worker_ready` (existing grants on the literal `duroxide` schema move to the new name). + +### Phase 6: Tests and scripts + +Add or update checks for: + +- Fresh install creates `_duroxide` and `df.duroxide_schema()` returns `'_duroxide'`. +- Pre-existing `_duroxide` schema blocks `CREATE EXTENSION`. +- New `.so` against an unmigrated v0.2.2 schema: + - `df.duroxide_schema()` does not exist. + - BGW resolves to `'duroxide'` via fallback. + - Existing workflows continue to run. +- After `ALTER EXTENSION UPDATE` on a v0.2.2 cluster: + - `df.duroxide_schema()` exists and returns `'duroxide'`. + - Selection step 1 is taken on subsequent restarts. + - Provider state is unchanged. +- E2E setup SQL and helper scripts no longer hardcode the string `duroxide`. Where direct SQL must reference the schema, fetch the name via `SELECT df.duroxide_schema()` with the same `42883` fallback. + +Touch points likely include: + +- `tests/e2e/sql/00_setup_playground.sql` +- `sql/00_init.sql` +- `scripts/test-e2e-local.sh` +- `scripts/test-upgrade.sh` +- Any E2E tests that directly reference `duroxide._worker_ready` + +### Phase 7: Documentation + +Update: + +- `docs/bgw-applies-migrations.md` +- `docs/extension_lifecycle.md` +- `docs/upgrade-testing.md` +- `USER_GUIDE.md` connection/troubleshooting sections if readiness probes or drop/recreate guidance changes + +Document clearly that: + +- The provider schema is an implementation detail, not a configurable setting. +- Existing `duroxide`-based installs are not migrated to `_duroxide`; they keep using `duroxide` indefinitely. +- The only way to adopt `_duroxide` on an existing cluster is `DROP EXTENSION pg_durable CASCADE` followed by `CREATE EXTENSION pg_durable`, which destroys durable engine state. + +## Security Notes + +- `df.duroxide_schema()` is created by the extension install / upgrade scripts and is therefore owned by the extension owner (typically a superuser). Non-superusers cannot `CREATE OR REPLACE` it. +- The function is `IMMUTABLE PARALLEL SAFE` and contains a literal string; no SQL injection surface. +- Falling back to `'duroxide'` on `42883` is safe because that fallback only fires when the new helper function is genuinely absent, which can only happen on a pre-upgrade-script extension version. At that version the only possible extension-owned provider schema is `duroxide`. +- The BGW must still verify extension ownership of the resolved schema before applying duroxide migrations. This invariant is unchanged. + +## Open Questions + +1. **Cache lifetime in backend sessions.** Resolving the schema per connection is cheap (one SQL call). Caching it for the process lifetime is fine because the value cannot change without an extension upgrade, which in turn requires a session reconnect to see new function definitions reliably. Recommend: resolve once on first use per session, cache for session lifetime. +2. **Whether to expose `df.duroxide_schema()` as `SECURITY DEFINER` or rely on default invoker rights.** Default invoker rights are sufficient since the function only returns a literal. Recommend: leave as default to minimize surface area. +3. **Whether to also remove the `DUROXIDE_SCHEMA` constant from any Rust test fixtures.** Yes, but only where tests run against a real PostgreSQL backend. Pure unit tests that never touch the schema can keep using a constant for clarity. + +## Validation Strategy + +Minimum validation after implementation: + +```bash +cargo fmt -p pg_durable -- --check +cargo build --features pg17 +./scripts/test-e2e-local.sh 00_setup_playground +./scripts/test-upgrade.sh --verbose +``` + +If time is short, prioritize: + +1. A targeted E2E that verifies a fresh install creates `_duroxide` and that `df.duroxide_schema()` returns `'_duroxide'`. +2. An upgrade path (B1) test that verifies the new `.so` against a v0.2.2 schema (with no `ALTER EXTENSION UPDATE` run) continues to use `duroxide` via the `42883` fallback. +3. An upgrade-then-restart test that verifies, after `ALTER EXTENSION UPDATE`, selection step 1 is taken and the cluster still uses `duroxide`. + +## Issue Update Draft + +Proposed summary to add to the GitHub issue: + +> We will rename the duroxide provider schema for new pg_durable installs to `_duroxide` (bare identifier, no quoting required, leading underscore signals internal/private). No GUC will be added — the schema name is an implementation detail of pg_durable, not an operator-facing setting. Existing installs that already own a `duroxide` schema will continue to use it indefinitely; there is no in-place migration to `_duroxide`. The selected schema is exposed by a small extension-owned function `df.duroxide_schema()`: the fresh-install SQL defines it to return `'_duroxide'`, and the `0.2.2 -> ` upgrade script defines it to return `'duroxide'`. The BGW and backend sessions resolve the schema by calling this function with a single fallback: if the function does not exist (error 42883), assume legacy `'duroxide'`. This fallback covers the case where a new `.so` is deployed without `ALTER EXTENSION UPDATE` being run, and is self-deleting once the upgrade script has run. + +## Current Recommendation + +Implement as described above. This design: + +- Removes all GUC-related ambiguity from the original proposal. +- Has a single, well-defined fallback path tied to a concrete PostgreSQL error code rather than to fuzzy heuristics about admin intent or `pg_depend` state. +- Keeps the security invariants (no schema adoption, BGW verifies extension ownership) intact. +- Avoids identifier-quoting churn by choosing a bare-identifier default (`_duroxide`). +- Localizes "which schema does this version use" into the version-specific install and upgrade SQL, where version-specific decisions naturally belong. +- Explicitly declines to offer in-place schema migration, making the operational contract clear to operators: keep state on `duroxide`, or destroy state and adopt `_duroxide`. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index c2cb307..51059ed 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -147,7 +147,8 @@ Returns the version that was last installed/updated. Compare against known thres ### Test infrastructure -- `sql/pg_durable--0.1.1.sql` — first install SQL for the current major version (only the first version per major needs a fixture; intermediate versions are reconstructed by chaining upgrade scripts) +- `sql/pg_durable--0.1.1.sql` — first install SQL for the current major version +- `sql/pg_durable--0.2.2.sql` — install SQL fixture at the start of the current provider compatibility line (`PROVIDER_COMPAT_START_VERSION`). The harness reconstructs a target version from the **highest install fixture at or below it** (see `base_fixture_for_version` in `scripts/test-upgrade.sh`), then chains `ALTER EXTENSION UPDATE`. A fixture is required at the provider-compat-start boundary so reconstruction never has to chain across it — the pre-0.2.2 install SQL embeds a hand-written `duroxide` schema that is incompatible with the duroxide-pg provider's migration tracking (`_duroxide_migrations`). - `sql/pg_durable--0.1.1--0.2.0.sql` — upgrade script (initially empty, populated by subsequent PRs) - `scripts/test-upgrade.sh` — runs Scenarios A, B1, and B2. The `PROVIDER_COMPAT_START_VERSION` environment variable/default controls the first version in the current provider compatibility line. Versions before that boundary are excluded from B1 and cannot be used as A/B2 upgrade sources. - CI step in `.github/workflows/ci.yml` @@ -171,7 +172,7 @@ Each PR that changes the extension schema or modifies SQL queries in Rust code s **Minor release** (e.g. 0.2.0 → 0.3.0): 1. Create empty `sql/pg_durable----.sql` upgrade script 2. Bump `Cargo.toml` version to `` -3. If this release starts a new provider compatibility line, update the `PROVIDER_COMPAT_START_VERSION` default in `scripts/test-upgrade.sh` and document the boundary under "Version-Specific Changes". Downstream forks can instead override `PROVIDER_COMPAT_START_VERSION` in CI to keep the script shared. +3. If this release starts a new provider compatibility line, update the `PROVIDER_COMPAT_START_VERSION` default in `scripts/test-upgrade.sh`, check in an install SQL fixture (`sql/pg_durable--.sql`) at that boundary so reconstruction never chains across it, and document the boundary under "Version-Specific Changes". Downstream forks can instead override `PROVIDER_COMPAT_START_VERSION` in CI to keep the script shared. If this is the first minor after a new major (e.g. 1.0.0 → 1.1.0), also: @@ -202,6 +203,17 @@ gate, so they never need to be added to the exclude list. Each schema-changing PR should add a section here documenting what changed, what the upgrade script handles, and any backward compatibility considerations. +### v0.2.2 → v0.2.3 + +#### Rename duroxide provider schema to `_duroxide` for fresh installs +- **DDL change (df schema):** Adds `df.duroxide_schema()`, an `IMMUTABLE`/`PARALLEL SAFE` SQL function that returns the name of the schema holding the duroxide provider objects. Fresh 0.2.3 installs create the function (in `src/lib.rs`) returning `'_duroxide'`; the upgrade script `sql/pg_durable--0.2.2--0.2.3.sql` creates the same function returning `'duroxide'` so pre-existing installs keep using the legacy schema. Both bodies set `search_path = pg_catalog, pg_temp` to satisfy the pgspot gate. +- **DDL change (provider schema):** Fresh installs now run `CREATE SCHEMA _duroxide` (was `CREATE SCHEMA duroxide`). The upgrade script does **not** rename, drop, or move the existing `duroxide` schema — renaming an in-use provider schema would orphan the BGW's durable state. Upgraded installs therefore continue to use `duroxide`. +- **Runtime selection:** Backend sessions resolve the provider schema once per session via `backend_duroxide_schema()` (cached in a `OnceLock`); the BGW resolves it once per epoch via `resolve_duroxide_schema_pool()` (re-resolved after every CREATE EXTENSION so drop+recreate with a different schema version is handled). Both call `df.duroxide_schema()` and fall back to `'duroxide'` (`LEGACY_DUROXIDE_SCHEMA`) when the helper is absent — i.e. a new `.so` deployed against a ≤0.2.2 schema that has not run `ALTER EXTENSION pg_durable UPDATE`. Presence is detected via a `pg_proc` catalog lookup rather than catching `42883`, so the surrounding (sub)transaction is never aborted. +- **Scenario A considerations:** The Scenario A equivalence contract covers the `df` schema only and compares function signatures, not bodies. `df.duroxide_schema()` has an identical signature on the fresh-install and upgrade paths (only the returned literal differs), so Scenario A passes. The provider schema name (`_duroxide` vs `duroxide`) is intentionally excluded from the snapshot diff, as it was for v0.2.0. +- **Scenario B1 considerations:** The new `.so` works against all previous schemas: when `df.duroxide_schema()` does not exist (≤0.2.2 schemas without the upgrade applied) the runtime falls back to `'duroxide'`, which is exactly the schema those installs use. +- **Scenario B2 considerations:** No data migration. The existing `duroxide` schema and its tables are untouched; the upgrade only adds one `df` function. +- **Test fixture:** A new `sql/pg_durable--0.2.2.sql` install fixture is checked in at the provider-compat-start boundary. The upgrade harness now reconstructs 0.2.2 directly from it (an empty `duroxide` schema that the BGW populates via the duroxide-pg `ApplyAll` migration) instead of chaining from the pre-provider `pg_durable--0.1.1.sql` fixture, whose embedded hand-written `duroxide` schema lacks the `_duroxide_migrations` tracking table and is therefore incompatible with the duroxide-pg provider. + ### v0.2.1 → v0.2.2 #### #162 quote_ident-wrapped `current_user::regrole` (fixes #161) diff --git a/examples/invoice-approval/sql/11_reset.sql b/examples/invoice-approval/sql/11_reset.sql index 991d662..9e1b5c9 100644 --- a/examples/invoice-approval/sql/11_reset.sql +++ b/examples/invoice-approval/sql/11_reset.sql @@ -29,17 +29,20 @@ END $$; -- 2. Clean up df extension tables (instances + nodes) TRUNCATE TABLE df.nodes, df.instances; --- 3. Clean up duroxide engine state -TRUNCATE TABLE - duroxide.history, - duroxide.executions, - duroxide.instances, - duroxide.instance_locks, - duroxide.orchestrator_queue, - duroxide.worker_queue, - duroxide.kv_delta, - duroxide.kv_store, - duroxide.sessions; +-- 3. Clean up duroxide engine state. +-- The provider schema is '_duroxide' on fresh installs and 'duroxide' on +-- installs upgraded from <= 0.2.2; resolve it via df.duroxide_schema(). +DO $$ +DECLARE + dx_schema TEXT := df.duroxide_schema(); +BEGIN + EXECUTE format( + 'TRUNCATE TABLE %1$I.history, %1$I.executions, %1$I.instances, ' + '%1$I.instance_locks, %1$I.orchestrator_queue, %1$I.worker_queue, ' + '%1$I.kv_delta, %1$I.kv_store, %1$I.sessions', + dx_schema + ); +END $$; -- 4. Reset demo tables TRUNCATE TABLE demo.invoice_audit, demo.invoices RESTART IDENTITY; diff --git a/expected/00_init.out b/expected/00_init.out index 2c1b1ef..d7980f4 100644 --- a/expected/00_init.out +++ b/expected/00_init.out @@ -18,24 +18,30 @@ SELECT nspname FROM pg_namespace WHERE nspname = 'df'; (1 row) -- Wait for the background worker to apply duroxide migrations. --- The BGW populates the duroxide schema asynchronously after CREATE EXTENSION; --- subsequent tests that call df.start() will fail if this isn't done first. --- We poll duroxide._worker_ready directly (is_worker_ready is internal Rust, --- not exposed as a SQL function). +-- The BGW populates the duroxide provider schema asynchronously after CREATE +-- EXTENSION; subsequent tests that call df.start() will fail if this isn't done +-- first. We poll ._worker_ready directly (is_worker_ready is internal +-- Rust, not exposed as a SQL function). The provider schema name is resolved +-- via df.duroxide_schema() ('_duroxide' on fresh installs, 'duroxide' on +-- installs upgraded from <= 0.2.2). DO $$ DECLARE attempts INT := 0; table_exists BOOLEAN; ready BOOLEAN; + dx_schema TEXT := df.duroxide_schema(); BEGIN LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO ready; ELSE ready := FALSE; END IF; diff --git a/pg_durable.control b/pg_durable.control index 09a87f1..3640796 100644 --- a/pg_durable.control +++ b/pg_durable.control @@ -8,9 +8,11 @@ relocatable = false superuser = true trusted = false # Note: 'schema' is intentionally omitted. This extension manages two schemas -# (df and duroxide), and PostgreSQL's control file only supports a single schema -# directive. The df schema is created by pgrx (#[pg_schema]), and the duroxide -# schema is created by sql/duroxide_install.sql. relocatable = false prevents -# schema relocation attacks. +# (df and the duroxide provider schema), and PostgreSQL's control file only +# supports a single schema directive. The df schema is created by pgrx +# (#[pg_schema]); the duroxide provider schema is created by the +# create_duroxide_schema extension_sql block in src/lib.rs (named '_duroxide' +# on fresh installs, or the legacy 'duroxide' on installs upgraded from +# <= 0.2.2). relocatable = false prevents schema relocation attacks. diff --git a/scripts/test-e2e-local.sh b/scripts/test-e2e-local.sh index c6a299b..6b51b0d 100755 --- a/scripts/test-e2e-local.sh +++ b/scripts/test-e2e-local.sh @@ -389,9 +389,17 @@ SQL wait_for_worker_ready() { local ready="f" local attempts=0 + local dx_schema="" while [ "$attempts" -lt 120 ]; do - ready=$("$PSQL" -h localhost -p "$PG_PORT" -U "$PG_USER" -d "$PG_DB" -Atqc "SELECT CASE WHEN EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'duroxide' AND table_name = '_worker_ready') THEN EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) ELSE FALSE END;" 2>/dev/null | tr -d ' \n' || true) + # Resolve the duroxide provider schema via df.duroxide_schema(). + # Falls back to the legacy 'duroxide' schema when the helper is absent + # (extension not yet created, or installs predating the helper). + dx_schema=$("$PSQL" -h localhost -p "$PG_PORT" -U "$PG_USER" -d "$PG_DB" -Atqc "SELECT df.duroxide_schema();" 2>/dev/null | tr -d ' \n' || true) + if [ -z "$dx_schema" ]; then + dx_schema="duroxide" + fi + ready=$("$PSQL" -h localhost -p "$PG_PORT" -U "$PG_USER" -d "$PG_DB" -Atqc "SELECT CASE WHEN EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '$dx_schema' AND table_name = '_worker_ready') THEN EXISTS(SELECT 1 FROM $dx_schema._worker_ready WHERE schema_version >= 1) ELSE FALSE END;" 2>/dev/null | tr -d ' \n' || true) if [ "$ready" = "t" ]; then return fi diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 0523f91..5b7ff80 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -117,6 +117,44 @@ first_fixture_for_major() { fi } +# Selects the best checked-in install fixture to use as the base for +# reconstructing a target version: the highest install fixture (same major) +# that is <= the target version. This lets the harness install a version +# directly from its own fixture (e.g. 0.2.2) instead of always chaining from +# the major's first fixture (0.1.1), which would drag in the pre-provider +# legacy duroxide schema. Versions before PROVIDER_COMPAT_START_VERSION are +# never selected as a base for targets within the provider compatibility line. +base_fixture_for_version() { + local target_version="$1" + local target_major + target_major=$(echo "$target_version" | cut -d. -f1) + + local best="" + for f in "$PROJECT_DIR"/sql/pg_durable--*.sql; do + local fname + fname=$(basename "$f") + if [[ "$fname" =~ ^pg_durable--([0-9]+\.[0-9]+\.[0-9]+)\.sql$ ]]; then + local candidate="${BASH_REMATCH[1]}" + local candidate_major + candidate_major=$(echo "$candidate" | cut -d. -f1) + [ "$candidate_major" = "$target_major" ] || continue + # Skip fixtures newer than the target. + version_ge "$target_version" "$candidate" || continue + # When the target is within the provider compatibility line, never + # base it on a fixture from before that line started. + if version_ge "$target_version" "$PROVIDER_COMPAT_START_VERSION" \ + && ! version_ge "$candidate" "$PROVIDER_COMPAT_START_VERSION"; then + continue + fi + if [ -z "$best" ] || version_ge "$candidate" "$best"; then + best="$candidate" + fi + fi + done + + printf '%s' "$best" +} + # Find the previous version by looking for upgrade SQL scripts PREV_VERSION=$(for f in "$PROJECT_DIR"/sql/pg_durable--*--"${CURRENT_VERSION}".sql; do fname=$(basename "$f") @@ -396,19 +434,21 @@ wait_for_ready() { return 1 } -# Creates the extension at a specific version by installing from that major's -# first checked-in fixture and chaining ALTER EXTENSION UPDATE if needed. +# Creates the extension at a specific version by installing from the closest +# checked-in fixture at or below the target (see base_fixture_for_version) and +# chaining ALTER EXTENSION UPDATE if needed. This keeps reconstruction within +# the provider compatibility line and avoids chaining across the pre-0.2.2 +# boundary, where the legacy in-extension duroxide schema is incompatible with +# the duroxide-pg provider. create_extension_at_version() { local target_version="$1" - local target_major local base_version - target_major=$(echo "$target_version" | cut -d. -f1) - base_version=$(first_fixture_for_major "$target_major") + base_version=$(base_fixture_for_version "$target_version") if [ -z "$base_version" ]; then - echo "No install SQL fixture found for major version $target_major" - echo "Expected: sql/pg_durable--.sql" + echo "No install SQL fixture found at or below version $target_version" + echo "Expected: sql/pg_durable--.sql" return 1 fi diff --git a/sql/00_init.sql b/sql/00_init.sql index cdf21d7..379f21c 100644 --- a/sql/00_init.sql +++ b/sql/00_init.sql @@ -12,24 +12,30 @@ SELECT extname FROM pg_extension WHERE extname = 'pg_durable'; SELECT nspname FROM pg_namespace WHERE nspname = 'df'; -- Wait for the background worker to apply duroxide migrations. --- The BGW populates the duroxide schema asynchronously after CREATE EXTENSION; --- subsequent tests that call df.start() will fail if this isn't done first. --- We poll duroxide._worker_ready directly (is_worker_ready is internal Rust, --- not exposed as a SQL function). +-- The BGW populates the duroxide provider schema asynchronously after CREATE +-- EXTENSION; subsequent tests that call df.start() will fail if this isn't done +-- first. We poll ._worker_ready directly (is_worker_ready is internal +-- Rust, not exposed as a SQL function). The provider schema name is resolved +-- via df.duroxide_schema() ('_duroxide' on fresh installs, 'duroxide' on +-- installs upgraded from <= 0.2.2). DO $$ DECLARE attempts INT := 0; table_exists BOOLEAN; ready BOOLEAN; + dx_schema TEXT := df.duroxide_schema(); BEGIN LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO ready; ELSE ready := FALSE; END IF; diff --git a/sql/pg_durable--0.2.2--0.2.3.sql b/sql/pg_durable--0.2.2--0.2.3.sql new file mode 100644 index 0000000..470d4d1 --- /dev/null +++ b/sql/pg_durable--0.2.2--0.2.3.sql @@ -0,0 +1,21 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- pg_durable upgrade: 0.2.2 → 0.2.3 +-- +-- Introduces df.duroxide_schema(), a helper that reports which schema holds the +-- duroxide provider objects for this install. Fresh 0.2.3 installs create the +-- provider objects in the '_duroxide' schema (see lib.rs). Installs upgrading +-- from <= 0.2.2 already have their provider objects in the legacy 'duroxide' +-- schema and must keep using it — renaming an in-use schema would orphan the +-- background worker's durable state. This upgrade therefore defines +-- df.duroxide_schema() to return 'duroxide' for pre-existing installs. +-- +-- Backend sessions and the background worker call df.duroxide_schema() to learn +-- which schema to use, falling back to 'duroxide' when the helper is absent +-- (installs predating it). No schema rename, drop, or data movement occurs. + +CREATE FUNCTION df.duroxide_schema() RETURNS text + LANGUAGE sql IMMUTABLE PARALLEL SAFE + SET search_path = pg_catalog, pg_temp + AS $$ SELECT 'duroxide'::text $$; diff --git a/sql/pg_durable--0.2.2.sql b/sql/pg_durable--0.2.2.sql new file mode 100644 index 0000000..799bb5c --- /dev/null +++ b/sql/pg_durable--0.2.2.sql @@ -0,0 +1,1047 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +/* */ +/* +This file is auto generated by pgrx. + +The ordering of items is not stable, it is driven by a dependency graph. +*/ +/* */ + +/* */ +-- src/lib.rs:146 +CREATE SCHEMA IF NOT EXISTS df; /* pg_durable::df */ +/* */ + +/* */ +-- src/lib.rs:153 +-- requires: +-- df + + +-- Table to store function nodes (SQL steps, THEN chains, etc.) +CREATE TABLE IF NOT EXISTS df.nodes ( + id VARCHAR(8) PRIMARY KEY, + instance_id VARCHAR(8), + node_type TEXT NOT NULL, + query TEXT, + result_name TEXT, + left_node VARCHAR(8), + right_node VARCHAR(8), + status TEXT DEFAULT 'pending', + result JSONB, + error TEXT, + submitted_by REGROLE, + database TEXT, + created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + updated_at TIMESTAMPTZ DEFAULT pg_catalog.now() +); + +COMMENT ON COLUMN df.nodes.submitted_by IS + 'Effective role (current_user) at df.start() time - used for connection authentication and SQL execution'; + +-- Table to store function instances +CREATE TABLE IF NOT EXISTS df.instances ( + id VARCHAR(8) PRIMARY KEY, + label TEXT, + root_node VARCHAR(8) NOT NULL, + status TEXT DEFAULT 'pending', + submitted_by REGROLE NOT NULL, + database TEXT, + created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + updated_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + completed_at TIMESTAMPTZ +); + +COMMENT ON COLUMN df.instances.submitted_by IS + 'Effective role (current_user) at df.start() time - used for connection authentication and SQL execution'; + +-- Index for finding pending instances +CREATE INDEX IF NOT EXISTS idx_instances_status ON df.instances(status); + +-- Index for finding nodes by instance +CREATE INDEX IF NOT EXISTS idx_nodes_instance ON df.nodes(instance_id); + +-- Table to store workflow variables (captured at df.start()) +-- Per-user scoping: each user has their own variable namespace. +CREATE TABLE IF NOT EXISTS df.vars ( + name TEXT NOT NULL, + value TEXT, + owner REGROLE NOT NULL DEFAULT pg_catalog.quote_ident(current_user)::pg_catalog.regrole, + PRIMARY KEY (owner, name) +); + +-- Sentinel table: the background worker writes its epoch_id here after +-- initialising. If the extension is DROP-ed and re-CREATEd between +-- two poll ticks the epoch row disappears, so the worker detects the +-- recreation even though the extension is always "present" in pg_extension. +CREATE TABLE IF NOT EXISTS df._worker_epoch ( + epoch_id UUID PRIMARY KEY, + started_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + last_seen_at TIMESTAMPTZ DEFAULT pg_catalog.now() +); + +ALTER TABLE df.instances + ADD CONSTRAINT instances_id_format_chk + -- Operators (OPERATOR(pg_catalog.)) and functions (e.g. pg_catalog.now) + -- are schema-qualified throughout this install DDL so name resolution never + -- depends on the session search_path -- closing the CVE-2018-1058 vector + -- (a malicious schema shadowing `=`, `~`, etc.). Enforced by the pgspot CI + -- gate (scripts/pgspot-gate.sh). + CHECK (id OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT instances_root_node_format_chk + CHECK (root_node OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT instances_status_chk + CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed', 'cancelled'])) NOT VALID, + -- Supports the composite FK from df.nodes that ties node identity to the instance row. + ADD CONSTRAINT instances_identity_key + UNIQUE (id, submitted_by); + +ALTER TABLE df.nodes + ADD CONSTRAINT nodes_instance_id_present_chk + CHECK (instance_id IS NOT NULL) NOT VALID, + ADD CONSTRAINT nodes_submitted_by_present_chk + CHECK (submitted_by IS NOT NULL) NOT VALID, + ADD CONSTRAINT nodes_id_format_chk + CHECK (id OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_instance_id_format_chk + CHECK (instance_id OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_left_node_format_chk + CHECK (left_node IS NULL OR left_node OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_right_node_format_chk + CHECK (right_node IS NULL OR right_node OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_node_type_chk + CHECK (node_type OPERATOR(pg_catalog.=) ANY (ARRAY['SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL'])) NOT VALID, + ADD CONSTRAINT nodes_result_name_chk + CHECK (result_name IS NULL OR result_name OPERATOR(pg_catalog.~) '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID, + ADD CONSTRAINT nodes_status_chk + CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed'])) NOT VALID, + ADD CONSTRAINT nodes_result_status_chk + CHECK (result IS NULL OR status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed'])) NOT VALID, + ADD CONSTRAINT nodes_structure_chk + CHECK ( + CASE + WHEN node_type OPERATOR(pg_catalog.=) ANY (ARRAY['SQL', 'SLEEP', 'WAIT_SCHEDULE', 'BREAK', 'HTTP', 'SIGNAL']) + THEN left_node IS NULL AND right_node IS NULL AND query IS NOT NULL + WHEN node_type OPERATOR(pg_catalog.=) 'THEN' + THEN left_node IS NOT NULL AND right_node IS NOT NULL AND query IS NULL + WHEN node_type OPERATOR(pg_catalog.=) 'IF' + THEN left_node IS NOT NULL AND right_node IS NOT NULL AND query IS NOT NULL + WHEN node_type OPERATOR(pg_catalog.=) 'LOOP' + THEN left_node IS NOT NULL AND right_node IS NULL + WHEN node_type OPERATOR(pg_catalog.=) 'JOIN' + THEN left_node IS NOT NULL AND right_node IS NOT NULL + WHEN node_type OPERATOR(pg_catalog.=) 'RACE' + THEN left_node IS NOT NULL AND right_node IS NOT NULL AND query IS NULL + ELSE FALSE + END + ) NOT VALID, + ADD CONSTRAINT nodes_instance_node_key + UNIQUE (instance_id, id); + +ALTER TABLE df.nodes + ADD CONSTRAINT nodes_instance_identity_fkey + FOREIGN KEY (instance_id, submitted_by) + REFERENCES df.instances (id, submitted_by) + DEFERRABLE INITIALLY DEFERRED NOT VALID, + ADD CONSTRAINT nodes_left_node_same_instance_fkey + FOREIGN KEY (instance_id, left_node) + REFERENCES df.nodes (instance_id, id) + DEFERRABLE INITIALLY DEFERRED NOT VALID, + ADD CONSTRAINT nodes_right_node_same_instance_fkey + FOREIGN KEY (instance_id, right_node) + REFERENCES df.nodes (instance_id, id) + DEFERRABLE INITIALLY DEFERRED NOT VALID; + +ALTER TABLE df.instances + ADD CONSTRAINT instances_root_node_same_instance_fkey + FOREIGN KEY (id, root_node) + REFERENCES df.nodes (instance_id, id) + DEFERRABLE INITIALLY DEFERRED NOT VALID; +/* */ + +/* */ +-- src/dsl.rs:924 +-- pg_durable::dsl::cancel +CREATE FUNCTION df."cancel"( + "instance_id" TEXT, /* &str */ + "reason" TEXT DEFAULT 'Cancelled by user' /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'cancel_wrapper'; +/* */ + +/* */ +-- src/explain.rs:36 +-- pg_durable::explain::explain +CREATE FUNCTION df."explain"( + "input" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'explain_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:48 +-- pg_durable::dsl::debug_connection +CREATE FUNCTION df."debug_connection"() RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'debug_connection_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:973 +-- pg_durable::dsl::run +CREATE FUNCTION df."run"( + "instance_id" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'run_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:38 +-- pg_durable::dsl::version +CREATE FUNCTION df."version"() RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'version_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:587 +-- pg_durable::dsl::signal +CREATE FUNCTION df."signal"( + "instance_id" TEXT, /* &str */ + "signal_name" TEXT, /* &str */ + "signal_data" TEXT DEFAULT '{}' /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'signal_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:369 +-- pg_durable::dsl::if +CREATE FUNCTION df."if"( + "condition" TEXT, /* &str */ + "then_branch" TEXT, /* &str */ + "else_branch" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'if_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:414 +-- pg_durable::dsl::join +CREATE FUNCTION df."join"( + "a" TEXT, /* &str */ + "b" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'join_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:1011 +-- pg_durable::dsl::wait_for_completion +CREATE FUNCTION df."wait_for_completion"( + "instance_id" TEXT, /* &str */ + "timeout_seconds" INT DEFAULT 30 /* i32 */ +) RETURNS TEXT /* core::result::Result> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'wait_for_completion_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:275 +-- pg_durable::monitoring::metrics +CREATE FUNCTION df."metrics"() RETURNS TABLE ( + "total_instances" bigint, /* i64 */ + "running_instances" bigint, /* i64 */ + "completed_instances" bigint, /* i64 */ + "failed_instances" bigint, /* i64 */ + "total_executions" bigint, /* i64 */ + "total_events" bigint /* i64 */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'metrics_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:147 +-- pg_durable::dsl::getvar +CREATE FUNCTION df."getvar"( + "name" TEXT /* &str */ +) RETURNS TEXT /* core::option::Option */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'getvar_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:124 +-- pg_durable::dsl::setvar +CREATE FUNCTION df."setvar"( + "name" TEXT, /* &str */ + "value" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'setvar_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:323 +-- pg_durable::monitoring::instance_nodes +CREATE FUNCTION df."instance_nodes"( + "instance_id_param" TEXT, /* &str */ + "last_n_executions" INT DEFAULT 5 /* i32 */ +) RETURNS TABLE ( + "execution_id" bigint, /* i64 */ + "node_id" TEXT, /* alloc::string::String */ + "node_type" TEXT, /* alloc::string::String */ + "query" TEXT, /* core::option::Option */ + "result_name" TEXT, /* core::option::Option */ + "left_node" TEXT, /* core::option::Option */ + "right_node" TEXT, /* core::option::Option */ + "status" TEXT, /* core::option::Option */ + "result" TEXT, /* core::option::Option */ + "updated_at" timestamp with time zone /* core::option::Option */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'instance_nodes_wrapper'; +/* */ + +/* */ +-- src/types.rs:129 +-- pg_durable::types::target_database +CREATE FUNCTION df."target_database"() RETURNS TEXT /* alloc::string::String */ +IMMUTABLE STRICT PARALLEL SAFE +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'target_database_wrapper'; +/* */ + +/* */ +-- src/lib.rs:574 +-- requires: +-- df +-- target_database + + +-- Validate that CREATE EXTENSION is run in the correct database +-- The background worker connects to one specific database (determined by +-- the pg_durable.database GUC, defaults to "postgres"). +-- The extension must be created in that database for workflows to execute. +DO $$ +DECLARE + current_db TEXT; + target_db TEXT; +BEGIN + -- Get the current database + SELECT pg_catalog.current_database() INTO current_db; + + -- Get the target database that the background worker will connect to + SELECT df.target_database() INTO target_db; + + IF current_db OPERATOR(pg_catalog.<>) target_db THEN + RAISE EXCEPTION 'pg_durable extension must be created in database "%" (currently in "%"). The background worker only processes functions in the database specified by the pg_durable.database GUC (defaults to "postgres").', target_db, current_db + USING HINT = 'Connect to the correct database and run: CREATE EXTENSION pg_durable;'; + END IF; +END $$; +/* */ + +/* */ +-- src/lib.rs:621 +-- requires: +-- validate_database + + +-- The duroxide provider schema is created here so the extension owns it. +-- No IF NOT EXISTS: fails loudly if a duroxide schema already exists, +-- preventing adoption of a potentially attacker-crafted schema. +-- The background worker populates this schema at startup via ApplyAll. +CREATE SCHEMA duroxide; + +/* */ + +/* */ +-- src/dsl.rs:181 +-- pg_durable::dsl::clearvars +CREATE FUNCTION df."clearvars"() RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'clearvars_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:452 +-- pg_durable::dsl::race +CREATE FUNCTION df."race"( + "a" TEXT, /* &str */ + "b" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'race_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:249 +-- pg_durable::dsl::sleep +CREATE FUNCTION df."sleep"( + "seconds" bigint /* i64 */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'sleep_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:237 +-- pg_durable::dsl::as +CREATE FUNCTION df."as"( + "fut" TEXT, /* &str */ + "name" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'as_named_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:116 +-- pg_durable::monitoring::instance_info +CREATE FUNCTION df."instance_info"( + "instance_id" TEXT /* &str */ +) RETURNS TABLE ( + "instance_id" TEXT, /* alloc::string::String */ + "label" TEXT, /* core::option::Option */ + "function_name" TEXT, /* alloc::string::String */ + "function_version" TEXT, /* alloc::string::String */ + "current_execution_id" bigint, /* i64 */ + "status" TEXT, /* alloc::string::String */ + "output" TEXT /* core::option::Option */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'instance_info_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:161 +-- pg_durable::dsl::unsetvar +CREATE FUNCTION df."unsetvar"( + "name" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'unsetvar_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:206 +-- pg_durable::dsl::sql +CREATE FUNCTION df."sql"( + "query" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'sql_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:219 +-- pg_durable::dsl::seq +CREATE FUNCTION df."seq"( + "a" TEXT, /* &str */ + "b" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'then_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:430 +-- pg_durable::dsl::join3 +CREATE FUNCTION df."join3"( + "a" TEXT, /* &str */ + "b" TEXT, /* &str */ + "c" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'join3_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:392 +-- pg_durable::dsl::if_rows +CREATE FUNCTION df."if_rows"( + "result_name" TEXT, /* &str */ + "then_branch" TEXT, /* &str */ + "else_branch" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'if_rows_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:265 +-- pg_durable::dsl::wait_for_schedule +CREATE FUNCTION df."wait_for_schedule"( + "cron_expr" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'wait_for_schedule_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:983 +-- pg_durable::dsl::result +CREATE FUNCTION df."result"( + "instance_id" TEXT /* &str */ +) RETURNS TEXT /* core::option::Option */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'result_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:353 +-- pg_durable::dsl::break +CREATE FUNCTION df."break"( + "value" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'break_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:553 +-- pg_durable::dsl::wait_for_signal +CREATE FUNCTION df."wait_for_signal"( + "name" TEXT, /* &str */ + "timeout_seconds" INT DEFAULT NULL /* core::option::Option */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'wait_for_signal_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:312 +-- pg_durable::dsl::loop +CREATE FUNCTION df."loop"( + "body" TEXT, /* &str */ + "condition" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'loop_fn_wrapper'; +/* */ + +/* */ +-- src/lib.rs:652 +-- requires: +-- dsl::then_fn +-- dsl::as_named +-- dsl::join +-- dsl::race +-- dsl::if_fn +-- dsl::loop_fn + + +-- Operator ~> for sequencing: a ~> b means "run a, then run b" +CREATE OPERATOR ~> ( + FUNCTION = df.seq, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator |=> for naming: fut |=> 'name' means "name this result as $name" +CREATE OR REPLACE FUNCTION df.as_op(fut text, name text) RETURNS text AS $$ + SELECT df.as(fut, name); +$$ LANGUAGE SQL IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OPERATOR |=> ( + FUNCTION = df.as_op, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator & for parallel join: a & b means "run a and b in parallel, wait for both" +CREATE OPERATOR & ( + FUNCTION = df.join, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator | for race: a | b means "run a and b in parallel, first wins" +CREATE OPERATOR | ( + FUNCTION = df.race, + LEFTARG = text, + RIGHTARG = text +); + +-- Operators ?> and !> for if-then-else: cond ?> then_branch !> else_branch +-- We need helper functions to build the if node incrementally + +-- Helper: cond ?> then creates a partial if (stores condition and then branch) +CREATE OR REPLACE FUNCTION df.if_then_op(condition text, then_branch text) RETURNS text AS $$ +DECLARE + cond_fut jsonb; + then_fut jsonb; + result_obj jsonb; +BEGIN + -- Ensure both are durofuts + cond_fut := df.ensure_durofut(condition)::jsonb; + then_fut := df.ensure_durofut(then_branch)::jsonb; + + -- Return a special marker object for the partial if + result_obj := jsonb_build_object( + '_partial_if', true, + 'condition', cond_fut, + 'then_branch', then_fut + ); + RETURN result_obj::text; +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +-- Helper: partial_if !> else completes the if node +CREATE OR REPLACE FUNCTION df.if_else_op(partial_if text, else_branch text) RETURNS text AS $$ +DECLARE + partial jsonb; + else_fut text; + cond_text text; + then_text text; +BEGIN + partial := partial_if::jsonb; + + -- Check if it's a partial if + IF partial->>'_partial_if' IS NULL THEN + RAISE EXCEPTION 'Invalid if-then-else: left side of !> must be a ?> expression'; + END IF; + + cond_text := partial->'condition'::text; + then_text := partial->'then_branch'::text; + else_fut := df.ensure_durofut(else_branch); + + -- Now call the real df.if function + RETURN df.if(cond_text, then_text, else_fut); +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +-- Helper to ensure a value is a durofut (returns JSON string) +-- Rejects JSON with unknown node_type values. +-- NOTE: The valid node type list here must be kept in sync with +-- VALID_NODE_TYPES in src/types.rs (the Rust constant is the canonical source). +CREATE OR REPLACE FUNCTION df.ensure_durofut(val text) RETURNS text AS $$ +DECLARE + node_type_val text; +BEGIN + -- Try to parse as JSON to check if it's already a durofut + BEGIN + node_type_val := (val::jsonb)->>'node_type'; + IF node_type_val IS NOT NULL THEN + -- Has a node_type - validate it + IF node_type_val NOT IN ('SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL') THEN + RAISE EXCEPTION 'Unknown node_type ''%''. Valid types: SQL, THEN, IF, JOIN, LOOP, BREAK, RACE, SLEEP, WAIT_SCHEDULE, HTTP, SIGNAL', node_type_val; + END IF; + RETURN val; + END IF; + EXCEPTION WHEN invalid_text_representation THEN + -- Not valid JSON, treat as SQL + NULL; + WHEN raise_exception THEN + -- Re-raise our validation error + RAISE; + WHEN OTHERS THEN + -- Not valid JSON, treat as SQL + NULL; + END; + + -- It's plain SQL, wrap it + RETURN df.sql(val); +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OPERATOR ?> ( + FUNCTION = df.if_then_op, + LEFTARG = text, + RIGHTARG = text +); + +CREATE OPERATOR !> ( + FUNCTION = df.if_else_op, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator @> for loop: @> body means "repeat body forever" +-- This is a PREFIX operator with lowest precedence +CREATE OR REPLACE FUNCTION df.loop_prefix_op(body text) RETURNS text AS $$ + SELECT df.loop(body); +$$ LANGUAGE SQL IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OPERATOR @> ( + FUNCTION = df.loop_prefix_op, + RIGHTARG = text +); +/* */ + +/* */ +-- src/monitoring.rs:195 +-- pg_durable::monitoring::instance_executions +CREATE FUNCTION df."instance_executions"( + "instance_id" TEXT, /* &str */ + "limit_count" INT DEFAULT 5 /* i32 */ +) RETURNS TABLE ( + "execution_id" bigint, /* i64 */ + "status" TEXT, /* alloc::string::String */ + "event_count" bigint, /* i64 */ + "duration_ms" bigint, /* i64 */ + "output" TEXT /* core::option::Option */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'instance_executions_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:18 +-- pg_durable::monitoring::list_instances +CREATE FUNCTION df."list_instances"( + "status_filter" TEXT DEFAULT NULL, /* core::option::Option<&str> */ + "limit_count" INT DEFAULT 100 /* i32 */ +) RETURNS TABLE ( + "instance_id" TEXT, /* alloc::string::String */ + "label" TEXT, /* core::option::Option */ + "function_name" TEXT, /* alloc::string::String */ + "status" TEXT, /* alloc::string::String */ + "execution_count" bigint, /* i64 */ + "output" TEXT /* core::option::Option */ +) + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'list_instances_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:962 +-- pg_durable::dsl::status +CREATE FUNCTION df."status"( + "instance_id" TEXT /* &str */ +) RETURNS TEXT /* core::option::Option */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'status_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:478 +-- pg_durable::dsl::http +CREATE FUNCTION df."http"( + "url" TEXT, /* &str */ + "method" TEXT DEFAULT 'POST', /* &str */ + "body" TEXT DEFAULT NULL, /* core::option::Option<&str> */ + "headers" jsonb DEFAULT NULL, /* core::option::Option */ + "timeout_seconds" INT DEFAULT 30 /* i32 */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'http_wrapper'; +/* */ + +/* */ +-- src/lib.rs:303 +-- requires: +-- create_tables +-- dsl::http + + +-- Enable RLS on df.instances (no FORCE — superuser/table-owner bypasses RLS) +ALTER TABLE df.instances ENABLE ROW LEVEL SECURITY; + +CREATE POLICY instances_user_isolation ON df.instances + FOR ALL + USING (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole) + WITH CHECK (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole); + +-- Enable RLS on df.nodes +ALTER TABLE df.nodes ENABLE ROW LEVEL SECURITY; + +CREATE POLICY nodes_user_isolation ON df.nodes + FOR ALL + USING (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole) + WITH CHECK (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole); + +-- Enable RLS on df.vars (per-user variable isolation) +ALTER TABLE df.vars ENABLE ROW LEVEL SECURITY; + +CREATE POLICY vars_user_isolation ON df.vars + FOR ALL + USING (owner OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole) + WITH CHECK (owner OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole); + +-- No automatic PUBLIC grants. Admins must explicitly grant privileges +-- to application roles after CREATE EXTENSION. +-- Use df.grant_usage('role_name') (recommended) or see USER_GUIDE.md +-- "Privilege Grants" for the equivalent manual GRANT statements. + +-- Helper: grant all required df privileges to a role in one call. +-- Authorization model: This function is SECURITY INVOKER and EXECUTE is +-- revoked from PUBLIC, so only roles explicitly granted EXECUTE (via +-- with_grant => true or a direct superuser GRANT) can call it. The inner +-- GRANT statements run as the caller, so the caller must also hold the +-- underlying privileges WITH GRANT OPTION (automatically true for +-- superusers; for delegated admins, df.grant_usage(..., with_grant => true) +-- grants all privileges WITH GRANT OPTION). +-- +-- This function is purely additive — it never issues REVOKE. To downgrade +-- a role's privileges, call df.revoke_usage() first, then df.grant_usage() +-- with the desired options. +-- +-- include_http controls whether the role is granted EXECUTE on df.http(). +-- Default is false: HTTP access is opt-in because df.http() makes outbound +-- network requests and requires explicit administrator approval. +-- +-- with_grant controls whether the target role receives privileges WITH GRANT +-- OPTION and can itself call df.grant_usage() / df.revoke_usage() to manage +-- other roles' access. Authorization is enforced by PostgreSQL's native +-- WITH GRANT OPTION mechanism: the caller must hold each underlying +-- privilege WITH GRANT OPTION, which is automatically true for superusers +-- and for delegated admins granted via with_grant => true. +-- +-- MAINTENANCE: when adding a new df.* function, add it to the func_sigs +-- array below. Functions NOT in this list are deny-by-default. +CREATE OR REPLACE FUNCTION df.grant_usage( + p_role TEXT, + include_http boolean DEFAULT false, + with_grant boolean DEFAULT false +) +RETURNS VOID +LANGUAGE plpgsql +SET search_path = pg_catalog, df, pg_temp +AS $fn$ +DECLARE + grant_opt TEXT := ''; + func_sig TEXT; + -- Explicit list of df.* functions to grant. Sensitive functions + -- (df.http, df.grant_usage, df.revoke_usage) are excluded from this + -- list and granted conditionally below. + func_sigs TEXT[] := ARRAY[ + -- DSL functions + 'df.sql(text)', + 'df.seq(text, text)', + 'df.as(text, text)', + 'df.sleep(bigint)', + 'df.wait_for_schedule(text)', + 'df.loop(text, text)', + 'df.break(text)', + 'df.if(text, text, text)', + 'df.if_rows(text, text, text)', + 'df.join(text, text)', + 'df.join3(text, text, text)', + 'df.race(text, text)', + 'df.wait_for_signal(text, integer)', + 'df.signal(text, text, text)', + 'df.start(text, text, text)', + 'df.setvar(text, text)', + 'df.getvar(text)', + 'df.unsetvar(text)', + 'df.clearvars()', + -- Monitoring functions + 'df.status(text)', + 'df.result(text)', + 'df.cancel(text, text)', + 'df.wait_for_completion(text, integer)', + 'df.run(text)', + 'df.list_instances(text, integer)', + 'df.instance_info(text)', + 'df.instance_nodes(text, integer)', + 'df.instance_executions(text, integer)', + 'df.metrics()', + -- Internal helpers (operators, version, etc.) + 'df.as_op(text, text)', + 'df.if_then_op(text, text)', + 'df.if_else_op(text, text)', + 'df.ensure_durofut(text)', + 'df.loop_prefix_op(text)', + 'df.version()', + 'df.debug_connection()', + 'df.explain(text)', + 'df.target_database()' + ]; +BEGIN + -- Validate the role exists + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = p_role) THEN + RAISE EXCEPTION 'role "%" does not exist', p_role; + END IF; + + IF with_grant THEN + grant_opt := ' WITH GRANT OPTION'; + END IF; + + -- Schema access + EXECUTE format('GRANT USAGE ON SCHEMA df TO %I', p_role) || grant_opt; + + -- Grant EXECUTE on each standard function explicitly. + FOREACH func_sig IN ARRAY func_sigs LOOP + EXECUTE format('GRANT EXECUTE ON FUNCTION %s TO %I', func_sig, p_role) || grant_opt; + END LOOP; + + -- df.http() — opt-in because it makes outbound network requests. + IF include_http THEN + EXECUTE format('GRANT EXECUTE ON FUNCTION df.http(text, text, text, jsonb, integer) TO %I', p_role) || grant_opt; + END IF; + + -- Admin helpers — only for delegated administrators. + IF with_grant THEN + EXECUTE format('GRANT EXECUTE ON FUNCTION df.grant_usage(text, boolean, boolean) TO %I', p_role) || grant_opt; + EXECUTE format('GRANT EXECUTE ON FUNCTION df.revoke_usage(text) TO %I', p_role) || grant_opt; + END IF; + + -- Table privileges + EXECUTE format('GRANT SELECT ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT SELECT ON df.nodes TO %I', p_role) || grant_opt; + EXECUTE format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) || grant_opt; + EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) || grant_opt; + + RAISE NOTICE 'pg_durable: granted df usage privileges to "%"', p_role; +END; +$fn$; + +-- Revoke all df privileges previously granted by df.grant_usage(). +-- Authorization: same model as df.grant_usage() — EXECUTE is revoked from +-- PUBLIC, caller must hold the underlying privileges to revoke them. +-- Safety: format(%I) quotes identifiers to prevent SQL injection. Additionally, +-- this is SECURITY INVOKER so it cannot escalate beyond the caller's privileges. +CREATE OR REPLACE FUNCTION df.revoke_usage(p_role TEXT) +RETURNS VOID +LANGUAGE plpgsql +SET search_path = pg_catalog, df, pg_temp +AS $fn$ +DECLARE + func_oid oid; +BEGIN + -- Validate the role exists + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = p_role) THEN + RAISE EXCEPTION 'role "%" does not exist', p_role; + END IF; + + -- Prevent accidentally revoking your own access. pg_has_role checks + -- both direct identity (current_user = p_role) and inherited membership + -- (current_user is a member of p_role), so revoking a parent role that + -- the caller depends on is also caught. + -- Superusers are exempt: pg_has_role returns true for all roles when the + -- caller is a superuser, and superusers can always re-grant themselves. + IF NOT EXISTS ( + SELECT 1 + FROM pg_roles + WHERE rolname = current_user + AND rolsuper + ) + AND pg_has_role(current_user, p_role, 'MEMBER') THEN + RAISE EXCEPTION 'cannot revoke df privileges from "%" because the current role ("%") is a member of it — use a different administrator', p_role, current_user; + END IF; + + -- CASCADE: if the target role granted sub-grants (via WITH GRANT OPTION), + -- CASCADE ensures those dependent privileges are also revoked. + -- Column-level revokes must match the column-level grants from grant_usage(). + EXECUTE format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); + EXECUTE format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); + EXECUTE format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); + EXECUTE format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); + EXECUTE format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); + EXECUTE format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); + + -- Revoke EXECUTE per-function rather than using the blanket + -- REVOKE ON ALL FUNCTIONS. A delegated admin may lack privilege on + -- some functions (e.g. df.http); per-function revokes let us skip those. + FOR func_oid IN + SELECT p.oid FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE n.nspname = 'df' + LOOP + BEGIN + EXECUTE format('REVOKE EXECUTE ON FUNCTION %s FROM %I CASCADE', func_oid::regprocedure, p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + END LOOP; + + EXECUTE format('REVOKE USAGE ON SCHEMA df FROM %I CASCADE', p_role); + + RAISE NOTICE 'pg_durable: revoked df usage privileges granted by "%" from "%"', current_user, p_role; +END; +$fn$; + +-- Validate that the worker role is a superuser. +-- The background worker must bypass RLS to manage all users' instances/nodes. +-- If the worker role is not a superuser, workflows will silently fail because +-- RLS will filter out rows the worker needs to read/update. +DO $$ +DECLARE + wrole TEXT; + is_super BOOLEAN; +BEGIN + wrole := pg_catalog.current_setting('pg_durable.worker_role', true); + IF wrole IS NULL OR wrole OPERATOR(pg_catalog.=) '' THEN + wrole := 'azuresu'; + END IF; + + SELECT rolsuper INTO is_super FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) wrole; + IF is_super IS NULL THEN + RAISE WARNING 'pg_durable: worker role "%" does not exist. The background worker will not be able to process workflows. Create the role as a superuser before using pg_durable.', wrole; + ELSIF NOT is_super THEN + RAISE WARNING 'pg_durable: worker role "%" is not a superuser. The background worker must be a superuser to bypass RLS and manage all users'' instances. Grant superuser or BYPASSRLS to this role.', wrole; + END IF; +END $$; + +-- df.http() carries network-access implications and must be opt-in. +-- PostgreSQL grants EXECUTE to PUBLIC by default when functions are created; +-- revoke that so only roles explicitly granted access (via +-- df.grant_usage(role, include_http => true) or a direct GRANT) can make +-- HTTP requests. +REVOKE EXECUTE ON FUNCTION df.http(text, text, text, jsonb, integer) FROM PUBLIC; + +-- df.grant_usage() and df.revoke_usage() are admin-only helpers. +-- Revoke PUBLIC's default EXECUTE privilege so that only roles explicitly +-- granted access (via with_grant => true or a direct superuser GRANT) can +-- manage other roles' df privileges. +REVOKE EXECUTE ON FUNCTION df.grant_usage(text, boolean, boolean) FROM PUBLIC; +REVOKE EXECUTE ON FUNCTION df.revoke_usage(text) FROM PUBLIC; +/* */ + +/* */ +-- src/dsl.rs:630 +-- pg_durable::dsl::start +CREATE FUNCTION df."start"( + "fut" TEXT, /* &str */ + "label" TEXT DEFAULT NULL, /* core::option::Option<&str> */ + "database" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'start_wrapper'; +/* */ + diff --git a/src/client.rs b/src/client.rs index f358fe4..625ea73 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,7 +12,7 @@ use duroxide::Client; use pgrx::prelude::*; use tokio::runtime::Runtime; -use crate::types::{new_backend_provider, postgres_connection_string}; +use crate::types::{backend_duroxide_schema, new_backend_provider, postgres_connection_string}; /// Cached tokio runtime for client operations. static CLIENT_RUNTIME: OnceLock = OnceLock::new(); @@ -23,17 +23,20 @@ static DUROXIDE_CLIENT: OnceLock = OnceLock::new(); /// Check whether the background worker has finished initializing the duroxide /// schema for the current binary's expected schema version. /// -/// Returns `false` if `duroxide._worker_ready` does not exist, has no row, or -/// has a `schema_version` below `WORKER_SCHEMA_VERSION`. This is a fast SPI -/// read called once per session on the first call to any `df.*` function that -/// needs the duroxide client. +/// Returns `false` if `._worker_ready` does not exist, has no +/// row, or has a `schema_version` below `WORKER_SCHEMA_VERSION`. This is a fast +/// SPI read called once per session on the first call to any `df.*` function +/// that needs the duroxide client. fn is_worker_ready() -> bool { + let schema = backend_duroxide_schema(); + // First check if the readiness table exists via the catalogue. Querying // the non-existent table directly would raise a PostgreSQL ERROR that // aborts the current (sub)transaction — even if caught in Rust. - let table_exists = Spi::get_one::( + let table_exists = Spi::get_one_with_args::( "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_tables \ - WHERE schemaname = 'duroxide' AND tablename = '_worker_ready')", + WHERE schemaname = $1 AND tablename = '_worker_ready')", + &[schema.into()], ) .ok() .flatten() @@ -44,7 +47,10 @@ fn is_worker_ready() -> bool { } Spi::get_one_with_args::( - "SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= $1)", + &format!( + "SELECT EXISTS(SELECT 1 FROM {}._worker_ready WHERE schema_version >= $1)", + schema + ), &[crate::WORKER_SCHEMA_VERSION.into()], ) .ok() @@ -76,6 +82,7 @@ fn get_duroxide_client() -> Result<&'static Client, String> { let rt = get_client_runtime(); let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); rt.block_on(async { // Limit backend provider to 1 connection — backends need minimal duroxide @@ -83,7 +90,7 @@ fn get_duroxide_client() -> Result<&'static Client, String> { // (new_current_thread). Note: std::env::set_var becomes unsafe in Rust 2024 edition. std::env::set_var("DUROXIDE_PG_POOL_MAX", "1"); - let store = new_backend_provider(&pg_conn_str).await?; + let store = new_backend_provider(&pg_conn_str, schema).await?; let _ = DUROXIDE_CLIENT.set(Client::new(store)); DUROXIDE_CLIENT diff --git a/src/dsl.rs b/src/dsl.rs index 0b8da51..8db0910 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -47,11 +47,11 @@ pub fn version() -> String { /// Debug function to see what duroxide connection is being used #[pg_extern(schema = "df")] pub fn debug_connection() -> String { - use crate::types::{postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{backend_duroxide_schema, postgres_connection_string}; format!( "{} (schema: {})", postgres_connection_string(), - DUROXIDE_SCHEMA + backend_duroxide_schema() ) } diff --git a/src/explain.rs b/src/explain.rs index 499afa2..7c5dab9 100644 --- a/src/explain.rs +++ b/src/explain.rs @@ -124,10 +124,11 @@ fn explain_instance(instance_id: &str) -> String { /// Get instance info from Duroxide store fn get_duroxide_instance_info(instance_id: &str) -> (String, Option) { - use crate::types::{new_backend_provider, postgres_connection_string}; + use crate::types::{backend_duroxide_schema, new_backend_provider, postgres_connection_string}; use duroxide::Client; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() @@ -138,7 +139,7 @@ fn get_duroxide_instance_info(instance_id: &str) -> (String, Option) { }; rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, schema).await { Ok(s) => s, Err(_) => return (String::new(), None), }; diff --git a/src/lib.rs b/src/lib.rs index 7dd88d9..fc5a533 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -620,11 +620,26 @@ END $$; extension_sql!( r#" --- The duroxide schema is created here so the extension owns it. --- No IF NOT EXISTS: fails loudly if a duroxide schema already exists, +-- The duroxide provider schema is created here so the extension owns it. +-- No IF NOT EXISTS: fails loudly if a _duroxide schema already exists, -- preventing adoption of a potentially attacker-crafted schema. -- The background worker populates this schema at startup via ApplyAll. -CREATE SCHEMA duroxide; +-- +-- Fresh installs use the '_duroxide' schema. Installs that originated on +-- pg_durable <= 0.2.2 keep the legacy 'duroxide' schema; the upgrade script +-- pg_durable--0.2.2--0.2.3.sql defines df.duroxide_schema() to return +-- 'duroxide' for those installs. Both backend sessions and the background +-- worker call df.duroxide_schema() to discover which schema to use, falling +-- back to 'duroxide' when the helper is absent (installs predating it). +CREATE SCHEMA _duroxide; + +-- Returns the name of the duroxide provider schema selected for this install. +-- Fresh installs return '_duroxide'. The body is version-specific: the upgrade +-- script for pre-existing installs replaces it to return 'duroxide'. +CREATE FUNCTION df.duroxide_schema() RETURNS text + LANGUAGE sql IMMUTABLE PARALLEL SAFE + SET search_path = pg_catalog, pg_temp + AS $$ SELECT '_duroxide'::text $$; "#, name = "create_duroxide_schema", requires = ["validate_database"] @@ -800,10 +815,13 @@ mod tests { /// Ensure the Duroxide store exists and is ready fn ensure_store_ready() -> Result { - use crate::types::{new_backend_provider, postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{ + backend_duroxide_schema, new_backend_provider, postgres_connection_string, + }; use std::time::{Duration, Instant}; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -816,8 +834,8 @@ mod tests { let timeout = Duration::from_secs(10); loop { - match new_backend_provider(&pg_conn_str).await { - Ok(_) => return Ok(format!("{pg_conn_str} (schema: {DUROXIDE_SCHEMA})")), + match new_backend_provider(&pg_conn_str, schema).await { + Ok(_) => return Ok(format!("{pg_conn_str} (schema: {schema})")), Err(e) => { if start.elapsed() > timeout { return Err(format!( @@ -835,7 +853,9 @@ mod tests { /// Wait for a durable function to complete, polling Duroxide status fn wait_for_completion(instance_id: &str, timeout_secs: u64) -> Result { - use crate::types::{new_backend_provider, postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{ + backend_duroxide_schema, new_backend_provider, postgres_connection_string, + }; use duroxide::Client; use std::time::{Duration, Instant}; @@ -843,6 +863,7 @@ mod tests { let _ = ensure_store_ready()?; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let start = Instant::now(); let timeout = Duration::from_secs(timeout_secs); @@ -852,7 +873,7 @@ mod tests { .map_err(|e| format!("Failed to create runtime: {e}"))?; rt.block_on(async { - let store = new_backend_provider(&pg_conn_str).await?; + let store = new_backend_provider(&pg_conn_str, schema).await?; let client = Client::new(store); loop { @@ -892,11 +913,14 @@ mod tests { /// Get the current status from Duroxide fn get_duroxide_status(instance_id: &str) -> Option { - use crate::types::{new_backend_provider, postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{ + backend_duroxide_schema, new_backend_provider, postgres_connection_string, + }; use duroxide::Client; let _ = ensure_store_ready().ok()?; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -904,7 +928,7 @@ mod tests { .ok()?; rt.block_on(async { - let store = new_backend_provider(&pg_conn_str).await.ok()?; + let store = new_backend_provider(&pg_conn_str, schema).await.ok()?; let client = Client::new(store); client .get_instance_info(instance_id) diff --git a/src/monitoring.rs b/src/monitoring.rs index a42c74d..723ce07 100644 --- a/src/monitoring.rs +++ b/src/monitoring.rs @@ -8,7 +8,7 @@ use duroxide::Client; use pgrx::prelude::*; -use crate::types::{new_backend_provider, postgres_connection_string}; +use crate::types::{backend_duroxide_schema, new_backend_provider, postgres_connection_string}; // ============================================================================ // Monitoring Functions @@ -36,6 +36,7 @@ pub fn list_instances( let limit_count = limit_count.min(10000); let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); // Query df.instances via SPI first — RLS filters to calling user's rows only. // We also fetch status here so that all three monitoring APIs (df.status(), @@ -82,7 +83,7 @@ pub fn list_instances( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -128,6 +129,7 @@ pub fn instance_info( ), > { let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); let instance_id_str = instance_id.to_string(); // Ownership check: SPI goes through RLS, returning NULL for non-owned instances. @@ -165,7 +167,7 @@ pub fn instance_info( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -205,6 +207,7 @@ pub fn instance_executions( ), > { let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); let instance_id_owned = instance_id.to_string(); // Ownership check: SPI goes through RLS, so non-owned instances are invisible. @@ -229,7 +232,7 @@ pub fn instance_executions( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -282,6 +285,7 @@ pub fn metrics() -> TableIterator< ), > { let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() @@ -292,7 +296,7 @@ pub fn metrics() -> TableIterator< }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -339,6 +343,7 @@ pub fn instance_nodes( let instance_id = instance_id_param.to_string(); let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); // Get node definitions from PostgreSQL (including status, result and updated_at) let node_defs: Vec<( @@ -392,7 +397,7 @@ pub fn instance_nodes( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; diff --git a/src/types.rs b/src/types.rs index dd9c56b..27806d1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::ffi::CString; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::time::Duration; use uuid::Uuid; @@ -218,16 +218,87 @@ pub async fn connect_as_user( Ok(conn) } -/// Schema name for Duroxide internal tables -pub const DUROXIDE_SCHEMA: &str = "duroxide"; +/// Legacy duroxide provider schema name used by installs created before the +/// `df.duroxide_schema()` helper existed (pg_durable ≤ 0.2.2). It is the only +/// fallback when that helper is absent, and the value the upgrade script pins +/// existing clusters to. +pub const LEGACY_DUROXIDE_SCHEMA: &str = "duroxide"; + +/// Resolve the duroxide provider schema name by calling the extension-owned +/// `df.duroxide_schema()` helper. +/// +/// Returns [`LEGACY_DUROXIDE_SCHEMA`] when the helper does not exist (an install +/// that predates it — e.g. a new `.so` deployed against a ≤0.2.2 schema without +/// running `ALTER EXTENSION pg_durable UPDATE`). The presence check uses the +/// catalog rather than catching `42883` so it never aborts the surrounding +/// (sub)transaction in a backend session. +fn resolve_duroxide_schema_spi() -> String { + let helper_exists = Spi::get_one::( + "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' AND p.proname = 'duroxide_schema' AND p.pronargs = 0)", + ) + .ok() + .flatten() + .unwrap_or(false); + + if !helper_exists { + return LEGACY_DUROXIDE_SCHEMA.to_string(); + } + + match Spi::get_one::("SELECT df.duroxide_schema()") { + Ok(Some(s)) if !s.is_empty() => s, + _ => LEGACY_DUROXIDE_SCHEMA.to_string(), + } +} + +/// Resolve the duroxide provider schema for the current backend session, +/// caching it for the session lifetime. The value cannot change without an +/// extension upgrade, which requires a reconnect to observe reliably, so a +/// per-session cache is safe. +pub fn backend_duroxide_schema() -> &'static str { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA.get_or_init(resolve_duroxide_schema_spi) +} + +/// Resolve the duroxide provider schema name from the background worker using an +/// async pool. Mirrors [`resolve_duroxide_schema_spi`] but for the BGW context. +/// The BGW resolves this once per epoch (after the extension is detected) rather +/// than caching for the process lifetime, because drop+recreate can switch the +/// provider schema within a single worker lifetime. +pub async fn resolve_duroxide_schema_pool(pool: &sqlx::PgPool) -> String { + let helper_exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM pg_proc p \ + JOIN pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' AND p.proname = 'duroxide_schema' AND p.pronargs = 0)", + ) + .fetch_one(pool) + .await + .unwrap_or(false); + + if !helper_exists { + return LEGACY_DUROXIDE_SCHEMA.to_string(); + } + + match sqlx::query_scalar::<_, String>("SELECT df.duroxide_schema()") + .fetch_one(pool) + .await + { + Ok(s) if !s.is_empty() => s, + _ => LEGACY_DUROXIDE_SCHEMA.to_string(), + } +} /// Create a `ProviderConfig` for backend (request/response) operations. /// /// - `VerifyOnly`: never create schema/tables, reject unknown migrations. /// Backend sessions must not run DDL — the BGW owns schema lifecycle. -pub fn backend_provider_config(database_url: &str) -> duroxide_pg::ProviderConfig { +pub fn backend_provider_config( + database_url: &str, + schema_name: &str, +) -> duroxide_pg::ProviderConfig { let mut config = duroxide_pg::ProviderConfig::url(database_url); - config.schema_name = Some(DUROXIDE_SCHEMA.to_string()); + config.schema_name = Some(schema_name.to_string()); config.migration_policy = duroxide_pg::MigrationPolicy::VerifyOnly; config } @@ -235,22 +306,29 @@ pub fn backend_provider_config(database_url: &str) -> duroxide_pg::ProviderConfi /// Create a backend provider for request/response operations. pub async fn new_backend_provider( database_url: &str, + schema_name: &str, ) -> Result, String> { - duroxide_pg::PostgresProvider::new_with_config(backend_provider_config(database_url)) - .await - .map(Arc::new) - .map_err(|e| format!("Failed to connect to duroxide store: {e}")) + duroxide_pg::PostgresProvider::new_with_config(backend_provider_config( + database_url, + schema_name, + )) + .await + .map(Arc::new) + .map_err(|e| format!("Failed to connect to duroxide store: {e}")) } /// Create a `ProviderConfig` for the background worker runtime. /// /// - `ApplyAll`: applies pending duroxide migrations at startup; creates tables -/// inside the extension-owned `duroxide` schema. Safe because the BGW verifies +/// inside the extension-owned provider schema. Safe because the BGW verifies /// schema ownership via `pg_depend` before calling /// `PostgresProvider::new_with_config`. -pub fn worker_provider_config(database_url: &str) -> duroxide_pg::ProviderConfig { +pub fn worker_provider_config( + database_url: &str, + schema_name: &str, +) -> duroxide_pg::ProviderConfig { let mut config = duroxide_pg::ProviderConfig::url(database_url); - config.schema_name = Some(DUROXIDE_SCHEMA.to_string()); + config.schema_name = Some(schema_name.to_string()); config.migration_policy = duroxide_pg::MigrationPolicy::ApplyAll; config } diff --git a/src/worker.rs b/src/worker.rs index 6a1cdee..4f6d66a 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -18,7 +18,7 @@ use tracing_subscriber::EnvFilter; use crate::registry::{create_activity_registry, create_orchestration_registry}; use crate::types::{ get_max_duroxide_connections, get_max_management_connections, get_max_user_connections, - postgres_connection_string, worker_provider_config, DUROXIDE_SCHEMA, + postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, }; /// Initialize tracing subscriber for duroxide logs. @@ -92,9 +92,8 @@ async fn run_duroxide_runtime() { let pg_conn_str = postgres_connection_string(); log!( - "pg_durable: background worker connected to PostgreSQL at {} (schema: {})", + "pg_durable: background worker connected to PostgreSQL at {}", pg_conn_str, - DUROXIDE_SCHEMA ); // Validate connection limit GUCs at startup @@ -161,8 +160,23 @@ async fn run_duroxide_runtime() { break; } - let Some(duroxide_runtime) = - initialize_duroxide_runtime(&pg_conn_str, INIT_RETRY_INTERVAL, &mgmt_pool).await + // Resolve the duroxide provider schema for this epoch. The extension may + // have been dropped and recreated with a different schema version (e.g. + // a fresh `_duroxide` install vs. a legacy `duroxide` install), so we + // must re-resolve after every CREATE EXTENSION rather than once at startup. + let duroxide_schema = resolve_duroxide_schema_pool(&mgmt_pool).await; + log!( + "pg_durable: using duroxide provider schema '{}' for this epoch", + duroxide_schema + ); + + let Some(duroxide_runtime) = initialize_duroxide_runtime( + &pg_conn_str, + INIT_RETRY_INTERVAL, + &mgmt_pool, + &duroxide_schema, + ) + .await else { // Shutdown requested or extension dropped while initializing. continue; @@ -171,7 +185,7 @@ async fn run_duroxide_runtime() { // Write the worker readiness record so backend sessions know the // duroxide schema is fully initialized for this schema version. // Skipped if the row already has the current WORKER_SCHEMA_VERSION. - if let Err(e) = write_worker_ready(&mgmt_pool).await { + if let Err(e) = write_worker_ready(&mgmt_pool, &duroxide_schema).await { log!("pg_durable: failed to write worker readiness record: {}", e); } @@ -233,7 +247,7 @@ async fn check_extension_exists(pool: &sqlx::PgPool) -> bool { /// /// This prevents the BGW from running ApplyAll into an attacker-crafted schema /// that happens to be named "duroxide" but was not created by CREATE EXTENSION. -async fn check_duroxide_schema_owned(pool: &sqlx::PgPool) -> bool { +async fn check_duroxide_schema_owned(pool: &sqlx::PgPool, schema_name: &str) -> bool { let result: Result<(bool,), sqlx::Error> = sqlx::query_as( "SELECT EXISTS ( SELECT 1 @@ -245,9 +259,10 @@ async fn check_duroxide_schema_owned(pool: &sqlx::PgPool) -> bool { JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' + WHERE n.nspname = $1 )", ) + .bind(schema_name) .fetch_one(pool) .await; @@ -262,8 +277,11 @@ async fn check_duroxide_schema_owned(pool: &sqlx::PgPool) -> bool { /// the schema namespace itself). On upgrades from v0.1.1 — where CREATE EXTENSION /// embedded the full duroxide DDL — this de-registers those embedded objects from /// the extension before the BGW applies any new migrations. -async fn release_extension_owned_duroxide_objects(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> { - sqlx::query( +async fn release_extension_owned_duroxide_objects( + pool: &sqlx::PgPool, + schema_name: &str, +) -> Result<(), sqlx::Error> { + sqlx::query(&format!( r#"DO $$ DECLARE r RECORD; @@ -285,7 +303,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' + WHERE n.nspname = '{schema}' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP TRIGGER ' || r.trigger_name || ' ON ' || r.table_name; @@ -303,7 +321,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' + WHERE n.nspname = '{schema}' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP FUNCTION ' || r.sig; END LOOP; @@ -320,7 +338,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' AND c.relkind = 'r' + WHERE n.nspname = '{schema}' AND c.relkind = 'r' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP TABLE ' || r.name; END LOOP; @@ -339,7 +357,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' AND c.relkind = 'i' + WHERE n.nspname = '{schema}' AND c.relkind = 'i' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP INDEX ' || r.name; END LOOP; @@ -356,12 +374,13 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' AND c.relkind = 'S' + WHERE n.nspname = '{schema}' AND c.relkind = 'S' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP SEQUENCE ' || r.name; END LOOP; END $$"#, - ) + schema = schema_name + )) .execute(pool) .await?; Ok(()) @@ -371,21 +390,21 @@ END $$"#, /// schema namespace entry itself) is still registered as an extension member. /// Used to short-circuit `release_extension_owned_duroxide_objects` on the /// common path (fresh 0.2.0 installs and all restarts after the first upgrade). -async fn has_extension_owned_duroxide_objects(pool: &sqlx::PgPool) -> bool { +async fn has_extension_owned_duroxide_objects(pool: &sqlx::PgPool, schema_name: &str) -> bool { let result: Result<(bool,), sqlx::Error> = sqlx::query_as( "SELECT EXISTS ( SELECT 1 FROM pg_depend d JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' JOIN pg_class c ON c.oid = d.objid - JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = 'duroxide' + JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = $1 WHERE d.classid = 'pg_class'::regclass AND d.deptype = 'e' UNION ALL SELECT 1 FROM pg_depend d JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' JOIN pg_proc p ON p.oid = d.objid - JOIN pg_namespace n ON n.oid = p.pronamespace AND n.nspname = 'duroxide' + JOIN pg_namespace n ON n.oid = p.pronamespace AND n.nspname = $1 WHERE d.classid = 'pg_proc'::regclass AND d.deptype = 'e' UNION ALL SELECT 1 @@ -393,10 +412,11 @@ async fn has_extension_owned_duroxide_objects(pool: &sqlx::PgPool) -> bool { JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' JOIN pg_trigger t ON t.oid = d.objid JOIN pg_class c ON c.oid = t.tgrelid - JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = 'duroxide' + JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = $1 WHERE d.classid = 'pg_trigger'::regclass AND d.deptype = 'e' )", ) + .bind(schema_name) .fetch_one(pool) .await; result.map(|(b,)| b).unwrap_or(false) @@ -406,6 +426,7 @@ async fn initialize_duroxide_runtime( pg_conn_str: &str, retry_interval: Duration, mgmt_pool: &sqlx::PgPool, + schema_name: &str, ) -> Option> { log!("pg_durable: initializing duroxide runtime..."); @@ -434,7 +455,7 @@ async fn initialize_duroxide_runtime( return None; } - if !check_duroxide_schema_owned(mgmt_pool).await { + if !check_duroxide_schema_owned(mgmt_pool, schema_name).await { log!( "pg_durable: duroxide schema missing or not extension-owned \ (CREATE EXTENSION may still be in progress) — will retry" @@ -449,8 +470,8 @@ async fn initialize_duroxide_runtime( // embedded DDL from the extension before ApplyAll runs. // The existence check avoids executing the five-loop DO block on every // clean restart once the upgrade has already been applied. - if has_extension_owned_duroxide_objects(mgmt_pool).await { - if let Err(e) = release_extension_owned_duroxide_objects(mgmt_pool).await { + if has_extension_owned_duroxide_objects(mgmt_pool, schema_name).await { + if let Err(e) = release_extension_owned_duroxide_objects(mgmt_pool, schema_name).await { log!( "pg_durable: failed to release extension-owned duroxide objects (will retry): {}", e @@ -460,18 +481,22 @@ async fn initialize_duroxide_runtime( } } - let store = - match PostgresProvider::new_with_config(worker_provider_config(pg_conn_str)).await { - Ok(s) => Arc::new(s), - Err(e) => { - log!( - "pg_durable: failed to create PostgreSQL store (will retry): {}", - e - ); - tokio::time::sleep(retry_interval).await; - continue; - } - }; + let store = match PostgresProvider::new_with_config(worker_provider_config( + pg_conn_str, + schema_name, + )) + .await + { + Ok(s) => Arc::new(s), + Err(e) => { + log!( + "pg_durable: failed to create PostgreSQL store (will retry): {}", + e + ); + tokio::time::sleep(retry_interval).await; + continue; + } + }; // Reuse the management pool for activities (graph loading, status updates). // The former dedicated activity pool with its df.in_workflow hook is no @@ -510,35 +535,43 @@ async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result Result<(), sqlx::Error> { - sqlx::query( - "CREATE TABLE IF NOT EXISTS duroxide._worker_ready ( +async fn write_worker_ready(pool: &sqlx::PgPool, schema_name: &str) -> Result<(), sqlx::Error> { + sqlx::query(&format!( + "CREATE TABLE IF NOT EXISTS {schema}._worker_ready ( sentinel BOOLEAN PRIMARY KEY DEFAULT TRUE, CONSTRAINT only_one_sentinel CHECK (sentinel), schema_version INT NOT NULL, initialized_at TIMESTAMPTZ NOT NULL DEFAULT now() )", - ) + schema = schema_name + )) .execute(pool) .await?; // Allow non-superuser sessions to read the readiness record via // is_worker_ready() which runs SPI in the caller's security context. - sqlx::query("GRANT USAGE ON SCHEMA duroxide TO PUBLIC") - .execute(pool) - .await?; - sqlx::query("GRANT SELECT ON duroxide._worker_ready TO PUBLIC") - .execute(pool) - .await?; + sqlx::query(&format!( + "GRANT USAGE ON SCHEMA {schema} TO PUBLIC", + schema = schema_name + )) + .execute(pool) + .await?; + sqlx::query(&format!( + "GRANT SELECT ON {schema}._worker_ready TO PUBLIC", + schema = schema_name + )) + .execute(pool) + .await?; - sqlx::query( - "INSERT INTO duroxide._worker_ready (sentinel, schema_version, initialized_at) \ + sqlx::query(&format!( + "INSERT INTO {schema}._worker_ready (sentinel, schema_version, initialized_at) \ VALUES (TRUE, $1, now()) \ ON CONFLICT (sentinel) DO UPDATE SET \ schema_version = EXCLUDED.schema_version, \ initialized_at = EXCLUDED.initialized_at \ - WHERE duroxide._worker_ready.schema_version != EXCLUDED.schema_version", - ) + WHERE {schema}._worker_ready.schema_version != EXCLUDED.schema_version", + schema = schema_name + )) .bind(crate::WORKER_SCHEMA_VERSION) .execute(pool) .await?; diff --git a/tests/e2e/sql/00_setup_playground.sql b/tests/e2e/sql/00_setup_playground.sql index 616b64e..fb89350 100644 --- a/tests/e2e/sql/00_setup_playground.sql +++ b/tests/e2e/sql/00_setup_playground.sql @@ -38,15 +38,19 @@ DECLARE max_attempts INT := p_timeout_secs * 10; -- poll every 100ms table_exists BOOLEAN; is_ready BOOLEAN; + dx_schema TEXT := df.duroxide_schema(); BEGIN LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO is_ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO is_ready; ELSE is_ready := FALSE; END IF; diff --git a/tests/e2e/sql/12_extension_lifecycle.sql b/tests/e2e/sql/12_extension_lifecycle.sql index 7da90a3..d66635b 100644 --- a/tests/e2e/sql/12_extension_lifecycle.sql +++ b/tests/e2e/sql/12_extension_lifecycle.sql @@ -13,25 +13,26 @@ -- Ensure a clean starting point SELECT public._e2e_drop_extension_safe(); --- 1) Verify BGW does not create duroxide schema pre-extension +-- 1) Verify BGW does not create the duroxide provider schema pre-extension. +-- Fresh installs use the '_duroxide' schema (created by CREATE EXTENSION). DO $$ DECLARE exists_before BOOLEAN; exists_after BOOLEAN; BEGIN - SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'duroxide') INTO exists_before; + SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '_duroxide') INTO exists_before; IF exists_before THEN - RAISE EXCEPTION 'TEST FAILED: duroxide schema exists before CREATE EXTENSION'; + RAISE EXCEPTION 'TEST FAILED: _duroxide schema exists before CREATE EXTENSION'; END IF; PERFORM pg_sleep(6); - SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'duroxide') INTO exists_after; + SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '_duroxide') INTO exists_after; IF exists_after THEN - RAISE EXCEPTION 'TEST FAILED: duroxide schema was created before CREATE EXTENSION'; + RAISE EXCEPTION 'TEST FAILED: _duroxide schema was created before CREATE EXTENSION'; END IF; - RAISE NOTICE 'PASSED: BGW did not create duroxide schema pre-extension'; + RAISE NOTICE 'PASSED: BGW did not create _duroxide schema pre-extension'; END $$; -- 2) Create extension and run a simple workflow @@ -78,12 +79,12 @@ DO $$ DECLARE schema_exists BOOLEAN; BEGIN - SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'duroxide') INTO schema_exists; + SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '_duroxide') INTO schema_exists; IF schema_exists THEN - RAISE EXCEPTION 'TEST FAILED: duroxide schema still exists after DROP EXTENSION'; + RAISE EXCEPTION 'TEST FAILED: _duroxide schema still exists after DROP EXTENSION'; END IF; - RAISE NOTICE 'PASSED: DROP EXTENSION removed duroxide schema'; + RAISE NOTICE 'PASSED: DROP EXTENSION removed _duroxide schema'; END $$; -- 4) Re-create extension and run another workflow diff --git a/tests/e2e/sql/46_connection_limit_startup_validation.sql b/tests/e2e/sql/46_connection_limit_startup_validation.sql index dd00bd6..e0093ad 100644 --- a/tests/e2e/sql/46_connection_limit_startup_validation.sql +++ b/tests/e2e/sql/46_connection_limit_startup_validation.sql @@ -13,16 +13,20 @@ DECLARE ready BOOLEAN; table_exists BOOLEAN; attempts INT := 0; + dx_schema TEXT := df.duroxide_schema(); BEGIN - -- Poll duroxide._worker_ready for up to 15 seconds — the worker should never become ready. + -- Poll ._worker_ready for up to 15 seconds — the worker should never become ready. LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO ready; ELSE ready := FALSE; END IF;