From d33d3a5cb1e318e9aefabccb202d58fc06929dea Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Tue, 2 Jun 2026 20:45:20 +0000 Subject: [PATCH 1/3] Add duroxide schema move plan --- docs/move-duroxide-schema.md | 257 +++++++++++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 docs/move-duroxide-schema.md diff --git a/docs/move-duroxide-schema.md b/docs/move-duroxide-schema.md new file mode 100644 index 0000000..9aa297c --- /dev/null +++ b/docs/move-duroxide-schema.md @@ -0,0 +1,257 @@ +# Move Duroxide Provider Schema + +Issue: [Move PostgresProvider's schema out of "duroxide" microsoft/pg_durable#175](https://github.com/microsoft/pg_durable/issues/175) + +## Goal + +Move pg_durable's internal duroxide provider schema away from the generic `duroxide` name for new installations, while preserving existing installations that already have an extension-owned `duroxide` schema. + +The proposed default provider schema name for new installations is: + +```text +df-duroxide +``` + +The schema name should also be configurable through a postmaster-context, superuser-only GUC so deployments can choose a different provider schema before creating the extension. + +## Current State + +The provider schema is currently hardcoded as `duroxide` in several places: + +- `src/types.rs` defines `DUROXIDE_SCHEMA = "duroxide"` and passes it to both backend and worker `duroxide_pg::ProviderConfig` values. +- `src/lib.rs` creates `CREATE SCHEMA duroxide;` as an extension-owned schema during `CREATE EXTENSION pg_durable`. +- `src/worker.rs` checks that `duroxide` exists and is owned by the `pg_durable` extension before running `MigrationPolicy::ApplyAll`. +- `src/worker.rs` writes readiness state to `duroxide._worker_ready`. +- `src/client.rs`, E2E setup SQL, upgrade tests, and helper scripts poll `duroxide._worker_ready`. + +The security model intentionally depends on two properties: + +1. `CREATE EXTENSION` creates the provider schema without `IF NOT EXISTS`, so a pre-existing schema with that name blocks installation instead of being adopted. +2. The background worker verifies the provider schema is extension-owned before applying duroxide migrations. + +Any implementation must preserve both properties. + +## Compatibility Requirement + +Already-shipped versions in Azure and open source assume the provider schema is named `duroxide`: + +- Azure-shipped: v0.1.1, v0.2.1, v0.2.2 in progress +- Open source supported baseline: v0.2.2 + +Therefore, a new binary must continue to work with existing databases where `pg_durable` already owns a `duroxide` schema. Existing instances and engine state must remain in place and must not be migrated implicitly to a different schema. + +The compatibility rule should be: + +- If the installed extension already owns `duroxide`, use `duroxide`. +- If the installed extension already owns the configured/new schema, use that schema. +- Do not rename, copy, drop, or migrate provider state automatically. +- A fresh `CREATE EXTENSION pg_durable` under the new SQL should create and use `df-duroxide` by default. + +## Proposed GUC + +Add a new GUC: + +```text +pg_durable.duroxide_schema = 'df-duroxide' +``` + +Recommended properties: + +- Context: `Postmaster` +- Flags: `SUPERUSER_ONLY` +- Default: `df-duroxide` +- Validated as a PostgreSQL identifier/name suitable for a schema name +- Documented as install-time configuration, not a runtime migration switch + +The setting should mean: "which provider schema should new extension installs create and which schema should the background worker expect when there is no legacy extension-owned `duroxide` schema." + +## Desired Behavior + +### Fresh install, default GUC + +1. `pg_durable.duroxide_schema` is unset or set to `df-duroxide`. +2. `CREATE EXTENSION pg_durable` creates an extension-owned schema named `df-duroxide`. +3. The background worker verifies `df-duroxide` is extension-owned. +4. The worker runs duroxide-pg migrations in `df-duroxide`. +5. Backend sessions use `df-duroxide._worker_ready` and a provider configured with `schema_name = "df-duroxide"`. + +### Fresh install, custom GUC + +1. Admin sets `pg_durable.duroxide_schema = 'custom_schema'` in `postgresql.conf` and restarts PostgreSQL. +2. `CREATE EXTENSION pg_durable` creates an extension-owned schema named `custom_schema`. +3. The worker and backend sessions use `custom_schema`. + +### Existing install using `duroxide`, new binary deployed, GUC unset/default + +1. The database already has `pg_durable` installed and owns schema `duroxide`. +2. The new binary default is `df-duroxide`. +3. The worker detects the extension-owned `duroxide` schema and keeps using it. +4. No provider state is moved. +5. Existing instances continue to run and monitoring APIs continue to work. + +This is the most important backward-compatibility path. + +### Existing install using `duroxide`, admin changes GUC to `df-duroxide` or custom value without dropping extension + +Current requested behavior: do not delete or migrate the old schema. The extension still has the schema it already owns, while the worker is configured to wait for the new schema name to exist and be extension-owned. Functions will not make progress until the extension is dropped and recreated with the new setting. + +This mirrors the existing operational hazard for `pg_durable.database`: changing the GUC after extension creation can leave the worker watching a database/schema state that does not match the existing extension installation. + +This requirement has one tension with the previous compatibility path: if the new binary always falls back to extension-owned `duroxide`, then changing the GUC would not intentionally stall the worker. We need a crisp rule to distinguish "legacy default compatibility" from "admin explicitly changed the schema setting." See Open Questions. + +### Drop and recreate after changing GUC + +1. Admin sets `pg_durable.duroxide_schema` to the desired schema and restarts. +2. Admin runs `DROP EXTENSION pg_durable CASCADE`. +3. The extension-owned provider schema and provider state are dropped by PostgreSQL cascade. +4. Admin runs `CREATE EXTENSION pg_durable`. +5. The new extension creates the configured schema and starts with fresh provider state. + +This is a destructive reset of durable engine state, not a migration. + +### Upgrade script path + +`ALTER EXTENSION pg_durable UPDATE` should not rename `duroxide` or create `df-duroxide` for existing installations. The upgrade path should preserve existing provider state and should leave schema selection to runtime detection/configuration. + +Scenario A schema-equivalence tests must account for the fact that provider schema state is intentionally not compared as part of `df` schema snapshots. Scenario B1 is the critical test: new `.so` against an older compatible schema must still use `duroxide`. + +## Open Questions + +### How do we detect an explicit schema GUC change? + +Postmaster GUC access normally gives the effective value, not whether it came from the compiled default or from a config file. If the compiled default changes from `duroxide` to `df-duroxide`, an existing installation with no explicit setting will also observe `df-duroxide`. + +That means these two cases look identical unless we add another signal: + +1. Existing install, admin did nothing, new binary default is now `df-duroxide`. +2. Existing install, admin explicitly set `pg_durable.duroxide_schema = 'df-duroxide'` without dropping/recreating. + +The requested behavior wants case 1 to keep using `duroxide`, but case 2 to wait for `df-duroxide`. We need a way to tell them apart. + +Possible approaches: + +- Use PostgreSQL GUC source inspection if pgrx exposes enough information or if we can safely call the relevant PostgreSQL APIs. Treat `PGC_S_DEFAULT` as compatibility mode and non-default config sources as explicit admin intent. +- Avoid trying to detect explicitness. Rule: an existing extension-owned `duroxide` schema always wins. This is simpler and safer for upgrades, but changing the GUC alone would not stall/move an old install. +- Persist the selected provider schema in `df` metadata during `CREATE EXTENSION`. This would make runtime behavior explicit after install, but it requires new DDL and does not help already-shipped installs unless absence of metadata means legacy `duroxide`. +- Create both a GUC and a SQL helper that records the selected schema at install time. This is probably overkill unless PostgreSQL GUC source inspection is not viable. + +Recommendation: prefer explicit metadata if we want deterministic behavior independent of GUC-source quirks. Add a small `df._provider_config` or similar table with `duroxide_schema TEXT NOT NULL`, populated by install SQL from the GUC value. For old installs without the table/row, fallback to extension-owned `duroxide`. + +### Can `CREATE SCHEMA` use a GUC-derived dynamic name in extension SQL? + +Static extension SQL currently uses literal `CREATE SCHEMA duroxide;`. A configurable schema name probably requires a `DO` block that reads `current_setting('pg_durable.duroxide_schema')`, validates it, executes `CREATE SCHEMA %I`, and then runs `ALTER EXTENSION pg_durable ADD SCHEMA %I` if dynamic schema creation is not automatically registered as an extension member. + +This needs a prototype. The cheap check is to package/install locally and inspect `pg_depend` for the dynamically created schema. + +### Is `df-duroxide` an acceptable PostgreSQL schema name? + +Yes if quoted: `"df-duroxide"`. It is not a bare identifier because of the hyphen. All SQL that references it must use identifier quoting. Rust/provider config can pass the raw name, assuming duroxide-pg quotes identifiers correctly internally. pg_durable's own dynamic SQL must use `quote_ident()` or equivalent. + +This also affects test scripts and readiness probes: direct SQL must refer to `"df-duroxide"._worker_ready`, or better use formatted SQL with `%I`. + +### Should we choose `df_duroxide` instead? + +`df-duroxide` clearly signals an internal implementation schema and avoids ordinary bare-identifier collisions, but it increases quoting requirements and test churn. `df_duroxide` is simpler operationally. The current requested default is `df-duroxide`; keep it unless we decide the quoting burden is not worth it. + +## Implementation Plan + +### Phase 1: Schema-name abstraction + +- Replace the hardcoded `DUROXIDE_SCHEMA` constant with functions that return the selected provider schema. +- Add helpers for quoted identifier rendering in SQL snippets that must mention the schema directly. +- Update `backend_provider_config()` and `worker_provider_config()` to use the selected schema. +- Update debug/status messages to display the selected schema. + +### Phase 2: Install-time schema creation + +- Add `pg_durable.duroxide_schema` GUC in `src/lib.rs`. +- Replace literal `CREATE SCHEMA duroxide;` with dynamic schema creation using the configured name. +- Ensure the created schema is an extension member. +- Preserve the no-adoption rule: if the target schema already exists, `CREATE EXTENSION` must fail. +- Decide whether to persist the chosen schema in `df` metadata. + +### Phase 3: Runtime schema selection + +Recommended selection algorithm if metadata is added: + +1. If `df._provider_config.duroxide_schema` exists and has a value, use it. +2. Else if the extension owns `duroxide`, use `duroxide` for legacy compatibility. +3. Else use the current GUC value. + +If metadata is not added, the selection algorithm must explicitly resolve the open question about GUC explicitness. + +### Phase 4: Worker ownership and migration flow + +- Generalize `check_duroxide_schema_owned()` to accept the selected schema name. +- Generalize `has_extension_owned_duroxide_objects()` and `release_extension_owned_duroxide_objects()` to filter on the selected schema. +- Generalize `write_worker_ready()` to create/grant/upsert in the selected schema. +- Ensure all dynamic SQL uses identifier quoting. +- Keep `MigrationPolicy::ApplyAll` in the worker and `VerifyOnly` in backend sessions. + +### Phase 5: Backend readiness checks + +- Generalize `is_worker_ready()` in `src/client.rs` to check the selected schema's `_worker_ready` table. +- Avoid directly querying a possibly missing table; retain the current catalog-existence pre-check. +- Ensure non-superuser backend sessions can read readiness state in quoted/custom schemas. + +### Phase 6: Tests and scripts + +Add or update checks for: + +- Fresh install uses `df-duroxide` by default. +- Fresh install with custom `pg_durable.duroxide_schema` uses the custom schema. +- Pre-existing schema with the configured name blocks `CREATE EXTENSION`. +- New binary against old schema uses existing `duroxide` and can run a workflow. +- Changing the GUC without drop/recreate has the decided behavior and emits clear logs/errors. +- Drop/recreate after changing the GUC creates the new schema and no old provider state remains unless separately preserved by the admin outside the extension lifecycle. + +Touch points likely include: + +- `tests/e2e/sql/00_setup_playground.sql` +- `sql/00_init.sql` +- `scripts/test-e2e-local.sh` +- `scripts/test-upgrade.sh` +- Any E2E tests that directly reference `duroxide._worker_ready` + +### Phase 7: Documentation + +Update: + +- `docs/bgw-applies-migrations.md` +- `docs/extension_lifecycle.md` +- `docs/upgrade-testing.md` +- `USER_GUIDE.md` connection/troubleshooting sections if readiness probes or drop/recreate guidance changes + +Document clearly that changing `pg_durable.duroxide_schema` does not migrate existing durable state. + +## Validation Strategy + +Minimum validation after implementation: + +```bash +cargo fmt -p pg_durable -- --check +cargo build --features pg17 +./scripts/test-e2e-local.sh 00_setup_playground +./scripts/test-upgrade.sh --verbose +``` + +If time is short, prioritize a targeted E2E or SQL smoke test that verifies a default fresh install creates `df-duroxide`, and the upgrade B1 path still works with an existing `duroxide` schema. + +## Issue Update Draft + +Proposed summary to add to the GitHub issue: + +> We should make the duroxide provider schema configurable for new installs, with a new default of `df-duroxide`, but preserve existing installations that already have extension-owned `duroxide` provider state. The implementation needs to avoid automatic rename/copy/drop of provider state. Fresh installs should create the configured schema as an extension-owned object and the BGW should only run `ApplyAll` after verifying extension ownership. Existing installs should continue using their current extension-owned `duroxide` schema unless the admin intentionally drops and recreates the extension under a new setting. The main design question is how to distinguish an unchanged legacy install from an admin explicitly changing the new GUC without drop/recreate; options are GUC-source inspection, always letting legacy `duroxide` win, or persisting the selected provider schema in `df` metadata at install time. + +## Current Recommendation + +Do not implement the schema rename as a simple constant change. The safe implementation needs a selected-schema abstraction and a persisted install-time provider schema record, or an equally precise rule for GUC explicitness. + +The metadata approach is the most deterministic: + +- New installs record and use the configured schema. +- Old installs without metadata keep using `duroxide`. +- Changing the GUC after install does not mutate the recorded schema and therefore does not migrate state. +- Drop/recreate is the supported way to adopt a different provider schema. + +This differs slightly from the requested "worker waits for the new schema if GUC changes" behavior, but it avoids ambiguity and matches PostgreSQL extension practice: install-time state should define which schema belongs to that extension instance. \ No newline at end of file From efbdfc93da2fb2e8ba2c35c0ce1482d39b147f52 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Wed, 3 Jun 2026 22:04:18 +0000 Subject: [PATCH 2/3] Revise move-duroxide-schema proposal: drop GUC, use df.duroxide_schema() --- docs/move-duroxide-schema.md | 249 ++++++++++++++++------------------- 1 file changed, 111 insertions(+), 138 deletions(-) diff --git a/docs/move-duroxide-schema.md b/docs/move-duroxide-schema.md index 9aa297c..eeb859f 100644 --- a/docs/move-duroxide-schema.md +++ b/docs/move-duroxide-schema.md @@ -6,13 +6,19 @@ Issue: [Move PostgresProvider's schema out of "duroxide" microsoft/pg_durable#17 Move pg_durable's internal duroxide provider schema away from the generic `duroxide` name for new installations, while preserving existing installations that already have an extension-owned `duroxide` schema. -The proposed default provider schema name for new installations is: +The chosen default provider schema name for new installations is: ```text -df-duroxide +_duroxide ``` -The schema name should also be configurable through a postmaster-context, superuser-only GUC so deployments can choose a different provider schema before creating the extension. +Rationale for the name: + +- Bare identifier (no quoting required anywhere). `_` is a legal leading character for PostgreSQL identifiers. +- The leading underscore signals "internal / not part of the public API," matching common PostgreSQL convention for implementation-detail objects. +- Makes the relationship to duroxide-pg obvious without overloading a more generic prefix like `_df`. + +There is no GUC. The schema name is an implementation detail of pg_durable, not an operator-facing setting. ## Current State @@ -40,170 +46,118 @@ Already-shipped versions in Azure and open source assume the provider schema is Therefore, a new binary must continue to work with existing databases where `pg_durable` already owns a `duroxide` schema. Existing instances and engine state must remain in place and must not be migrated implicitly to a different schema. -The compatibility rule should be: +The compatibility rules: -- If the installed extension already owns `duroxide`, use `duroxide`. -- If the installed extension already owns the configured/new schema, use that schema. +- If the install records `duroxide` as its provider schema, use `duroxide`. +- If the install records `_duroxide` (or any future name) as its provider schema, use that name. - Do not rename, copy, drop, or migrate provider state automatically. -- A fresh `CREATE EXTENSION pg_durable` under the new SQL should create and use `df-duroxide` by default. - -## Proposed GUC - -Add a new GUC: - -```text -pg_durable.duroxide_schema = 'df-duroxide' -``` - -Recommended properties: - -- Context: `Postmaster` -- Flags: `SUPERUSER_ONLY` -- Default: `df-duroxide` -- Validated as a PostgreSQL identifier/name suitable for a schema name -- Documented as install-time configuration, not a runtime migration switch - -The setting should mean: "which provider schema should new extension installs create and which schema should the background worker expect when there is no legacy extension-owned `duroxide` schema." - -## Desired Behavior - -### Fresh install, default GUC - -1. `pg_durable.duroxide_schema` is unset or set to `df-duroxide`. -2. `CREATE EXTENSION pg_durable` creates an extension-owned schema named `df-duroxide`. -3. The background worker verifies `df-duroxide` is extension-owned. -4. The worker runs duroxide-pg migrations in `df-duroxide`. -5. Backend sessions use `df-duroxide._worker_ready` and a provider configured with `schema_name = "df-duroxide"`. - -### Fresh install, custom GUC - -1. Admin sets `pg_durable.duroxide_schema = 'custom_schema'` in `postgresql.conf` and restarts PostgreSQL. -2. `CREATE EXTENSION pg_durable` creates an extension-owned schema named `custom_schema`. -3. The worker and backend sessions use `custom_schema`. - -### Existing install using `duroxide`, new binary deployed, GUC unset/default - -1. The database already has `pg_durable` installed and owns schema `duroxide`. -2. The new binary default is `df-duroxide`. -3. The worker detects the extension-owned `duroxide` schema and keeps using it. -4. No provider state is moved. -5. Existing instances continue to run and monitoring APIs continue to work. - -This is the most important backward-compatibility path. - -### Existing install using `duroxide`, admin changes GUC to `df-duroxide` or custom value without dropping extension - -Current requested behavior: do not delete or migrate the old schema. The extension still has the schema it already owns, while the worker is configured to wait for the new schema name to exist and be extension-owned. Functions will not make progress until the extension is dropped and recreated with the new setting. +- A fresh `CREATE EXTENSION pg_durable` under the new SQL creates and uses `_duroxide`. +- A `.so` upgrade that arrives **without** `ALTER EXTENSION pg_durable UPDATE` must continue to operate against the legacy `duroxide` schema (see "Selection algorithm" below). -This mirrors the existing operational hazard for `pg_durable.database`: changing the GUC after extension creation can leave the worker watching a database/schema state that does not match the existing extension installation. +There is **no in-place migration path** from `duroxide` to `_duroxide` for an existing cluster that wants to adopt the new name while preserving engine state. The only supported "adopt the new name" path is `DROP EXTENSION pg_durable CASCADE` followed by `CREATE EXTENSION pg_durable`, which is a destructive reset of durable engine state. This is acknowledged as a deliberate non-goal of this work. -This requirement has one tension with the previous compatibility path: if the new binary always falls back to extension-owned `duroxide`, then changing the GUC would not intentionally stall the worker. We need a crisp rule to distinguish "legacy default compatibility" from "admin explicitly changed the schema setting." See Open Questions. +## Design Overview -### Drop and recreate after changing GUC +Rather than a GUC, the selected provider schema is exposed by a small extension-owned SQL function: -1. Admin sets `pg_durable.duroxide_schema` to the desired schema and restarts. -2. Admin runs `DROP EXTENSION pg_durable CASCADE`. -3. The extension-owned provider schema and provider state are dropped by PostgreSQL cascade. -4. Admin runs `CREATE EXTENSION pg_durable`. -5. The new extension creates the configured schema and starts with fresh provider state. - -This is a destructive reset of durable engine state, not a migration. - -### Upgrade script path - -`ALTER EXTENSION pg_durable UPDATE` should not rename `duroxide` or create `df-duroxide` for existing installations. The upgrade path should preserve existing provider state and should leave schema selection to runtime detection/configuration. - -Scenario A schema-equivalence tests must account for the fact that provider schema state is intentionally not compared as part of `df` schema snapshots. Scenario B1 is the critical test: new `.so` against an older compatible schema must still use `duroxide`. - -## Open Questions - -### How do we detect an explicit schema GUC change? - -Postmaster GUC access normally gives the effective value, not whether it came from the compiled default or from a config file. If the compiled default changes from `duroxide` to `df-duroxide`, an existing installation with no explicit setting will also observe `df-duroxide`. +```sql +CREATE FUNCTION df.duroxide_schema() RETURNS TEXT + LANGUAGE SQL IMMUTABLE PARALLEL SAFE + AS $$ SELECT '_duroxide'::TEXT $$; +``` -That means these two cases look identical unless we add another signal: +Both the install SQL and any future upgrade scripts are responsible for defining this function with the correct value for the lifecycle path being taken: -1. Existing install, admin did nothing, new binary default is now `df-duroxide`. -2. Existing install, admin explicitly set `pg_durable.duroxide_schema = 'df-duroxide'` without dropping/recreating. +- The **fresh install** SQL (the new version's primary install script) defines the function to return `'_duroxide'`. +- The **upgrade script** `pg_durable--0.2.2--.sql` defines the function to return `'duroxide'`. This pins existing clusters to their already-created legacy schema deterministically, regardless of any other heuristics. -The requested behavior wants case 1 to keep using `duroxide`, but case 2 to wait for `df-duroxide`. We need a way to tell them apart. +The background worker and backend sessions read the value once at startup (or whenever they need it) and use it everywhere the provider schema is referenced. -Possible approaches: +### Why a function instead of a table? -- Use PostgreSQL GUC source inspection if pgrx exposes enough information or if we can safely call the relevant PostgreSQL APIs. Treat `PGC_S_DEFAULT` as compatibility mode and non-default config sources as explicit admin intent. -- Avoid trying to detect explicitness. Rule: an existing extension-owned `duroxide` schema always wins. This is simpler and safer for upgrades, but changing the GUC alone would not stall/move an old install. -- Persist the selected provider schema in `df` metadata during `CREATE EXTENSION`. This would make runtime behavior explicit after install, but it requires new DDL and does not help already-shipped installs unless absence of metadata means legacy `duroxide`. -- Create both a GUC and a SQL helper that records the selected schema at install time. This is probably overkill unless PostgreSQL GUC source inspection is not viable. +- Mirrors the existing pattern of [`df.target_database()`](../src/lib.rs) — a parameterless function used to expose install-time configuration to validation SQL and to Rust code. +- No row management, no `CHECK` constraints to enforce a single row, no `UPDATE` ergonomics. +- The value is baked into an extension-owned object, which makes it tamper-resistant by default (non-superusers cannot `CREATE OR REPLACE` it). +- Changing the value across versions is a straightforward `CREATE OR REPLACE FUNCTION` in the relevant upgrade script. -Recommendation: prefer explicit metadata if we want deterministic behavior independent of GUC-source quirks. Add a small `df._provider_config` or similar table with `duroxide_schema TEXT NOT NULL`, populated by install SQL from the GUC value. For old installs without the table/row, fallback to extension-owned `duroxide`. +### Selection algorithm (BGW + backend) -### Can `CREATE SCHEMA` use a GUC-derived dynamic name in extension SQL? +At runtime the selected schema is computed once per connection / once at BGW startup: -Static extension SQL currently uses literal `CREATE SCHEMA duroxide;`. A configurable schema name probably requires a `DO` block that reads `current_setting('pg_durable.duroxide_schema')`, validates it, executes `CREATE SCHEMA %I`, and then runs `ALTER EXTENSION pg_durable ADD SCHEMA %I` if dynamic schema creation is not automatically registered as an extension member. +1. Try to call `df.duroxide_schema()`. If it returns a non-empty value, use that value. +2. If the function does not exist (PostgreSQL error code `42883`, `undefined_function`), fall back to `'duroxide'`. -This needs a prototype. The cheap check is to package/install locally and inspect `pg_depend` for the dynamically created schema. +Rule 2 is the **only** fallback, and exists strictly for the documented operational reality that customers may receive a new `.so` through a maintenance update without running `ALTER EXTENSION pg_durable UPDATE`. In that case: -### Is `df-duroxide` an acceptable PostgreSQL schema name? +- The cluster is still at the old extension version, so the helper function does not yet exist. +- The pre-existing extension-owned `duroxide` schema is the only possible provider schema. +- Falling back to `'duroxide'` is unambiguous and safe. -Yes if quoted: `"df-duroxide"`. It is not a bare identifier because of the hyphen. All SQL that references it must use identifier quoting. Rust/provider config can pass the raw name, assuming duroxide-pg quotes identifiers correctly internally. pg_durable's own dynamic SQL must use `quote_ident()` or equivalent. +The fallback is self-deleting: as soon as the operator runs `ALTER EXTENSION pg_durable UPDATE`, the function is defined (by the upgrade script) to return `'duroxide'`, and selection step 1 wins on every subsequent startup. -This also affects test scripts and readiness probes: direct SQL must refer to `"df-duroxide"._worker_ready`, or better use formatted SQL with `%I`. +No GUC source inspection, no `pg_depend` scan, no metadata-vs-GUC priority puzzle. -### Should we choose `df_duroxide` instead? +## Compatibility Matrix -`df-duroxide` clearly signals an internal implementation schema and avoids ordinary bare-identifier collisions, but it increases quoting requirements and test churn. `df_duroxide` is simpler operationally. The current requested default is `df-duroxide`; keep it unless we decide the quoting burden is not worth it. +| Scenario | Selection outcome | Provider schema actually used | +|---|---|---| +| Fresh `CREATE EXTENSION` on new version | Step 1: function returns `'_duroxide'` | `_duroxide` | +| Existing v0.2.2 cluster, new `.so` deployed, **no** `ALTER EXTENSION UPDATE` | Step 2: function missing, fallback | `duroxide` | +| Existing v0.2.2 cluster, new `.so` deployed, `ALTER EXTENSION UPDATE` run | Step 1: upgrade script defined function to return `'duroxide'` | `duroxide` | +| Future fresh install on v0.2.4+ where default changes again | Step 1: install script defines function to return the new value | New value | +| Operator manually drops `_duroxide` schema on a fresh install | Worker readiness check fails (extension-owned schema missing) | N/A — operator error, loud failure | ## Implementation Plan ### Phase 1: Schema-name abstraction -- Replace the hardcoded `DUROXIDE_SCHEMA` constant with functions that return the selected provider schema. -- Add helpers for quoted identifier rendering in SQL snippets that must mention the schema directly. -- Update `backend_provider_config()` and `worker_provider_config()` to use the selected schema. -- Update debug/status messages to display the selected schema. - -### Phase 2: Install-time schema creation +- Replace the hardcoded `DUROXIDE_SCHEMA` constant in `src/types.rs` with a runtime-resolved value cached at BGW startup and per backend session. +- Introduce a small helper, e.g. `resolve_duroxide_schema(conn) -> String`, implementing the selection algorithm (call function, catch `42883`, fall back to `"duroxide"`). +- Update `backend_provider_config()` and `worker_provider_config()` to consume the resolved value. +- Update debug/log messages to display the resolved schema. -- Add `pg_durable.duroxide_schema` GUC in `src/lib.rs`. -- Replace literal `CREATE SCHEMA duroxide;` with dynamic schema creation using the configured name. -- Ensure the created schema is an extension member. -- Preserve the no-adoption rule: if the target schema already exists, `CREATE EXTENSION` must fail. -- Decide whether to persist the chosen schema in `df` metadata. +### Phase 2: Install SQL changes -### Phase 3: Runtime schema selection +- Define `df.duroxide_schema()` in the new version's install SQL, returning `'_duroxide'`. +- Replace the literal `CREATE SCHEMA duroxide;` with `CREATE SCHEMA _duroxide;` (still **without** `IF NOT EXISTS`, preserving the no-adoption rule). +- Both objects are extension members by virtue of being declared inside the extension install SQL. +- No additional install-time validation is needed: a pre-existing `_duroxide` schema makes `CREATE SCHEMA` fail, which fails `CREATE EXTENSION` — the same protection the current literal `duroxide` enjoys. -Recommended selection algorithm if metadata is added: +### Phase 3: Upgrade script -1. If `df._provider_config.duroxide_schema` exists and has a value, use it. -2. Else if the extension owns `duroxide`, use `duroxide` for legacy compatibility. -3. Else use the current GUC value. - -If metadata is not added, the selection algorithm must explicitly resolve the open question about GUC explicitness. +- `sql/pg_durable--0.2.2--.sql` defines `df.duroxide_schema()` returning `'duroxide'`. +- The script must **not** create `_duroxide`, must **not** rename `duroxide`, and must **not** touch existing provider state. +- The script is the contract that says "this cluster is staying on `duroxide` forever." ### Phase 4: Worker ownership and migration flow -- Generalize `check_duroxide_schema_owned()` to accept the selected schema name. -- Generalize `has_extension_owned_duroxide_objects()` and `release_extension_owned_duroxide_objects()` to filter on the selected schema. -- Generalize `write_worker_ready()` to create/grant/upsert in the selected schema. -- Ensure all dynamic SQL uses identifier quoting. +- Generalize `check_duroxide_schema_owned()` to accept the resolved schema name. +- Generalize `has_extension_owned_duroxide_objects()` and `release_extension_owned_duroxide_objects()` to filter on the resolved schema. +- Generalize `write_worker_ready()` to write to `._worker_ready`. - Keep `MigrationPolicy::ApplyAll` in the worker and `VerifyOnly` in backend sessions. +- Because `_duroxide` is a bare identifier, no special quoting is required for the new default. The schema-name string can be interpolated into SQL via the same code paths used today, but it is still good practice to use `quote_ident` for any dynamic-schema SQL to remain robust against future name choices. ### Phase 5: Backend readiness checks -- Generalize `is_worker_ready()` in `src/client.rs` to check the selected schema's `_worker_ready` table. -- Avoid directly querying a possibly missing table; retain the current catalog-existence pre-check. -- Ensure non-superuser backend sessions can read readiness state in quoted/custom schemas. +- Generalize `is_worker_ready()` in `src/client.rs` to check `._worker_ready`. +- Retain the catalog-existence pre-check before querying the readiness table so missing-schema cases produce a clear "not ready" signal rather than a SQL error. +- Ensure non-superuser backend sessions have `USAGE` on the resolved schema and `SELECT` on `_worker_ready` (existing grants on the literal `duroxide` schema move to the new name). ### Phase 6: Tests and scripts Add or update checks for: -- Fresh install uses `df-duroxide` by default. -- Fresh install with custom `pg_durable.duroxide_schema` uses the custom schema. -- Pre-existing schema with the configured name blocks `CREATE EXTENSION`. -- New binary against old schema uses existing `duroxide` and can run a workflow. -- Changing the GUC without drop/recreate has the decided behavior and emits clear logs/errors. -- Drop/recreate after changing the GUC creates the new schema and no old provider state remains unless separately preserved by the admin outside the extension lifecycle. +- Fresh install creates `_duroxide` and `df.duroxide_schema()` returns `'_duroxide'`. +- Pre-existing `_duroxide` schema blocks `CREATE EXTENSION`. +- New `.so` against an unmigrated v0.2.2 schema: + - `df.duroxide_schema()` does not exist. + - BGW resolves to `'duroxide'` via fallback. + - Existing workflows continue to run. +- After `ALTER EXTENSION UPDATE` on a v0.2.2 cluster: + - `df.duroxide_schema()` exists and returns `'duroxide'`. + - Selection step 1 is taken on subsequent restarts. + - Provider state is unchanged. +- E2E setup SQL and helper scripts no longer hardcode the string `duroxide`. Where direct SQL must reference the schema, fetch the name via `SELECT df.duroxide_schema()` with the same `42883` fallback. Touch points likely include: @@ -222,7 +176,24 @@ Update: - `docs/upgrade-testing.md` - `USER_GUIDE.md` connection/troubleshooting sections if readiness probes or drop/recreate guidance changes -Document clearly that changing `pg_durable.duroxide_schema` does not migrate existing durable state. +Document clearly that: + +- The provider schema is an implementation detail, not a configurable setting. +- Existing `duroxide`-based installs are not migrated to `_duroxide`; they keep using `duroxide` indefinitely. +- The only way to adopt `_duroxide` on an existing cluster is `DROP EXTENSION pg_durable CASCADE` followed by `CREATE EXTENSION pg_durable`, which destroys durable engine state. + +## Security Notes + +- `df.duroxide_schema()` is created by the extension install / upgrade scripts and is therefore owned by the extension owner (typically a superuser). Non-superusers cannot `CREATE OR REPLACE` it. +- The function is `IMMUTABLE PARALLEL SAFE` and contains a literal string; no SQL injection surface. +- Falling back to `'duroxide'` on `42883` is safe because that fallback only fires when the new helper function is genuinely absent, which can only happen on a pre-upgrade-script extension version. At that version the only possible extension-owned provider schema is `duroxide`. +- The BGW must still verify extension ownership of the resolved schema before applying duroxide migrations. This invariant is unchanged. + +## Open Questions + +1. **Cache lifetime in backend sessions.** Resolving the schema per connection is cheap (one SQL call). Caching it for the process lifetime is fine because the value cannot change without an extension upgrade, which in turn requires a session reconnect to see new function definitions reliably. Recommend: resolve once on first use per session, cache for session lifetime. +2. **Whether to expose `df.duroxide_schema()` as `SECURITY DEFINER` or rely on default invoker rights.** Default invoker rights are sufficient since the function only returns a literal. Recommend: leave as default to minimize surface area. +3. **Whether to also remove the `DUROXIDE_SCHEMA` constant from any Rust test fixtures.** Yes, but only where tests run against a real PostgreSQL backend. Pure unit tests that never touch the schema can keep using a constant for clarity. ## Validation Strategy @@ -235,23 +206,25 @@ cargo build --features pg17 ./scripts/test-upgrade.sh --verbose ``` -If time is short, prioritize a targeted E2E or SQL smoke test that verifies a default fresh install creates `df-duroxide`, and the upgrade B1 path still works with an existing `duroxide` schema. +If time is short, prioritize: + +1. A targeted E2E that verifies a fresh install creates `_duroxide` and that `df.duroxide_schema()` returns `'_duroxide'`. +2. An upgrade path (B1) test that verifies the new `.so` against a v0.2.2 schema (with no `ALTER EXTENSION UPDATE` run) continues to use `duroxide` via the `42883` fallback. +3. An upgrade-then-restart test that verifies, after `ALTER EXTENSION UPDATE`, selection step 1 is taken and the cluster still uses `duroxide`. ## Issue Update Draft Proposed summary to add to the GitHub issue: -> We should make the duroxide provider schema configurable for new installs, with a new default of `df-duroxide`, but preserve existing installations that already have extension-owned `duroxide` provider state. The implementation needs to avoid automatic rename/copy/drop of provider state. Fresh installs should create the configured schema as an extension-owned object and the BGW should only run `ApplyAll` after verifying extension ownership. Existing installs should continue using their current extension-owned `duroxide` schema unless the admin intentionally drops and recreates the extension under a new setting. The main design question is how to distinguish an unchanged legacy install from an admin explicitly changing the new GUC without drop/recreate; options are GUC-source inspection, always letting legacy `duroxide` win, or persisting the selected provider schema in `df` metadata at install time. +> We will rename the duroxide provider schema for new pg_durable installs to `_duroxide` (bare identifier, no quoting required, leading underscore signals internal/private). No GUC will be added — the schema name is an implementation detail of pg_durable, not an operator-facing setting. Existing installs that already own a `duroxide` schema will continue to use it indefinitely; there is no in-place migration to `_duroxide`. The selected schema is exposed by a small extension-owned function `df.duroxide_schema()`: the fresh-install SQL defines it to return `'_duroxide'`, and the `0.2.2 -> ` upgrade script defines it to return `'duroxide'`. The BGW and backend sessions resolve the schema by calling this function with a single fallback: if the function does not exist (error 42883), assume legacy `'duroxide'`. This fallback covers the case where a new `.so` is deployed without `ALTER EXTENSION UPDATE` being run, and is self-deleting once the upgrade script has run. ## Current Recommendation -Do not implement the schema rename as a simple constant change. The safe implementation needs a selected-schema abstraction and a persisted install-time provider schema record, or an equally precise rule for GUC explicitness. - -The metadata approach is the most deterministic: - -- New installs record and use the configured schema. -- Old installs without metadata keep using `duroxide`. -- Changing the GUC after install does not mutate the recorded schema and therefore does not migrate state. -- Drop/recreate is the supported way to adopt a different provider schema. +Implement as described above. This design: -This differs slightly from the requested "worker waits for the new schema if GUC changes" behavior, but it avoids ambiguity and matches PostgreSQL extension practice: install-time state should define which schema belongs to that extension instance. \ No newline at end of file +- Removes all GUC-related ambiguity from the original proposal. +- Has a single, well-defined fallback path tied to a concrete PostgreSQL error code rather than to fuzzy heuristics about admin intent or `pg_depend` state. +- Keeps the security invariants (no schema adoption, BGW verifies extension ownership) intact. +- Avoids identifier-quoting churn by choosing a bare-identifier default (`_duroxide`). +- Localizes "which schema does this version use" into the version-specific install and upgrade SQL, where version-specific decisions naturally belong. +- Explicitly declines to offer in-place schema migration, making the operational contract clear to operators: keep state on `duroxide`, or destroy state and adopt `_duroxide`. From 03a93dda787064ce6549950f6859abeccc22c3cb Mon Sep 17 00:00:00 2001 From: Pino de Candia Date: Thu, 4 Jun 2026 15:04:39 +0000 Subject: [PATCH 3/3] Implement duroxide schema rename --- Cargo.lock | 2 +- Cargo.toml | 2 +- docs/upgrade-testing.md | 16 +- examples/invoice-approval/sql/11_reset.sql | 25 +- expected/00_init.out | 18 +- pg_durable.control | 10 +- scripts/test-e2e-local.sh | 10 +- scripts/test-upgrade.sh | 54 +- sql/00_init.sql | 18 +- sql/pg_durable--0.2.2--0.2.3.sql | 21 + sql/pg_durable--0.2.2.sql | 1047 +++++++++++++++++ src/client.rs | 25 +- src/dsl.rs | 4 +- src/explain.rs | 5 +- src/lib.rs | 44 +- src/monitoring.rs | 17 +- src/types.rs | 102 +- src/worker.rs | 131 ++- tests/e2e/sql/00_setup_playground.sql | 8 +- tests/e2e/sql/12_extension_lifecycle.sql | 19 +- ...46_connection_limit_startup_validation.sql | 10 +- 21 files changed, 1445 insertions(+), 143 deletions(-) create mode 100644 sql/pg_durable--0.2.2--0.2.3.sql create mode 100644 sql/pg_durable--0.2.2.sql diff --git a/Cargo.lock b/Cargo.lock index ca1c17a..55e3e3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,7 +1892,7 @@ dependencies = [ [[package]] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" dependencies = [ "bigdecimal", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 6bf0644..45a400a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_durable" -version = "0.2.2" +version = "0.2.3" edition = "2021" license = "PostgreSQL" repository = "https://github.com/microsoft/pg_durable" diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index c2cb307..51059ed 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -147,7 +147,8 @@ Returns the version that was last installed/updated. Compare against known thres ### Test infrastructure -- `sql/pg_durable--0.1.1.sql` — first install SQL for the current major version (only the first version per major needs a fixture; intermediate versions are reconstructed by chaining upgrade scripts) +- `sql/pg_durable--0.1.1.sql` — first install SQL for the current major version +- `sql/pg_durable--0.2.2.sql` — install SQL fixture at the start of the current provider compatibility line (`PROVIDER_COMPAT_START_VERSION`). The harness reconstructs a target version from the **highest install fixture at or below it** (see `base_fixture_for_version` in `scripts/test-upgrade.sh`), then chains `ALTER EXTENSION UPDATE`. A fixture is required at the provider-compat-start boundary so reconstruction never has to chain across it — the pre-0.2.2 install SQL embeds a hand-written `duroxide` schema that is incompatible with the duroxide-pg provider's migration tracking (`_duroxide_migrations`). - `sql/pg_durable--0.1.1--0.2.0.sql` — upgrade script (initially empty, populated by subsequent PRs) - `scripts/test-upgrade.sh` — runs Scenarios A, B1, and B2. The `PROVIDER_COMPAT_START_VERSION` environment variable/default controls the first version in the current provider compatibility line. Versions before that boundary are excluded from B1 and cannot be used as A/B2 upgrade sources. - CI step in `.github/workflows/ci.yml` @@ -171,7 +172,7 @@ Each PR that changes the extension schema or modifies SQL queries in Rust code s **Minor release** (e.g. 0.2.0 → 0.3.0): 1. Create empty `sql/pg_durable----.sql` upgrade script 2. Bump `Cargo.toml` version to `` -3. If this release starts a new provider compatibility line, update the `PROVIDER_COMPAT_START_VERSION` default in `scripts/test-upgrade.sh` and document the boundary under "Version-Specific Changes". Downstream forks can instead override `PROVIDER_COMPAT_START_VERSION` in CI to keep the script shared. +3. If this release starts a new provider compatibility line, update the `PROVIDER_COMPAT_START_VERSION` default in `scripts/test-upgrade.sh`, check in an install SQL fixture (`sql/pg_durable--.sql`) at that boundary so reconstruction never chains across it, and document the boundary under "Version-Specific Changes". Downstream forks can instead override `PROVIDER_COMPAT_START_VERSION` in CI to keep the script shared. If this is the first minor after a new major (e.g. 1.0.0 → 1.1.0), also: @@ -202,6 +203,17 @@ gate, so they never need to be added to the exclude list. Each schema-changing PR should add a section here documenting what changed, what the upgrade script handles, and any backward compatibility considerations. +### v0.2.2 → v0.2.3 + +#### Rename duroxide provider schema to `_duroxide` for fresh installs +- **DDL change (df schema):** Adds `df.duroxide_schema()`, an `IMMUTABLE`/`PARALLEL SAFE` SQL function that returns the name of the schema holding the duroxide provider objects. Fresh 0.2.3 installs create the function (in `src/lib.rs`) returning `'_duroxide'`; the upgrade script `sql/pg_durable--0.2.2--0.2.3.sql` creates the same function returning `'duroxide'` so pre-existing installs keep using the legacy schema. Both bodies set `search_path = pg_catalog, pg_temp` to satisfy the pgspot gate. +- **DDL change (provider schema):** Fresh installs now run `CREATE SCHEMA _duroxide` (was `CREATE SCHEMA duroxide`). The upgrade script does **not** rename, drop, or move the existing `duroxide` schema — renaming an in-use provider schema would orphan the BGW's durable state. Upgraded installs therefore continue to use `duroxide`. +- **Runtime selection:** Backend sessions resolve the provider schema once per session via `backend_duroxide_schema()` (cached in a `OnceLock`); the BGW resolves it once per epoch via `resolve_duroxide_schema_pool()` (re-resolved after every CREATE EXTENSION so drop+recreate with a different schema version is handled). Both call `df.duroxide_schema()` and fall back to `'duroxide'` (`LEGACY_DUROXIDE_SCHEMA`) when the helper is absent — i.e. a new `.so` deployed against a ≤0.2.2 schema that has not run `ALTER EXTENSION pg_durable UPDATE`. Presence is detected via a `pg_proc` catalog lookup rather than catching `42883`, so the surrounding (sub)transaction is never aborted. +- **Scenario A considerations:** The Scenario A equivalence contract covers the `df` schema only and compares function signatures, not bodies. `df.duroxide_schema()` has an identical signature on the fresh-install and upgrade paths (only the returned literal differs), so Scenario A passes. The provider schema name (`_duroxide` vs `duroxide`) is intentionally excluded from the snapshot diff, as it was for v0.2.0. +- **Scenario B1 considerations:** The new `.so` works against all previous schemas: when `df.duroxide_schema()` does not exist (≤0.2.2 schemas without the upgrade applied) the runtime falls back to `'duroxide'`, which is exactly the schema those installs use. +- **Scenario B2 considerations:** No data migration. The existing `duroxide` schema and its tables are untouched; the upgrade only adds one `df` function. +- **Test fixture:** A new `sql/pg_durable--0.2.2.sql` install fixture is checked in at the provider-compat-start boundary. The upgrade harness now reconstructs 0.2.2 directly from it (an empty `duroxide` schema that the BGW populates via the duroxide-pg `ApplyAll` migration) instead of chaining from the pre-provider `pg_durable--0.1.1.sql` fixture, whose embedded hand-written `duroxide` schema lacks the `_duroxide_migrations` tracking table and is therefore incompatible with the duroxide-pg provider. + ### v0.2.1 → v0.2.2 #### #162 quote_ident-wrapped `current_user::regrole` (fixes #161) diff --git a/examples/invoice-approval/sql/11_reset.sql b/examples/invoice-approval/sql/11_reset.sql index 991d662..9e1b5c9 100644 --- a/examples/invoice-approval/sql/11_reset.sql +++ b/examples/invoice-approval/sql/11_reset.sql @@ -29,17 +29,20 @@ END $$; -- 2. Clean up df extension tables (instances + nodes) TRUNCATE TABLE df.nodes, df.instances; --- 3. Clean up duroxide engine state -TRUNCATE TABLE - duroxide.history, - duroxide.executions, - duroxide.instances, - duroxide.instance_locks, - duroxide.orchestrator_queue, - duroxide.worker_queue, - duroxide.kv_delta, - duroxide.kv_store, - duroxide.sessions; +-- 3. Clean up duroxide engine state. +-- The provider schema is '_duroxide' on fresh installs and 'duroxide' on +-- installs upgraded from <= 0.2.2; resolve it via df.duroxide_schema(). +DO $$ +DECLARE + dx_schema TEXT := df.duroxide_schema(); +BEGIN + EXECUTE format( + 'TRUNCATE TABLE %1$I.history, %1$I.executions, %1$I.instances, ' + '%1$I.instance_locks, %1$I.orchestrator_queue, %1$I.worker_queue, ' + '%1$I.kv_delta, %1$I.kv_store, %1$I.sessions', + dx_schema + ); +END $$; -- 4. Reset demo tables TRUNCATE TABLE demo.invoice_audit, demo.invoices RESTART IDENTITY; diff --git a/expected/00_init.out b/expected/00_init.out index 2c1b1ef..d7980f4 100644 --- a/expected/00_init.out +++ b/expected/00_init.out @@ -18,24 +18,30 @@ SELECT nspname FROM pg_namespace WHERE nspname = 'df'; (1 row) -- Wait for the background worker to apply duroxide migrations. --- The BGW populates the duroxide schema asynchronously after CREATE EXTENSION; --- subsequent tests that call df.start() will fail if this isn't done first. --- We poll duroxide._worker_ready directly (is_worker_ready is internal Rust, --- not exposed as a SQL function). +-- The BGW populates the duroxide provider schema asynchronously after CREATE +-- EXTENSION; subsequent tests that call df.start() will fail if this isn't done +-- first. We poll ._worker_ready directly (is_worker_ready is internal +-- Rust, not exposed as a SQL function). The provider schema name is resolved +-- via df.duroxide_schema() ('_duroxide' on fresh installs, 'duroxide' on +-- installs upgraded from <= 0.2.2). DO $$ DECLARE attempts INT := 0; table_exists BOOLEAN; ready BOOLEAN; + dx_schema TEXT := df.duroxide_schema(); BEGIN LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO ready; ELSE ready := FALSE; END IF; diff --git a/pg_durable.control b/pg_durable.control index 09a87f1..3640796 100644 --- a/pg_durable.control +++ b/pg_durable.control @@ -8,9 +8,11 @@ relocatable = false superuser = true trusted = false # Note: 'schema' is intentionally omitted. This extension manages two schemas -# (df and duroxide), and PostgreSQL's control file only supports a single schema -# directive. The df schema is created by pgrx (#[pg_schema]), and the duroxide -# schema is created by sql/duroxide_install.sql. relocatable = false prevents -# schema relocation attacks. +# (df and the duroxide provider schema), and PostgreSQL's control file only +# supports a single schema directive. The df schema is created by pgrx +# (#[pg_schema]); the duroxide provider schema is created by the +# create_duroxide_schema extension_sql block in src/lib.rs (named '_duroxide' +# on fresh installs, or the legacy 'duroxide' on installs upgraded from +# <= 0.2.2). relocatable = false prevents schema relocation attacks. diff --git a/scripts/test-e2e-local.sh b/scripts/test-e2e-local.sh index c6a299b..6b51b0d 100755 --- a/scripts/test-e2e-local.sh +++ b/scripts/test-e2e-local.sh @@ -389,9 +389,17 @@ SQL wait_for_worker_ready() { local ready="f" local attempts=0 + local dx_schema="" while [ "$attempts" -lt 120 ]; do - ready=$("$PSQL" -h localhost -p "$PG_PORT" -U "$PG_USER" -d "$PG_DB" -Atqc "SELECT CASE WHEN EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'duroxide' AND table_name = '_worker_ready') THEN EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) ELSE FALSE END;" 2>/dev/null | tr -d ' \n' || true) + # Resolve the duroxide provider schema via df.duroxide_schema(). + # Falls back to the legacy 'duroxide' schema when the helper is absent + # (extension not yet created, or installs predating the helper). + dx_schema=$("$PSQL" -h localhost -p "$PG_PORT" -U "$PG_USER" -d "$PG_DB" -Atqc "SELECT df.duroxide_schema();" 2>/dev/null | tr -d ' \n' || true) + if [ -z "$dx_schema" ]; then + dx_schema="duroxide" + fi + ready=$("$PSQL" -h localhost -p "$PG_PORT" -U "$PG_USER" -d "$PG_DB" -Atqc "SELECT CASE WHEN EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '$dx_schema' AND table_name = '_worker_ready') THEN EXISTS(SELECT 1 FROM $dx_schema._worker_ready WHERE schema_version >= 1) ELSE FALSE END;" 2>/dev/null | tr -d ' \n' || true) if [ "$ready" = "t" ]; then return fi diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 0523f91..5b7ff80 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -117,6 +117,44 @@ first_fixture_for_major() { fi } +# Selects the best checked-in install fixture to use as the base for +# reconstructing a target version: the highest install fixture (same major) +# that is <= the target version. This lets the harness install a version +# directly from its own fixture (e.g. 0.2.2) instead of always chaining from +# the major's first fixture (0.1.1), which would drag in the pre-provider +# legacy duroxide schema. Versions before PROVIDER_COMPAT_START_VERSION are +# never selected as a base for targets within the provider compatibility line. +base_fixture_for_version() { + local target_version="$1" + local target_major + target_major=$(echo "$target_version" | cut -d. -f1) + + local best="" + for f in "$PROJECT_DIR"/sql/pg_durable--*.sql; do + local fname + fname=$(basename "$f") + if [[ "$fname" =~ ^pg_durable--([0-9]+\.[0-9]+\.[0-9]+)\.sql$ ]]; then + local candidate="${BASH_REMATCH[1]}" + local candidate_major + candidate_major=$(echo "$candidate" | cut -d. -f1) + [ "$candidate_major" = "$target_major" ] || continue + # Skip fixtures newer than the target. + version_ge "$target_version" "$candidate" || continue + # When the target is within the provider compatibility line, never + # base it on a fixture from before that line started. + if version_ge "$target_version" "$PROVIDER_COMPAT_START_VERSION" \ + && ! version_ge "$candidate" "$PROVIDER_COMPAT_START_VERSION"; then + continue + fi + if [ -z "$best" ] || version_ge "$candidate" "$best"; then + best="$candidate" + fi + fi + done + + printf '%s' "$best" +} + # Find the previous version by looking for upgrade SQL scripts PREV_VERSION=$(for f in "$PROJECT_DIR"/sql/pg_durable--*--"${CURRENT_VERSION}".sql; do fname=$(basename "$f") @@ -396,19 +434,21 @@ wait_for_ready() { return 1 } -# Creates the extension at a specific version by installing from that major's -# first checked-in fixture and chaining ALTER EXTENSION UPDATE if needed. +# Creates the extension at a specific version by installing from the closest +# checked-in fixture at or below the target (see base_fixture_for_version) and +# chaining ALTER EXTENSION UPDATE if needed. This keeps reconstruction within +# the provider compatibility line and avoids chaining across the pre-0.2.2 +# boundary, where the legacy in-extension duroxide schema is incompatible with +# the duroxide-pg provider. create_extension_at_version() { local target_version="$1" - local target_major local base_version - target_major=$(echo "$target_version" | cut -d. -f1) - base_version=$(first_fixture_for_major "$target_major") + base_version=$(base_fixture_for_version "$target_version") if [ -z "$base_version" ]; then - echo "No install SQL fixture found for major version $target_major" - echo "Expected: sql/pg_durable--.sql" + echo "No install SQL fixture found at or below version $target_version" + echo "Expected: sql/pg_durable--.sql" return 1 fi diff --git a/sql/00_init.sql b/sql/00_init.sql index cdf21d7..379f21c 100644 --- a/sql/00_init.sql +++ b/sql/00_init.sql @@ -12,24 +12,30 @@ SELECT extname FROM pg_extension WHERE extname = 'pg_durable'; SELECT nspname FROM pg_namespace WHERE nspname = 'df'; -- Wait for the background worker to apply duroxide migrations. --- The BGW populates the duroxide schema asynchronously after CREATE EXTENSION; --- subsequent tests that call df.start() will fail if this isn't done first. --- We poll duroxide._worker_ready directly (is_worker_ready is internal Rust, --- not exposed as a SQL function). +-- The BGW populates the duroxide provider schema asynchronously after CREATE +-- EXTENSION; subsequent tests that call df.start() will fail if this isn't done +-- first. We poll ._worker_ready directly (is_worker_ready is internal +-- Rust, not exposed as a SQL function). The provider schema name is resolved +-- via df.duroxide_schema() ('_duroxide' on fresh installs, 'duroxide' on +-- installs upgraded from <= 0.2.2). DO $$ DECLARE attempts INT := 0; table_exists BOOLEAN; ready BOOLEAN; + dx_schema TEXT := df.duroxide_schema(); BEGIN LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO ready; ELSE ready := FALSE; END IF; diff --git a/sql/pg_durable--0.2.2--0.2.3.sql b/sql/pg_durable--0.2.2--0.2.3.sql new file mode 100644 index 0000000..470d4d1 --- /dev/null +++ b/sql/pg_durable--0.2.2--0.2.3.sql @@ -0,0 +1,21 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- pg_durable upgrade: 0.2.2 → 0.2.3 +-- +-- Introduces df.duroxide_schema(), a helper that reports which schema holds the +-- duroxide provider objects for this install. Fresh 0.2.3 installs create the +-- provider objects in the '_duroxide' schema (see lib.rs). Installs upgrading +-- from <= 0.2.2 already have their provider objects in the legacy 'duroxide' +-- schema and must keep using it — renaming an in-use schema would orphan the +-- background worker's durable state. This upgrade therefore defines +-- df.duroxide_schema() to return 'duroxide' for pre-existing installs. +-- +-- Backend sessions and the background worker call df.duroxide_schema() to learn +-- which schema to use, falling back to 'duroxide' when the helper is absent +-- (installs predating it). No schema rename, drop, or data movement occurs. + +CREATE FUNCTION df.duroxide_schema() RETURNS text + LANGUAGE sql IMMUTABLE PARALLEL SAFE + SET search_path = pg_catalog, pg_temp + AS $$ SELECT 'duroxide'::text $$; diff --git a/sql/pg_durable--0.2.2.sql b/sql/pg_durable--0.2.2.sql new file mode 100644 index 0000000..799bb5c --- /dev/null +++ b/sql/pg_durable--0.2.2.sql @@ -0,0 +1,1047 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +/* */ +/* +This file is auto generated by pgrx. + +The ordering of items is not stable, it is driven by a dependency graph. +*/ +/* */ + +/* */ +-- src/lib.rs:146 +CREATE SCHEMA IF NOT EXISTS df; /* pg_durable::df */ +/* */ + +/* */ +-- src/lib.rs:153 +-- requires: +-- df + + +-- Table to store function nodes (SQL steps, THEN chains, etc.) +CREATE TABLE IF NOT EXISTS df.nodes ( + id VARCHAR(8) PRIMARY KEY, + instance_id VARCHAR(8), + node_type TEXT NOT NULL, + query TEXT, + result_name TEXT, + left_node VARCHAR(8), + right_node VARCHAR(8), + status TEXT DEFAULT 'pending', + result JSONB, + error TEXT, + submitted_by REGROLE, + database TEXT, + created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + updated_at TIMESTAMPTZ DEFAULT pg_catalog.now() +); + +COMMENT ON COLUMN df.nodes.submitted_by IS + 'Effective role (current_user) at df.start() time - used for connection authentication and SQL execution'; + +-- Table to store function instances +CREATE TABLE IF NOT EXISTS df.instances ( + id VARCHAR(8) PRIMARY KEY, + label TEXT, + root_node VARCHAR(8) NOT NULL, + status TEXT DEFAULT 'pending', + submitted_by REGROLE NOT NULL, + database TEXT, + created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + updated_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + completed_at TIMESTAMPTZ +); + +COMMENT ON COLUMN df.instances.submitted_by IS + 'Effective role (current_user) at df.start() time - used for connection authentication and SQL execution'; + +-- Index for finding pending instances +CREATE INDEX IF NOT EXISTS idx_instances_status ON df.instances(status); + +-- Index for finding nodes by instance +CREATE INDEX IF NOT EXISTS idx_nodes_instance ON df.nodes(instance_id); + +-- Table to store workflow variables (captured at df.start()) +-- Per-user scoping: each user has their own variable namespace. +CREATE TABLE IF NOT EXISTS df.vars ( + name TEXT NOT NULL, + value TEXT, + owner REGROLE NOT NULL DEFAULT pg_catalog.quote_ident(current_user)::pg_catalog.regrole, + PRIMARY KEY (owner, name) +); + +-- Sentinel table: the background worker writes its epoch_id here after +-- initialising. If the extension is DROP-ed and re-CREATEd between +-- two poll ticks the epoch row disappears, so the worker detects the +-- recreation even though the extension is always "present" in pg_extension. +CREATE TABLE IF NOT EXISTS df._worker_epoch ( + epoch_id UUID PRIMARY KEY, + started_at TIMESTAMPTZ DEFAULT pg_catalog.now(), + last_seen_at TIMESTAMPTZ DEFAULT pg_catalog.now() +); + +ALTER TABLE df.instances + ADD CONSTRAINT instances_id_format_chk + -- Operators (OPERATOR(pg_catalog.)) and functions (e.g. pg_catalog.now) + -- are schema-qualified throughout this install DDL so name resolution never + -- depends on the session search_path -- closing the CVE-2018-1058 vector + -- (a malicious schema shadowing `=`, `~`, etc.). Enforced by the pgspot CI + -- gate (scripts/pgspot-gate.sh). + CHECK (id OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT instances_root_node_format_chk + CHECK (root_node OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT instances_status_chk + CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed', 'cancelled'])) NOT VALID, + -- Supports the composite FK from df.nodes that ties node identity to the instance row. + ADD CONSTRAINT instances_identity_key + UNIQUE (id, submitted_by); + +ALTER TABLE df.nodes + ADD CONSTRAINT nodes_instance_id_present_chk + CHECK (instance_id IS NOT NULL) NOT VALID, + ADD CONSTRAINT nodes_submitted_by_present_chk + CHECK (submitted_by IS NOT NULL) NOT VALID, + ADD CONSTRAINT nodes_id_format_chk + CHECK (id OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_instance_id_format_chk + CHECK (instance_id OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_left_node_format_chk + CHECK (left_node IS NULL OR left_node OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_right_node_format_chk + CHECK (right_node IS NULL OR right_node OPERATOR(pg_catalog.~) '^[0-9a-f]{8}$') NOT VALID, + ADD CONSTRAINT nodes_node_type_chk + CHECK (node_type OPERATOR(pg_catalog.=) ANY (ARRAY['SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL'])) NOT VALID, + ADD CONSTRAINT nodes_result_name_chk + CHECK (result_name IS NULL OR result_name OPERATOR(pg_catalog.~) '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID, + ADD CONSTRAINT nodes_status_chk + CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed'])) NOT VALID, + ADD CONSTRAINT nodes_result_status_chk + CHECK (result IS NULL OR status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed'])) NOT VALID, + ADD CONSTRAINT nodes_structure_chk + CHECK ( + CASE + WHEN node_type OPERATOR(pg_catalog.=) ANY (ARRAY['SQL', 'SLEEP', 'WAIT_SCHEDULE', 'BREAK', 'HTTP', 'SIGNAL']) + THEN left_node IS NULL AND right_node IS NULL AND query IS NOT NULL + WHEN node_type OPERATOR(pg_catalog.=) 'THEN' + THEN left_node IS NOT NULL AND right_node IS NOT NULL AND query IS NULL + WHEN node_type OPERATOR(pg_catalog.=) 'IF' + THEN left_node IS NOT NULL AND right_node IS NOT NULL AND query IS NOT NULL + WHEN node_type OPERATOR(pg_catalog.=) 'LOOP' + THEN left_node IS NOT NULL AND right_node IS NULL + WHEN node_type OPERATOR(pg_catalog.=) 'JOIN' + THEN left_node IS NOT NULL AND right_node IS NOT NULL + WHEN node_type OPERATOR(pg_catalog.=) 'RACE' + THEN left_node IS NOT NULL AND right_node IS NOT NULL AND query IS NULL + ELSE FALSE + END + ) NOT VALID, + ADD CONSTRAINT nodes_instance_node_key + UNIQUE (instance_id, id); + +ALTER TABLE df.nodes + ADD CONSTRAINT nodes_instance_identity_fkey + FOREIGN KEY (instance_id, submitted_by) + REFERENCES df.instances (id, submitted_by) + DEFERRABLE INITIALLY DEFERRED NOT VALID, + ADD CONSTRAINT nodes_left_node_same_instance_fkey + FOREIGN KEY (instance_id, left_node) + REFERENCES df.nodes (instance_id, id) + DEFERRABLE INITIALLY DEFERRED NOT VALID, + ADD CONSTRAINT nodes_right_node_same_instance_fkey + FOREIGN KEY (instance_id, right_node) + REFERENCES df.nodes (instance_id, id) + DEFERRABLE INITIALLY DEFERRED NOT VALID; + +ALTER TABLE df.instances + ADD CONSTRAINT instances_root_node_same_instance_fkey + FOREIGN KEY (id, root_node) + REFERENCES df.nodes (instance_id, id) + DEFERRABLE INITIALLY DEFERRED NOT VALID; +/* */ + +/* */ +-- src/dsl.rs:924 +-- pg_durable::dsl::cancel +CREATE FUNCTION df."cancel"( + "instance_id" TEXT, /* &str */ + "reason" TEXT DEFAULT 'Cancelled by user' /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'cancel_wrapper'; +/* */ + +/* */ +-- src/explain.rs:36 +-- pg_durable::explain::explain +CREATE FUNCTION df."explain"( + "input" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'explain_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:48 +-- pg_durable::dsl::debug_connection +CREATE FUNCTION df."debug_connection"() RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'debug_connection_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:973 +-- pg_durable::dsl::run +CREATE FUNCTION df."run"( + "instance_id" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'run_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:38 +-- pg_durable::dsl::version +CREATE FUNCTION df."version"() RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'version_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:587 +-- pg_durable::dsl::signal +CREATE FUNCTION df."signal"( + "instance_id" TEXT, /* &str */ + "signal_name" TEXT, /* &str */ + "signal_data" TEXT DEFAULT '{}' /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'signal_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:369 +-- pg_durable::dsl::if +CREATE FUNCTION df."if"( + "condition" TEXT, /* &str */ + "then_branch" TEXT, /* &str */ + "else_branch" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'if_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:414 +-- pg_durable::dsl::join +CREATE FUNCTION df."join"( + "a" TEXT, /* &str */ + "b" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'join_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:1011 +-- pg_durable::dsl::wait_for_completion +CREATE FUNCTION df."wait_for_completion"( + "instance_id" TEXT, /* &str */ + "timeout_seconds" INT DEFAULT 30 /* i32 */ +) RETURNS TEXT /* core::result::Result> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'wait_for_completion_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:275 +-- pg_durable::monitoring::metrics +CREATE FUNCTION df."metrics"() RETURNS TABLE ( + "total_instances" bigint, /* i64 */ + "running_instances" bigint, /* i64 */ + "completed_instances" bigint, /* i64 */ + "failed_instances" bigint, /* i64 */ + "total_executions" bigint, /* i64 */ + "total_events" bigint /* i64 */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'metrics_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:147 +-- pg_durable::dsl::getvar +CREATE FUNCTION df."getvar"( + "name" TEXT /* &str */ +) RETURNS TEXT /* core::option::Option */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'getvar_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:124 +-- pg_durable::dsl::setvar +CREATE FUNCTION df."setvar"( + "name" TEXT, /* &str */ + "value" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'setvar_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:323 +-- pg_durable::monitoring::instance_nodes +CREATE FUNCTION df."instance_nodes"( + "instance_id_param" TEXT, /* &str */ + "last_n_executions" INT DEFAULT 5 /* i32 */ +) RETURNS TABLE ( + "execution_id" bigint, /* i64 */ + "node_id" TEXT, /* alloc::string::String */ + "node_type" TEXT, /* alloc::string::String */ + "query" TEXT, /* core::option::Option */ + "result_name" TEXT, /* core::option::Option */ + "left_node" TEXT, /* core::option::Option */ + "right_node" TEXT, /* core::option::Option */ + "status" TEXT, /* core::option::Option */ + "result" TEXT, /* core::option::Option */ + "updated_at" timestamp with time zone /* core::option::Option */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'instance_nodes_wrapper'; +/* */ + +/* */ +-- src/types.rs:129 +-- pg_durable::types::target_database +CREATE FUNCTION df."target_database"() RETURNS TEXT /* alloc::string::String */ +IMMUTABLE STRICT PARALLEL SAFE +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'target_database_wrapper'; +/* */ + +/* */ +-- src/lib.rs:574 +-- requires: +-- df +-- target_database + + +-- Validate that CREATE EXTENSION is run in the correct database +-- The background worker connects to one specific database (determined by +-- the pg_durable.database GUC, defaults to "postgres"). +-- The extension must be created in that database for workflows to execute. +DO $$ +DECLARE + current_db TEXT; + target_db TEXT; +BEGIN + -- Get the current database + SELECT pg_catalog.current_database() INTO current_db; + + -- Get the target database that the background worker will connect to + SELECT df.target_database() INTO target_db; + + IF current_db OPERATOR(pg_catalog.<>) target_db THEN + RAISE EXCEPTION 'pg_durable extension must be created in database "%" (currently in "%"). The background worker only processes functions in the database specified by the pg_durable.database GUC (defaults to "postgres").', target_db, current_db + USING HINT = 'Connect to the correct database and run: CREATE EXTENSION pg_durable;'; + END IF; +END $$; +/* */ + +/* */ +-- src/lib.rs:621 +-- requires: +-- validate_database + + +-- The duroxide provider schema is created here so the extension owns it. +-- No IF NOT EXISTS: fails loudly if a duroxide schema already exists, +-- preventing adoption of a potentially attacker-crafted schema. +-- The background worker populates this schema at startup via ApplyAll. +CREATE SCHEMA duroxide; + +/* */ + +/* */ +-- src/dsl.rs:181 +-- pg_durable::dsl::clearvars +CREATE FUNCTION df."clearvars"() RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'clearvars_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:452 +-- pg_durable::dsl::race +CREATE FUNCTION df."race"( + "a" TEXT, /* &str */ + "b" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'race_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:249 +-- pg_durable::dsl::sleep +CREATE FUNCTION df."sleep"( + "seconds" bigint /* i64 */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'sleep_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:237 +-- pg_durable::dsl::as +CREATE FUNCTION df."as"( + "fut" TEXT, /* &str */ + "name" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'as_named_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:116 +-- pg_durable::monitoring::instance_info +CREATE FUNCTION df."instance_info"( + "instance_id" TEXT /* &str */ +) RETURNS TABLE ( + "instance_id" TEXT, /* alloc::string::String */ + "label" TEXT, /* core::option::Option */ + "function_name" TEXT, /* alloc::string::String */ + "function_version" TEXT, /* alloc::string::String */ + "current_execution_id" bigint, /* i64 */ + "status" TEXT, /* alloc::string::String */ + "output" TEXT /* core::option::Option */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'instance_info_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:161 +-- pg_durable::dsl::unsetvar +CREATE FUNCTION df."unsetvar"( + "name" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'unsetvar_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:206 +-- pg_durable::dsl::sql +CREATE FUNCTION df."sql"( + "query" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'sql_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:219 +-- pg_durable::dsl::seq +CREATE FUNCTION df."seq"( + "a" TEXT, /* &str */ + "b" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'then_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:430 +-- pg_durable::dsl::join3 +CREATE FUNCTION df."join3"( + "a" TEXT, /* &str */ + "b" TEXT, /* &str */ + "c" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'join3_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:392 +-- pg_durable::dsl::if_rows +CREATE FUNCTION df."if_rows"( + "result_name" TEXT, /* &str */ + "then_branch" TEXT, /* &str */ + "else_branch" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'if_rows_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:265 +-- pg_durable::dsl::wait_for_schedule +CREATE FUNCTION df."wait_for_schedule"( + "cron_expr" TEXT /* &str */ +) RETURNS TEXT /* alloc::string::String */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'wait_for_schedule_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:983 +-- pg_durable::dsl::result +CREATE FUNCTION df."result"( + "instance_id" TEXT /* &str */ +) RETURNS TEXT /* core::option::Option */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'result_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:353 +-- pg_durable::dsl::break +CREATE FUNCTION df."break"( + "value" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'break_fn_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:553 +-- pg_durable::dsl::wait_for_signal +CREATE FUNCTION df."wait_for_signal"( + "name" TEXT, /* &str */ + "timeout_seconds" INT DEFAULT NULL /* core::option::Option */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'wait_for_signal_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:312 +-- pg_durable::dsl::loop +CREATE FUNCTION df."loop"( + "body" TEXT, /* &str */ + "condition" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'loop_fn_wrapper'; +/* */ + +/* */ +-- src/lib.rs:652 +-- requires: +-- dsl::then_fn +-- dsl::as_named +-- dsl::join +-- dsl::race +-- dsl::if_fn +-- dsl::loop_fn + + +-- Operator ~> for sequencing: a ~> b means "run a, then run b" +CREATE OPERATOR ~> ( + FUNCTION = df.seq, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator |=> for naming: fut |=> 'name' means "name this result as $name" +CREATE OR REPLACE FUNCTION df.as_op(fut text, name text) RETURNS text AS $$ + SELECT df.as(fut, name); +$$ LANGUAGE SQL IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OPERATOR |=> ( + FUNCTION = df.as_op, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator & for parallel join: a & b means "run a and b in parallel, wait for both" +CREATE OPERATOR & ( + FUNCTION = df.join, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator | for race: a | b means "run a and b in parallel, first wins" +CREATE OPERATOR | ( + FUNCTION = df.race, + LEFTARG = text, + RIGHTARG = text +); + +-- Operators ?> and !> for if-then-else: cond ?> then_branch !> else_branch +-- We need helper functions to build the if node incrementally + +-- Helper: cond ?> then creates a partial if (stores condition and then branch) +CREATE OR REPLACE FUNCTION df.if_then_op(condition text, then_branch text) RETURNS text AS $$ +DECLARE + cond_fut jsonb; + then_fut jsonb; + result_obj jsonb; +BEGIN + -- Ensure both are durofuts + cond_fut := df.ensure_durofut(condition)::jsonb; + then_fut := df.ensure_durofut(then_branch)::jsonb; + + -- Return a special marker object for the partial if + result_obj := jsonb_build_object( + '_partial_if', true, + 'condition', cond_fut, + 'then_branch', then_fut + ); + RETURN result_obj::text; +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +-- Helper: partial_if !> else completes the if node +CREATE OR REPLACE FUNCTION df.if_else_op(partial_if text, else_branch text) RETURNS text AS $$ +DECLARE + partial jsonb; + else_fut text; + cond_text text; + then_text text; +BEGIN + partial := partial_if::jsonb; + + -- Check if it's a partial if + IF partial->>'_partial_if' IS NULL THEN + RAISE EXCEPTION 'Invalid if-then-else: left side of !> must be a ?> expression'; + END IF; + + cond_text := partial->'condition'::text; + then_text := partial->'then_branch'::text; + else_fut := df.ensure_durofut(else_branch); + + -- Now call the real df.if function + RETURN df.if(cond_text, then_text, else_fut); +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +-- Helper to ensure a value is a durofut (returns JSON string) +-- Rejects JSON with unknown node_type values. +-- NOTE: The valid node type list here must be kept in sync with +-- VALID_NODE_TYPES in src/types.rs (the Rust constant is the canonical source). +CREATE OR REPLACE FUNCTION df.ensure_durofut(val text) RETURNS text AS $$ +DECLARE + node_type_val text; +BEGIN + -- Try to parse as JSON to check if it's already a durofut + BEGIN + node_type_val := (val::jsonb)->>'node_type'; + IF node_type_val IS NOT NULL THEN + -- Has a node_type - validate it + IF node_type_val NOT IN ('SQL', 'THEN', 'IF', 'JOIN', 'LOOP', 'BREAK', 'RACE', 'SLEEP', 'WAIT_SCHEDULE', 'HTTP', 'SIGNAL') THEN + RAISE EXCEPTION 'Unknown node_type ''%''. Valid types: SQL, THEN, IF, JOIN, LOOP, BREAK, RACE, SLEEP, WAIT_SCHEDULE, HTTP, SIGNAL', node_type_val; + END IF; + RETURN val; + END IF; + EXCEPTION WHEN invalid_text_representation THEN + -- Not valid JSON, treat as SQL + NULL; + WHEN raise_exception THEN + -- Re-raise our validation error + RAISE; + WHEN OTHERS THEN + -- Not valid JSON, treat as SQL + NULL; + END; + + -- It's plain SQL, wrap it + RETURN df.sql(val); +END; +$$ LANGUAGE plpgsql IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OPERATOR ?> ( + FUNCTION = df.if_then_op, + LEFTARG = text, + RIGHTARG = text +); + +CREATE OPERATOR !> ( + FUNCTION = df.if_else_op, + LEFTARG = text, + RIGHTARG = text +); + +-- Operator @> for loop: @> body means "repeat body forever" +-- This is a PREFIX operator with lowest precedence +CREATE OR REPLACE FUNCTION df.loop_prefix_op(body text) RETURNS text AS $$ + SELECT df.loop(body); +$$ LANGUAGE SQL IMMUTABLE SET search_path = pg_catalog, df, pg_temp; + +CREATE OPERATOR @> ( + FUNCTION = df.loop_prefix_op, + RIGHTARG = text +); +/* */ + +/* */ +-- src/monitoring.rs:195 +-- pg_durable::monitoring::instance_executions +CREATE FUNCTION df."instance_executions"( + "instance_id" TEXT, /* &str */ + "limit_count" INT DEFAULT 5 /* i32 */ +) RETURNS TABLE ( + "execution_id" bigint, /* i64 */ + "status" TEXT, /* alloc::string::String */ + "event_count" bigint, /* i64 */ + "duration_ms" bigint, /* i64 */ + "output" TEXT /* core::option::Option */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'instance_executions_wrapper'; +/* */ + +/* */ +-- src/monitoring.rs:18 +-- pg_durable::monitoring::list_instances +CREATE FUNCTION df."list_instances"( + "status_filter" TEXT DEFAULT NULL, /* core::option::Option<&str> */ + "limit_count" INT DEFAULT 100 /* i32 */ +) RETURNS TABLE ( + "instance_id" TEXT, /* alloc::string::String */ + "label" TEXT, /* core::option::Option */ + "function_name" TEXT, /* alloc::string::String */ + "status" TEXT, /* alloc::string::String */ + "execution_count" bigint, /* i64 */ + "output" TEXT /* core::option::Option */ +) + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'list_instances_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:962 +-- pg_durable::dsl::status +CREATE FUNCTION df."status"( + "instance_id" TEXT /* &str */ +) RETURNS TEXT /* core::option::Option */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'status_wrapper'; +/* */ + +/* */ +-- src/dsl.rs:478 +-- pg_durable::dsl::http +CREATE FUNCTION df."http"( + "url" TEXT, /* &str */ + "method" TEXT DEFAULT 'POST', /* &str */ + "body" TEXT DEFAULT NULL, /* core::option::Option<&str> */ + "headers" jsonb DEFAULT NULL, /* core::option::Option */ + "timeout_seconds" INT DEFAULT 30 /* i32 */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'http_wrapper'; +/* */ + +/* */ +-- src/lib.rs:303 +-- requires: +-- create_tables +-- dsl::http + + +-- Enable RLS on df.instances (no FORCE — superuser/table-owner bypasses RLS) +ALTER TABLE df.instances ENABLE ROW LEVEL SECURITY; + +CREATE POLICY instances_user_isolation ON df.instances + FOR ALL + USING (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole) + WITH CHECK (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole); + +-- Enable RLS on df.nodes +ALTER TABLE df.nodes ENABLE ROW LEVEL SECURITY; + +CREATE POLICY nodes_user_isolation ON df.nodes + FOR ALL + USING (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole) + WITH CHECK (submitted_by OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole); + +-- Enable RLS on df.vars (per-user variable isolation) +ALTER TABLE df.vars ENABLE ROW LEVEL SECURITY; + +CREATE POLICY vars_user_isolation ON df.vars + FOR ALL + USING (owner OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole) + WITH CHECK (owner OPERATOR(pg_catalog.=) pg_catalog.quote_ident(current_user)::pg_catalog.regrole); + +-- No automatic PUBLIC grants. Admins must explicitly grant privileges +-- to application roles after CREATE EXTENSION. +-- Use df.grant_usage('role_name') (recommended) or see USER_GUIDE.md +-- "Privilege Grants" for the equivalent manual GRANT statements. + +-- Helper: grant all required df privileges to a role in one call. +-- Authorization model: This function is SECURITY INVOKER and EXECUTE is +-- revoked from PUBLIC, so only roles explicitly granted EXECUTE (via +-- with_grant => true or a direct superuser GRANT) can call it. The inner +-- GRANT statements run as the caller, so the caller must also hold the +-- underlying privileges WITH GRANT OPTION (automatically true for +-- superusers; for delegated admins, df.grant_usage(..., with_grant => true) +-- grants all privileges WITH GRANT OPTION). +-- +-- This function is purely additive — it never issues REVOKE. To downgrade +-- a role's privileges, call df.revoke_usage() first, then df.grant_usage() +-- with the desired options. +-- +-- include_http controls whether the role is granted EXECUTE on df.http(). +-- Default is false: HTTP access is opt-in because df.http() makes outbound +-- network requests and requires explicit administrator approval. +-- +-- with_grant controls whether the target role receives privileges WITH GRANT +-- OPTION and can itself call df.grant_usage() / df.revoke_usage() to manage +-- other roles' access. Authorization is enforced by PostgreSQL's native +-- WITH GRANT OPTION mechanism: the caller must hold each underlying +-- privilege WITH GRANT OPTION, which is automatically true for superusers +-- and for delegated admins granted via with_grant => true. +-- +-- MAINTENANCE: when adding a new df.* function, add it to the func_sigs +-- array below. Functions NOT in this list are deny-by-default. +CREATE OR REPLACE FUNCTION df.grant_usage( + p_role TEXT, + include_http boolean DEFAULT false, + with_grant boolean DEFAULT false +) +RETURNS VOID +LANGUAGE plpgsql +SET search_path = pg_catalog, df, pg_temp +AS $fn$ +DECLARE + grant_opt TEXT := ''; + func_sig TEXT; + -- Explicit list of df.* functions to grant. Sensitive functions + -- (df.http, df.grant_usage, df.revoke_usage) are excluded from this + -- list and granted conditionally below. + func_sigs TEXT[] := ARRAY[ + -- DSL functions + 'df.sql(text)', + 'df.seq(text, text)', + 'df.as(text, text)', + 'df.sleep(bigint)', + 'df.wait_for_schedule(text)', + 'df.loop(text, text)', + 'df.break(text)', + 'df.if(text, text, text)', + 'df.if_rows(text, text, text)', + 'df.join(text, text)', + 'df.join3(text, text, text)', + 'df.race(text, text)', + 'df.wait_for_signal(text, integer)', + 'df.signal(text, text, text)', + 'df.start(text, text, text)', + 'df.setvar(text, text)', + 'df.getvar(text)', + 'df.unsetvar(text)', + 'df.clearvars()', + -- Monitoring functions + 'df.status(text)', + 'df.result(text)', + 'df.cancel(text, text)', + 'df.wait_for_completion(text, integer)', + 'df.run(text)', + 'df.list_instances(text, integer)', + 'df.instance_info(text)', + 'df.instance_nodes(text, integer)', + 'df.instance_executions(text, integer)', + 'df.metrics()', + -- Internal helpers (operators, version, etc.) + 'df.as_op(text, text)', + 'df.if_then_op(text, text)', + 'df.if_else_op(text, text)', + 'df.ensure_durofut(text)', + 'df.loop_prefix_op(text)', + 'df.version()', + 'df.debug_connection()', + 'df.explain(text)', + 'df.target_database()' + ]; +BEGIN + -- Validate the role exists + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = p_role) THEN + RAISE EXCEPTION 'role "%" does not exist', p_role; + END IF; + + IF with_grant THEN + grant_opt := ' WITH GRANT OPTION'; + END IF; + + -- Schema access + EXECUTE format('GRANT USAGE ON SCHEMA df TO %I', p_role) || grant_opt; + + -- Grant EXECUTE on each standard function explicitly. + FOREACH func_sig IN ARRAY func_sigs LOOP + EXECUTE format('GRANT EXECUTE ON FUNCTION %s TO %I', func_sig, p_role) || grant_opt; + END LOOP; + + -- df.http() — opt-in because it makes outbound network requests. + IF include_http THEN + EXECUTE format('GRANT EXECUTE ON FUNCTION df.http(text, text, text, jsonb, integer) TO %I', p_role) || grant_opt; + END IF; + + -- Admin helpers — only for delegated administrators. + IF with_grant THEN + EXECUTE format('GRANT EXECUTE ON FUNCTION df.grant_usage(text, boolean, boolean) TO %I', p_role) || grant_opt; + EXECUTE format('GRANT EXECUTE ON FUNCTION df.revoke_usage(text) TO %I', p_role) || grant_opt; + END IF; + + -- Table privileges + EXECUTE format('GRANT SELECT ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT SELECT ON df.nodes TO %I', p_role) || grant_opt; + EXECUTE format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) || grant_opt; + EXECUTE format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) || grant_opt; + EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) || grant_opt; + + RAISE NOTICE 'pg_durable: granted df usage privileges to "%"', p_role; +END; +$fn$; + +-- Revoke all df privileges previously granted by df.grant_usage(). +-- Authorization: same model as df.grant_usage() — EXECUTE is revoked from +-- PUBLIC, caller must hold the underlying privileges to revoke them. +-- Safety: format(%I) quotes identifiers to prevent SQL injection. Additionally, +-- this is SECURITY INVOKER so it cannot escalate beyond the caller's privileges. +CREATE OR REPLACE FUNCTION df.revoke_usage(p_role TEXT) +RETURNS VOID +LANGUAGE plpgsql +SET search_path = pg_catalog, df, pg_temp +AS $fn$ +DECLARE + func_oid oid; +BEGIN + -- Validate the role exists + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = p_role) THEN + RAISE EXCEPTION 'role "%" does not exist', p_role; + END IF; + + -- Prevent accidentally revoking your own access. pg_has_role checks + -- both direct identity (current_user = p_role) and inherited membership + -- (current_user is a member of p_role), so revoking a parent role that + -- the caller depends on is also caught. + -- Superusers are exempt: pg_has_role returns true for all roles when the + -- caller is a superuser, and superusers can always re-grant themselves. + IF NOT EXISTS ( + SELECT 1 + FROM pg_roles + WHERE rolname = current_user + AND rolsuper + ) + AND pg_has_role(current_user, p_role, 'MEMBER') THEN + RAISE EXCEPTION 'cannot revoke df privileges from "%" because the current role ("%") is a member of it — use a different administrator', p_role, current_user; + END IF; + + -- CASCADE: if the target role granted sub-grants (via WITH GRANT OPTION), + -- CASCADE ensures those dependent privileges are also revoked. + -- Column-level revokes must match the column-level grants from grant_usage(). + EXECUTE format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); + EXECUTE format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); + EXECUTE format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); + EXECUTE format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); + EXECUTE format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); + EXECUTE format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); + + -- Revoke EXECUTE per-function rather than using the blanket + -- REVOKE ON ALL FUNCTIONS. A delegated admin may lack privilege on + -- some functions (e.g. df.http); per-function revokes let us skip those. + FOR func_oid IN + SELECT p.oid FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE n.nspname = 'df' + LOOP + BEGIN + EXECUTE format('REVOKE EXECUTE ON FUNCTION %s FROM %I CASCADE', func_oid::regprocedure, p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + END LOOP; + + EXECUTE format('REVOKE USAGE ON SCHEMA df FROM %I CASCADE', p_role); + + RAISE NOTICE 'pg_durable: revoked df usage privileges granted by "%" from "%"', current_user, p_role; +END; +$fn$; + +-- Validate that the worker role is a superuser. +-- The background worker must bypass RLS to manage all users' instances/nodes. +-- If the worker role is not a superuser, workflows will silently fail because +-- RLS will filter out rows the worker needs to read/update. +DO $$ +DECLARE + wrole TEXT; + is_super BOOLEAN; +BEGIN + wrole := pg_catalog.current_setting('pg_durable.worker_role', true); + IF wrole IS NULL OR wrole OPERATOR(pg_catalog.=) '' THEN + wrole := 'azuresu'; + END IF; + + SELECT rolsuper INTO is_super FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) wrole; + IF is_super IS NULL THEN + RAISE WARNING 'pg_durable: worker role "%" does not exist. The background worker will not be able to process workflows. Create the role as a superuser before using pg_durable.', wrole; + ELSIF NOT is_super THEN + RAISE WARNING 'pg_durable: worker role "%" is not a superuser. The background worker must be a superuser to bypass RLS and manage all users'' instances. Grant superuser or BYPASSRLS to this role.', wrole; + END IF; +END $$; + +-- df.http() carries network-access implications and must be opt-in. +-- PostgreSQL grants EXECUTE to PUBLIC by default when functions are created; +-- revoke that so only roles explicitly granted access (via +-- df.grant_usage(role, include_http => true) or a direct GRANT) can make +-- HTTP requests. +REVOKE EXECUTE ON FUNCTION df.http(text, text, text, jsonb, integer) FROM PUBLIC; + +-- df.grant_usage() and df.revoke_usage() are admin-only helpers. +-- Revoke PUBLIC's default EXECUTE privilege so that only roles explicitly +-- granted access (via with_grant => true or a direct superuser GRANT) can +-- manage other roles' df privileges. +REVOKE EXECUTE ON FUNCTION df.grant_usage(text, boolean, boolean) FROM PUBLIC; +REVOKE EXECUTE ON FUNCTION df.revoke_usage(text) FROM PUBLIC; +/* */ + +/* */ +-- src/dsl.rs:630 +-- pg_durable::dsl::start +CREATE FUNCTION df."start"( + "fut" TEXT, /* &str */ + "label" TEXT DEFAULT NULL, /* core::option::Option<&str> */ + "database" TEXT DEFAULT NULL /* core::option::Option<&str> */ +) RETURNS TEXT /* alloc::string::String */ + +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'start_wrapper'; +/* */ + diff --git a/src/client.rs b/src/client.rs index f358fe4..625ea73 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,7 +12,7 @@ use duroxide::Client; use pgrx::prelude::*; use tokio::runtime::Runtime; -use crate::types::{new_backend_provider, postgres_connection_string}; +use crate::types::{backend_duroxide_schema, new_backend_provider, postgres_connection_string}; /// Cached tokio runtime for client operations. static CLIENT_RUNTIME: OnceLock = OnceLock::new(); @@ -23,17 +23,20 @@ static DUROXIDE_CLIENT: OnceLock = OnceLock::new(); /// Check whether the background worker has finished initializing the duroxide /// schema for the current binary's expected schema version. /// -/// Returns `false` if `duroxide._worker_ready` does not exist, has no row, or -/// has a `schema_version` below `WORKER_SCHEMA_VERSION`. This is a fast SPI -/// read called once per session on the first call to any `df.*` function that -/// needs the duroxide client. +/// Returns `false` if `._worker_ready` does not exist, has no +/// row, or has a `schema_version` below `WORKER_SCHEMA_VERSION`. This is a fast +/// SPI read called once per session on the first call to any `df.*` function +/// that needs the duroxide client. fn is_worker_ready() -> bool { + let schema = backend_duroxide_schema(); + // First check if the readiness table exists via the catalogue. Querying // the non-existent table directly would raise a PostgreSQL ERROR that // aborts the current (sub)transaction — even if caught in Rust. - let table_exists = Spi::get_one::( + let table_exists = Spi::get_one_with_args::( "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_tables \ - WHERE schemaname = 'duroxide' AND tablename = '_worker_ready')", + WHERE schemaname = $1 AND tablename = '_worker_ready')", + &[schema.into()], ) .ok() .flatten() @@ -44,7 +47,10 @@ fn is_worker_ready() -> bool { } Spi::get_one_with_args::( - "SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= $1)", + &format!( + "SELECT EXISTS(SELECT 1 FROM {}._worker_ready WHERE schema_version >= $1)", + schema + ), &[crate::WORKER_SCHEMA_VERSION.into()], ) .ok() @@ -76,6 +82,7 @@ fn get_duroxide_client() -> Result<&'static Client, String> { let rt = get_client_runtime(); let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); rt.block_on(async { // Limit backend provider to 1 connection — backends need minimal duroxide @@ -83,7 +90,7 @@ fn get_duroxide_client() -> Result<&'static Client, String> { // (new_current_thread). Note: std::env::set_var becomes unsafe in Rust 2024 edition. std::env::set_var("DUROXIDE_PG_POOL_MAX", "1"); - let store = new_backend_provider(&pg_conn_str).await?; + let store = new_backend_provider(&pg_conn_str, schema).await?; let _ = DUROXIDE_CLIENT.set(Client::new(store)); DUROXIDE_CLIENT diff --git a/src/dsl.rs b/src/dsl.rs index 0b8da51..8db0910 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -47,11 +47,11 @@ pub fn version() -> String { /// Debug function to see what duroxide connection is being used #[pg_extern(schema = "df")] pub fn debug_connection() -> String { - use crate::types::{postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{backend_duroxide_schema, postgres_connection_string}; format!( "{} (schema: {})", postgres_connection_string(), - DUROXIDE_SCHEMA + backend_duroxide_schema() ) } diff --git a/src/explain.rs b/src/explain.rs index 499afa2..7c5dab9 100644 --- a/src/explain.rs +++ b/src/explain.rs @@ -124,10 +124,11 @@ fn explain_instance(instance_id: &str) -> String { /// Get instance info from Duroxide store fn get_duroxide_instance_info(instance_id: &str) -> (String, Option) { - use crate::types::{new_backend_provider, postgres_connection_string}; + use crate::types::{backend_duroxide_schema, new_backend_provider, postgres_connection_string}; use duroxide::Client; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() @@ -138,7 +139,7 @@ fn get_duroxide_instance_info(instance_id: &str) -> (String, Option) { }; rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, schema).await { Ok(s) => s, Err(_) => return (String::new(), None), }; diff --git a/src/lib.rs b/src/lib.rs index 7dd88d9..fc5a533 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -620,11 +620,26 @@ END $$; extension_sql!( r#" --- The duroxide schema is created here so the extension owns it. --- No IF NOT EXISTS: fails loudly if a duroxide schema already exists, +-- The duroxide provider schema is created here so the extension owns it. +-- No IF NOT EXISTS: fails loudly if a _duroxide schema already exists, -- preventing adoption of a potentially attacker-crafted schema. -- The background worker populates this schema at startup via ApplyAll. -CREATE SCHEMA duroxide; +-- +-- Fresh installs use the '_duroxide' schema. Installs that originated on +-- pg_durable <= 0.2.2 keep the legacy 'duroxide' schema; the upgrade script +-- pg_durable--0.2.2--0.2.3.sql defines df.duroxide_schema() to return +-- 'duroxide' for those installs. Both backend sessions and the background +-- worker call df.duroxide_schema() to discover which schema to use, falling +-- back to 'duroxide' when the helper is absent (installs predating it). +CREATE SCHEMA _duroxide; + +-- Returns the name of the duroxide provider schema selected for this install. +-- Fresh installs return '_duroxide'. The body is version-specific: the upgrade +-- script for pre-existing installs replaces it to return 'duroxide'. +CREATE FUNCTION df.duroxide_schema() RETURNS text + LANGUAGE sql IMMUTABLE PARALLEL SAFE + SET search_path = pg_catalog, pg_temp + AS $$ SELECT '_duroxide'::text $$; "#, name = "create_duroxide_schema", requires = ["validate_database"] @@ -800,10 +815,13 @@ mod tests { /// Ensure the Duroxide store exists and is ready fn ensure_store_ready() -> Result { - use crate::types::{new_backend_provider, postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{ + backend_duroxide_schema, new_backend_provider, postgres_connection_string, + }; use std::time::{Duration, Instant}; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -816,8 +834,8 @@ mod tests { let timeout = Duration::from_secs(10); loop { - match new_backend_provider(&pg_conn_str).await { - Ok(_) => return Ok(format!("{pg_conn_str} (schema: {DUROXIDE_SCHEMA})")), + match new_backend_provider(&pg_conn_str, schema).await { + Ok(_) => return Ok(format!("{pg_conn_str} (schema: {schema})")), Err(e) => { if start.elapsed() > timeout { return Err(format!( @@ -835,7 +853,9 @@ mod tests { /// Wait for a durable function to complete, polling Duroxide status fn wait_for_completion(instance_id: &str, timeout_secs: u64) -> Result { - use crate::types::{new_backend_provider, postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{ + backend_duroxide_schema, new_backend_provider, postgres_connection_string, + }; use duroxide::Client; use std::time::{Duration, Instant}; @@ -843,6 +863,7 @@ mod tests { let _ = ensure_store_ready()?; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let start = Instant::now(); let timeout = Duration::from_secs(timeout_secs); @@ -852,7 +873,7 @@ mod tests { .map_err(|e| format!("Failed to create runtime: {e}"))?; rt.block_on(async { - let store = new_backend_provider(&pg_conn_str).await?; + let store = new_backend_provider(&pg_conn_str, schema).await?; let client = Client::new(store); loop { @@ -892,11 +913,14 @@ mod tests { /// Get the current status from Duroxide fn get_duroxide_status(instance_id: &str) -> Option { - use crate::types::{new_backend_provider, postgres_connection_string, DUROXIDE_SCHEMA}; + use crate::types::{ + backend_duroxide_schema, new_backend_provider, postgres_connection_string, + }; use duroxide::Client; let _ = ensure_store_ready().ok()?; let pg_conn_str = postgres_connection_string(); + let schema = backend_duroxide_schema(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -904,7 +928,7 @@ mod tests { .ok()?; rt.block_on(async { - let store = new_backend_provider(&pg_conn_str).await.ok()?; + let store = new_backend_provider(&pg_conn_str, schema).await.ok()?; let client = Client::new(store); client .get_instance_info(instance_id) diff --git a/src/monitoring.rs b/src/monitoring.rs index a42c74d..723ce07 100644 --- a/src/monitoring.rs +++ b/src/monitoring.rs @@ -8,7 +8,7 @@ use duroxide::Client; use pgrx::prelude::*; -use crate::types::{new_backend_provider, postgres_connection_string}; +use crate::types::{backend_duroxide_schema, new_backend_provider, postgres_connection_string}; // ============================================================================ // Monitoring Functions @@ -36,6 +36,7 @@ pub fn list_instances( let limit_count = limit_count.min(10000); let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); // Query df.instances via SPI first — RLS filters to calling user's rows only. // We also fetch status here so that all three monitoring APIs (df.status(), @@ -82,7 +83,7 @@ pub fn list_instances( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -128,6 +129,7 @@ pub fn instance_info( ), > { let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); let instance_id_str = instance_id.to_string(); // Ownership check: SPI goes through RLS, returning NULL for non-owned instances. @@ -165,7 +167,7 @@ pub fn instance_info( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -205,6 +207,7 @@ pub fn instance_executions( ), > { let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); let instance_id_owned = instance_id.to_string(); // Ownership check: SPI goes through RLS, so non-owned instances are invisible. @@ -229,7 +232,7 @@ pub fn instance_executions( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -282,6 +285,7 @@ pub fn metrics() -> TableIterator< ), > { let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() @@ -292,7 +296,7 @@ pub fn metrics() -> TableIterator< }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; @@ -339,6 +343,7 @@ pub fn instance_nodes( let instance_id = instance_id_param.to_string(); let pg_conn_str = postgres_connection_string(); + let provider_schema = backend_duroxide_schema(); // Get node definitions from PostgreSQL (including status, result and updated_at) let node_defs: Vec<( @@ -392,7 +397,7 @@ pub fn instance_nodes( }; let results = rt.block_on(async { - let store = match new_backend_provider(&pg_conn_str).await { + let store = match new_backend_provider(&pg_conn_str, provider_schema).await { Ok(s) => s, Err(_) => return vec![], }; diff --git a/src/types.rs b/src/types.rs index dd9c56b..27806d1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::ffi::CString; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::time::Duration; use uuid::Uuid; @@ -218,16 +218,87 @@ pub async fn connect_as_user( Ok(conn) } -/// Schema name for Duroxide internal tables -pub const DUROXIDE_SCHEMA: &str = "duroxide"; +/// Legacy duroxide provider schema name used by installs created before the +/// `df.duroxide_schema()` helper existed (pg_durable ≤ 0.2.2). It is the only +/// fallback when that helper is absent, and the value the upgrade script pins +/// existing clusters to. +pub const LEGACY_DUROXIDE_SCHEMA: &str = "duroxide"; + +/// Resolve the duroxide provider schema name by calling the extension-owned +/// `df.duroxide_schema()` helper. +/// +/// Returns [`LEGACY_DUROXIDE_SCHEMA`] when the helper does not exist (an install +/// that predates it — e.g. a new `.so` deployed against a ≤0.2.2 schema without +/// running `ALTER EXTENSION pg_durable UPDATE`). The presence check uses the +/// catalog rather than catching `42883` so it never aborts the surrounding +/// (sub)transaction in a backend session. +fn resolve_duroxide_schema_spi() -> String { + let helper_exists = Spi::get_one::( + "SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' AND p.proname = 'duroxide_schema' AND p.pronargs = 0)", + ) + .ok() + .flatten() + .unwrap_or(false); + + if !helper_exists { + return LEGACY_DUROXIDE_SCHEMA.to_string(); + } + + match Spi::get_one::("SELECT df.duroxide_schema()") { + Ok(Some(s)) if !s.is_empty() => s, + _ => LEGACY_DUROXIDE_SCHEMA.to_string(), + } +} + +/// Resolve the duroxide provider schema for the current backend session, +/// caching it for the session lifetime. The value cannot change without an +/// extension upgrade, which requires a reconnect to observe reliably, so a +/// per-session cache is safe. +pub fn backend_duroxide_schema() -> &'static str { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA.get_or_init(resolve_duroxide_schema_spi) +} + +/// Resolve the duroxide provider schema name from the background worker using an +/// async pool. Mirrors [`resolve_duroxide_schema_spi`] but for the BGW context. +/// The BGW resolves this once per epoch (after the extension is detected) rather +/// than caching for the process lifetime, because drop+recreate can switch the +/// provider schema within a single worker lifetime. +pub async fn resolve_duroxide_schema_pool(pool: &sqlx::PgPool) -> String { + let helper_exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM pg_proc p \ + JOIN pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' AND p.proname = 'duroxide_schema' AND p.pronargs = 0)", + ) + .fetch_one(pool) + .await + .unwrap_or(false); + + if !helper_exists { + return LEGACY_DUROXIDE_SCHEMA.to_string(); + } + + match sqlx::query_scalar::<_, String>("SELECT df.duroxide_schema()") + .fetch_one(pool) + .await + { + Ok(s) if !s.is_empty() => s, + _ => LEGACY_DUROXIDE_SCHEMA.to_string(), + } +} /// Create a `ProviderConfig` for backend (request/response) operations. /// /// - `VerifyOnly`: never create schema/tables, reject unknown migrations. /// Backend sessions must not run DDL — the BGW owns schema lifecycle. -pub fn backend_provider_config(database_url: &str) -> duroxide_pg::ProviderConfig { +pub fn backend_provider_config( + database_url: &str, + schema_name: &str, +) -> duroxide_pg::ProviderConfig { let mut config = duroxide_pg::ProviderConfig::url(database_url); - config.schema_name = Some(DUROXIDE_SCHEMA.to_string()); + config.schema_name = Some(schema_name.to_string()); config.migration_policy = duroxide_pg::MigrationPolicy::VerifyOnly; config } @@ -235,22 +306,29 @@ pub fn backend_provider_config(database_url: &str) -> duroxide_pg::ProviderConfi /// Create a backend provider for request/response operations. pub async fn new_backend_provider( database_url: &str, + schema_name: &str, ) -> Result, String> { - duroxide_pg::PostgresProvider::new_with_config(backend_provider_config(database_url)) - .await - .map(Arc::new) - .map_err(|e| format!("Failed to connect to duroxide store: {e}")) + duroxide_pg::PostgresProvider::new_with_config(backend_provider_config( + database_url, + schema_name, + )) + .await + .map(Arc::new) + .map_err(|e| format!("Failed to connect to duroxide store: {e}")) } /// Create a `ProviderConfig` for the background worker runtime. /// /// - `ApplyAll`: applies pending duroxide migrations at startup; creates tables -/// inside the extension-owned `duroxide` schema. Safe because the BGW verifies +/// inside the extension-owned provider schema. Safe because the BGW verifies /// schema ownership via `pg_depend` before calling /// `PostgresProvider::new_with_config`. -pub fn worker_provider_config(database_url: &str) -> duroxide_pg::ProviderConfig { +pub fn worker_provider_config( + database_url: &str, + schema_name: &str, +) -> duroxide_pg::ProviderConfig { let mut config = duroxide_pg::ProviderConfig::url(database_url); - config.schema_name = Some(DUROXIDE_SCHEMA.to_string()); + config.schema_name = Some(schema_name.to_string()); config.migration_policy = duroxide_pg::MigrationPolicy::ApplyAll; config } diff --git a/src/worker.rs b/src/worker.rs index 6a1cdee..4f6d66a 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -18,7 +18,7 @@ use tracing_subscriber::EnvFilter; use crate::registry::{create_activity_registry, create_orchestration_registry}; use crate::types::{ get_max_duroxide_connections, get_max_management_connections, get_max_user_connections, - postgres_connection_string, worker_provider_config, DUROXIDE_SCHEMA, + postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, }; /// Initialize tracing subscriber for duroxide logs. @@ -92,9 +92,8 @@ async fn run_duroxide_runtime() { let pg_conn_str = postgres_connection_string(); log!( - "pg_durable: background worker connected to PostgreSQL at {} (schema: {})", + "pg_durable: background worker connected to PostgreSQL at {}", pg_conn_str, - DUROXIDE_SCHEMA ); // Validate connection limit GUCs at startup @@ -161,8 +160,23 @@ async fn run_duroxide_runtime() { break; } - let Some(duroxide_runtime) = - initialize_duroxide_runtime(&pg_conn_str, INIT_RETRY_INTERVAL, &mgmt_pool).await + // Resolve the duroxide provider schema for this epoch. The extension may + // have been dropped and recreated with a different schema version (e.g. + // a fresh `_duroxide` install vs. a legacy `duroxide` install), so we + // must re-resolve after every CREATE EXTENSION rather than once at startup. + let duroxide_schema = resolve_duroxide_schema_pool(&mgmt_pool).await; + log!( + "pg_durable: using duroxide provider schema '{}' for this epoch", + duroxide_schema + ); + + let Some(duroxide_runtime) = initialize_duroxide_runtime( + &pg_conn_str, + INIT_RETRY_INTERVAL, + &mgmt_pool, + &duroxide_schema, + ) + .await else { // Shutdown requested or extension dropped while initializing. continue; @@ -171,7 +185,7 @@ async fn run_duroxide_runtime() { // Write the worker readiness record so backend sessions know the // duroxide schema is fully initialized for this schema version. // Skipped if the row already has the current WORKER_SCHEMA_VERSION. - if let Err(e) = write_worker_ready(&mgmt_pool).await { + if let Err(e) = write_worker_ready(&mgmt_pool, &duroxide_schema).await { log!("pg_durable: failed to write worker readiness record: {}", e); } @@ -233,7 +247,7 @@ async fn check_extension_exists(pool: &sqlx::PgPool) -> bool { /// /// This prevents the BGW from running ApplyAll into an attacker-crafted schema /// that happens to be named "duroxide" but was not created by CREATE EXTENSION. -async fn check_duroxide_schema_owned(pool: &sqlx::PgPool) -> bool { +async fn check_duroxide_schema_owned(pool: &sqlx::PgPool, schema_name: &str) -> bool { let result: Result<(bool,), sqlx::Error> = sqlx::query_as( "SELECT EXISTS ( SELECT 1 @@ -245,9 +259,10 @@ async fn check_duroxide_schema_owned(pool: &sqlx::PgPool) -> bool { JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' + WHERE n.nspname = $1 )", ) + .bind(schema_name) .fetch_one(pool) .await; @@ -262,8 +277,11 @@ async fn check_duroxide_schema_owned(pool: &sqlx::PgPool) -> bool { /// the schema namespace itself). On upgrades from v0.1.1 — where CREATE EXTENSION /// embedded the full duroxide DDL — this de-registers those embedded objects from /// the extension before the BGW applies any new migrations. -async fn release_extension_owned_duroxide_objects(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> { - sqlx::query( +async fn release_extension_owned_duroxide_objects( + pool: &sqlx::PgPool, + schema_name: &str, +) -> Result<(), sqlx::Error> { + sqlx::query(&format!( r#"DO $$ DECLARE r RECORD; @@ -285,7 +303,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' + WHERE n.nspname = '{schema}' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP TRIGGER ' || r.trigger_name || ' ON ' || r.table_name; @@ -303,7 +321,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' + WHERE n.nspname = '{schema}' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP FUNCTION ' || r.sig; END LOOP; @@ -320,7 +338,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' AND c.relkind = 'r' + WHERE n.nspname = '{schema}' AND c.relkind = 'r' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP TABLE ' || r.name; END LOOP; @@ -339,7 +357,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' AND c.relkind = 'i' + WHERE n.nspname = '{schema}' AND c.relkind = 'i' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP INDEX ' || r.name; END LOOP; @@ -356,12 +374,13 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = 'duroxide' AND c.relkind = 'S' + WHERE n.nspname = '{schema}' AND c.relkind = 'S' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP SEQUENCE ' || r.name; END LOOP; END $$"#, - ) + schema = schema_name + )) .execute(pool) .await?; Ok(()) @@ -371,21 +390,21 @@ END $$"#, /// schema namespace entry itself) is still registered as an extension member. /// Used to short-circuit `release_extension_owned_duroxide_objects` on the /// common path (fresh 0.2.0 installs and all restarts after the first upgrade). -async fn has_extension_owned_duroxide_objects(pool: &sqlx::PgPool) -> bool { +async fn has_extension_owned_duroxide_objects(pool: &sqlx::PgPool, schema_name: &str) -> bool { let result: Result<(bool,), sqlx::Error> = sqlx::query_as( "SELECT EXISTS ( SELECT 1 FROM pg_depend d JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' JOIN pg_class c ON c.oid = d.objid - JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = 'duroxide' + JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = $1 WHERE d.classid = 'pg_class'::regclass AND d.deptype = 'e' UNION ALL SELECT 1 FROM pg_depend d JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' JOIN pg_proc p ON p.oid = d.objid - JOIN pg_namespace n ON n.oid = p.pronamespace AND n.nspname = 'duroxide' + JOIN pg_namespace n ON n.oid = p.pronamespace AND n.nspname = $1 WHERE d.classid = 'pg_proc'::regclass AND d.deptype = 'e' UNION ALL SELECT 1 @@ -393,10 +412,11 @@ async fn has_extension_owned_duroxide_objects(pool: &sqlx::PgPool) -> bool { JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' JOIN pg_trigger t ON t.oid = d.objid JOIN pg_class c ON c.oid = t.tgrelid - JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = 'duroxide' + JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = $1 WHERE d.classid = 'pg_trigger'::regclass AND d.deptype = 'e' )", ) + .bind(schema_name) .fetch_one(pool) .await; result.map(|(b,)| b).unwrap_or(false) @@ -406,6 +426,7 @@ async fn initialize_duroxide_runtime( pg_conn_str: &str, retry_interval: Duration, mgmt_pool: &sqlx::PgPool, + schema_name: &str, ) -> Option> { log!("pg_durable: initializing duroxide runtime..."); @@ -434,7 +455,7 @@ async fn initialize_duroxide_runtime( return None; } - if !check_duroxide_schema_owned(mgmt_pool).await { + if !check_duroxide_schema_owned(mgmt_pool, schema_name).await { log!( "pg_durable: duroxide schema missing or not extension-owned \ (CREATE EXTENSION may still be in progress) — will retry" @@ -449,8 +470,8 @@ async fn initialize_duroxide_runtime( // embedded DDL from the extension before ApplyAll runs. // The existence check avoids executing the five-loop DO block on every // clean restart once the upgrade has already been applied. - if has_extension_owned_duroxide_objects(mgmt_pool).await { - if let Err(e) = release_extension_owned_duroxide_objects(mgmt_pool).await { + if has_extension_owned_duroxide_objects(mgmt_pool, schema_name).await { + if let Err(e) = release_extension_owned_duroxide_objects(mgmt_pool, schema_name).await { log!( "pg_durable: failed to release extension-owned duroxide objects (will retry): {}", e @@ -460,18 +481,22 @@ async fn initialize_duroxide_runtime( } } - let store = - match PostgresProvider::new_with_config(worker_provider_config(pg_conn_str)).await { - Ok(s) => Arc::new(s), - Err(e) => { - log!( - "pg_durable: failed to create PostgreSQL store (will retry): {}", - e - ); - tokio::time::sleep(retry_interval).await; - continue; - } - }; + let store = match PostgresProvider::new_with_config(worker_provider_config( + pg_conn_str, + schema_name, + )) + .await + { + Ok(s) => Arc::new(s), + Err(e) => { + log!( + "pg_durable: failed to create PostgreSQL store (will retry): {}", + e + ); + tokio::time::sleep(retry_interval).await; + continue; + } + }; // Reuse the management pool for activities (graph loading, status updates). // The former dedicated activity pool with its df.in_workflow hook is no @@ -510,35 +535,43 @@ async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result Result<(), sqlx::Error> { - sqlx::query( - "CREATE TABLE IF NOT EXISTS duroxide._worker_ready ( +async fn write_worker_ready(pool: &sqlx::PgPool, schema_name: &str) -> Result<(), sqlx::Error> { + sqlx::query(&format!( + "CREATE TABLE IF NOT EXISTS {schema}._worker_ready ( sentinel BOOLEAN PRIMARY KEY DEFAULT TRUE, CONSTRAINT only_one_sentinel CHECK (sentinel), schema_version INT NOT NULL, initialized_at TIMESTAMPTZ NOT NULL DEFAULT now() )", - ) + schema = schema_name + )) .execute(pool) .await?; // Allow non-superuser sessions to read the readiness record via // is_worker_ready() which runs SPI in the caller's security context. - sqlx::query("GRANT USAGE ON SCHEMA duroxide TO PUBLIC") - .execute(pool) - .await?; - sqlx::query("GRANT SELECT ON duroxide._worker_ready TO PUBLIC") - .execute(pool) - .await?; + sqlx::query(&format!( + "GRANT USAGE ON SCHEMA {schema} TO PUBLIC", + schema = schema_name + )) + .execute(pool) + .await?; + sqlx::query(&format!( + "GRANT SELECT ON {schema}._worker_ready TO PUBLIC", + schema = schema_name + )) + .execute(pool) + .await?; - sqlx::query( - "INSERT INTO duroxide._worker_ready (sentinel, schema_version, initialized_at) \ + sqlx::query(&format!( + "INSERT INTO {schema}._worker_ready (sentinel, schema_version, initialized_at) \ VALUES (TRUE, $1, now()) \ ON CONFLICT (sentinel) DO UPDATE SET \ schema_version = EXCLUDED.schema_version, \ initialized_at = EXCLUDED.initialized_at \ - WHERE duroxide._worker_ready.schema_version != EXCLUDED.schema_version", - ) + WHERE {schema}._worker_ready.schema_version != EXCLUDED.schema_version", + schema = schema_name + )) .bind(crate::WORKER_SCHEMA_VERSION) .execute(pool) .await?; diff --git a/tests/e2e/sql/00_setup_playground.sql b/tests/e2e/sql/00_setup_playground.sql index 616b64e..fb89350 100644 --- a/tests/e2e/sql/00_setup_playground.sql +++ b/tests/e2e/sql/00_setup_playground.sql @@ -38,15 +38,19 @@ DECLARE max_attempts INT := p_timeout_secs * 10; -- poll every 100ms table_exists BOOLEAN; is_ready BOOLEAN; + dx_schema TEXT := df.duroxide_schema(); BEGIN LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO is_ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO is_ready; ELSE is_ready := FALSE; END IF; diff --git a/tests/e2e/sql/12_extension_lifecycle.sql b/tests/e2e/sql/12_extension_lifecycle.sql index 7da90a3..d66635b 100644 --- a/tests/e2e/sql/12_extension_lifecycle.sql +++ b/tests/e2e/sql/12_extension_lifecycle.sql @@ -13,25 +13,26 @@ -- Ensure a clean starting point SELECT public._e2e_drop_extension_safe(); --- 1) Verify BGW does not create duroxide schema pre-extension +-- 1) Verify BGW does not create the duroxide provider schema pre-extension. +-- Fresh installs use the '_duroxide' schema (created by CREATE EXTENSION). DO $$ DECLARE exists_before BOOLEAN; exists_after BOOLEAN; BEGIN - SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'duroxide') INTO exists_before; + SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '_duroxide') INTO exists_before; IF exists_before THEN - RAISE EXCEPTION 'TEST FAILED: duroxide schema exists before CREATE EXTENSION'; + RAISE EXCEPTION 'TEST FAILED: _duroxide schema exists before CREATE EXTENSION'; END IF; PERFORM pg_sleep(6); - SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'duroxide') INTO exists_after; + SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '_duroxide') INTO exists_after; IF exists_after THEN - RAISE EXCEPTION 'TEST FAILED: duroxide schema was created before CREATE EXTENSION'; + RAISE EXCEPTION 'TEST FAILED: _duroxide schema was created before CREATE EXTENSION'; END IF; - RAISE NOTICE 'PASSED: BGW did not create duroxide schema pre-extension'; + RAISE NOTICE 'PASSED: BGW did not create _duroxide schema pre-extension'; END $$; -- 2) Create extension and run a simple workflow @@ -78,12 +79,12 @@ DO $$ DECLARE schema_exists BOOLEAN; BEGIN - SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'duroxide') INTO schema_exists; + SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = '_duroxide') INTO schema_exists; IF schema_exists THEN - RAISE EXCEPTION 'TEST FAILED: duroxide schema still exists after DROP EXTENSION'; + RAISE EXCEPTION 'TEST FAILED: _duroxide schema still exists after DROP EXTENSION'; END IF; - RAISE NOTICE 'PASSED: DROP EXTENSION removed duroxide schema'; + RAISE NOTICE 'PASSED: DROP EXTENSION removed _duroxide schema'; END $$; -- 4) Re-create extension and run another workflow diff --git a/tests/e2e/sql/46_connection_limit_startup_validation.sql b/tests/e2e/sql/46_connection_limit_startup_validation.sql index dd00bd6..e0093ad 100644 --- a/tests/e2e/sql/46_connection_limit_startup_validation.sql +++ b/tests/e2e/sql/46_connection_limit_startup_validation.sql @@ -13,16 +13,20 @@ DECLARE ready BOOLEAN; table_exists BOOLEAN; attempts INT := 0; + dx_schema TEXT := df.duroxide_schema(); BEGIN - -- Poll duroxide._worker_ready for up to 15 seconds — the worker should never become ready. + -- Poll ._worker_ready for up to 15 seconds — the worker should never become ready. LOOP SELECT EXISTS( SELECT 1 FROM information_schema.tables - WHERE table_schema = 'duroxide' AND table_name = '_worker_ready' + WHERE table_schema = dx_schema AND table_name = '_worker_ready' ) INTO table_exists; IF table_exists THEN - SELECT EXISTS(SELECT 1 FROM duroxide._worker_ready WHERE schema_version >= 1) INTO ready; + EXECUTE format( + 'SELECT EXISTS(SELECT 1 FROM %I._worker_ready WHERE schema_version >= 1)', + dx_schema + ) INTO ready; ELSE ready := FALSE; END IF;