Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ sqlparser = "0.61.0"
chrono = "0.4.41"
log = "0.4"
datafusion = "53.1.0"
datafusion_pg_catalog = { git = "https://github.com/ybrs/pg_catalog", branch = "main", package = "datafusion_pg_catalog" }
datafusion_pg_catalog = { git = "https://github.com/ybrs/pg_catalog", rev = "4ee6e83b86a3444dac1aa61f07e930d4222b6316", package = "datafusion_pg_catalog" }
# Local dev override (uncomment to build against the in-tree ../pg_catalog):
# datafusion_pg_catalog = { path = "../pg_catalog" }

env_logger = "0.11.8"
Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,34 @@ For example
server.start(catalog_emulation=True)
```

#### Lazy (callback-driven) catalog

The `register_*` calls above snapshot the catalog at startup. For a **live**
source, install a lazy catalog instead: supply one source object and Riffq pulls
catalog metadata from it on every `pg_catalog` scan, so tables created after
startup show up automatically — nothing is cached.

```python
class MyCatalog:
def databases(self, callback):
callback([{"oid": 16384, "name": "appdb"}])
def schemas(self, database, callback):
callback([{"oid": 16385, "name": "public"}])
def relations(self, database, schema, callback):
callback([{"oid": 20001, "reltype_oid": 30001, "name": "users",
"kind": "table", "has_index": False}])
def columns(self, database, schema, relation, callback):
callback([{"name": "id", "type_oid": 23, "nullable": False}]) # 23 = int4

server.set_lazy_catalog(MyCatalog()) # replaces the eager register_* calls
server.start(catalog_emulation=True)
```

You own the OIDs (stable + unique); `type_oid` is a `pg_type` OID. See
[docs/catalog.md](docs/catalog.md) for the full contract and
[`example/lazy_catalog.py`](example/lazy_catalog.py) for a runnable example;
[Teleduck](teleduck/) uses this path against a live DuckDB connection.


---

Expand Down
90 changes: 90 additions & 0 deletions docs/catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,86 @@ for schema_name, table_name in list_tables_from_duckdb():
server.start(catalog_emulation=True)
```

## Lazy (callback-driven) catalog

The `register_*` calls above take a **snapshot at startup**: if your underlying
data changes (a table is created, dropped, or altered), the emulated catalog
goes stale until you re-register.

For a live source, install a **lazy catalog** instead. You supply one source
object and Riffq pulls catalog metadata from it on *every* `pg_catalog` /
`information_schema` scan, so the catalog always reflects the current state —
tables created after startup included. Nothing is cached.

```python
server.set_lazy_catalog(source) # instead of register_database/schema/table
server.start(catalog_emulation=True)
```

The `source` object implements four methods. Each receives a `callback` and
invokes it with a list of row dicts (mirroring the Rust `LazyCatalogSource`
trait one method per catalog level):

```python
class MyCatalog:
def databases(self, callback):
# -> pg_database
callback([{"oid": 16384, "name": "appdb"}]) # "datdba"? optional

def schemas(self, database, callback):
# -> pg_namespace
callback([{"oid": 16385, "name": "public"}]) # "owner_oid"? optional

def relations(self, database, schema, callback):
# -> pg_class (+ pg_type rowtype)
callback([{
"oid": 20001, "reltype_oid": 30001, "name": "users",
"kind": "table", # "table" | "view" | "materialized_view"
# all optional, default off / NULL:
"owner_oid": 10, # -> pg_tables.tableowner (omit if no ownership)
"has_index": True, # -> pg_tables.hasindexes
"has_rules": False, "has_triggers": False, "row_security": False,
}])

def columns(self, database, schema, relation, callback):
# -> pg_attribute (+ information_schema.columns)
callback([
{"name": "id", "type_oid": 23, "nullable": False}, # 23 = int4
{"name": "name", "type_oid": 25, "nullable": True}, # 25 = text
])

server.set_lazy_catalog(MyCatalog())
server.start(catalog_emulation=True)
```

### Rules of the contract

- **You own the OIDs.** Riffq writes them through verbatim. They must be stable
across calls (so catalog joins like `pg_class.oid = pg_attribute.attrelid`
resolve) and unique among your objects. Keep them **above the built-in range**
so they don't collide with built-in rows on OID joins — use `16384`
(PostgreSQL's first normal object id) as a safe base, as the example and
Teleduck do. A common trick is `16384 + stable_hash(name)` (see the example
below).
- **`type_oid` is a `pg_type` OID** you choose, e.g. `23` int4, `20` int8,
`25` text, `16` bool, `701` float8, `1700` numeric, `1082` date, `1114`
timestamp. This keeps Riffq independent of your engine's type system.
- **Merged with built-ins.** Your rows are merged with the built-in system rows
(so `int4`, `pg_class`, etc. still resolve). A user object whose name collides
with a built-in **replaces** it; two of your objects with the same identity
(e.g. two tables of the same name in one schema) is an **error** surfaced to the
client.
- **Errors propagate.** An exception raised in any source method is returned to
the SQL client as an error — it never fails silently.
- **Opt-in & exclusive.** When a lazy source is set, the eager
`register_database`/`register_schema`/`register_table` calls are ignored.
Requires `start(catalog_emulation=True)`.

A complete runnable example backed by an in-memory dict (no external engine) is
in [`example/lazy_catalog.py`](https://github.com/ybrs/riffq/blob/main/example/lazy_catalog.py),
and [Teleduck](https://github.com/ybrs/riffq/tree/main/teleduck) uses this path
against a live DuckDB connection.

## Examples

For a minimal end-to-end example that registers a database, schema, and table and asserts they appear via `pg_catalog`, see:
Expand Down Expand Up @@ -221,5 +301,15 @@ RiffqServer.register_table(database_name: str,
schema_name: str, table_name: str, columns: list[dict]) -> None:
```

### Install a lazy (callback-driven) catalog source.

Pull catalog metadata from `source` on every scan instead of snapshotting it.
See [Lazy (callback-driven) catalog](#lazy-callback-driven-catalog) for the
source object's contract. Mutually exclusive with the eager `register_*` calls.

```python
RiffqServer.set_lazy_catalog(source) -> None:
```


[pg_catalog_rs]: https://github.com/ybrs/pg_catalog
143 changes: 143 additions & 0 deletions example/lazy_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Lazy (callback-driven) catalog example.

Riffq can answer PostgreSQL catalog queries (``pg_catalog`` / ``information_schema``)
from a *source object* that is consulted on every scan, instead of a snapshot
registered up front. This example backs that source with a plain in-memory dict
-- no external engine -- so you can see the whole contract in one file.

Run it::

python example/lazy_catalog.py

then connect with psql and watch the catalog stay in sync as you create tables::

psql -h 127.0.0.1 -p 5444 -U user -d appdb

appdb=> SELECT relname FROM pg_class WHERE relname='orders'; -- 0 rows
appdb=> CREATE TABLE orders(id INT, total INT); -- handled below
appdb=> SELECT relname FROM pg_class WHERE relname='orders'; -- now 1 row
appdb=> \\d orders
"""

import logging

import pyarrow as pa
import riffq

logging.basicConfig(level=logging.INFO)

# The "engine": a live, in-memory catalog. In a real app this would be DuckDB,
# PostgreSQL, a config file, a remote service -- anything. The lazy source below
# reads whatever is here *at query time*, so mutating it is reflected instantly.
CATALOG = {
"appdb": {
"public": {
# table_name -> list of (column_name, pg_type_oid, nullable)
"users": [("id", 23, False), ("name", 25, True)], # 23=int4, 25=text
}
}
}

# pg_type OIDs the example understands, keyed by a coarse type name.
TYPE_OIDS = {"int": 23, "bigint": 20, "text": 25, "bool": 16, "float": 701}


def stable_oid(salt: str, *parts: str) -> int:
"""Derive a stable, built-in-clear OID from a name.

The same inputs always return the same OID, so ``pg_class.oid`` and
``pg_attribute.attrelid`` agree across scans and catalog joins resolve.
Distinct object classes use distinct salts to avoid collisions, and the
result is kept well above the built-in OID range.
"""
h = 5381
for ch in (salt + "\x00" + "\x00".join(parts)):
h = (h * 33 + ord(ch)) & 0x7FFFFFFF
return 16384 + (h % 2_000_000_000)


class DictCatalogSource:
"""A lazy catalog source over the in-memory ``CATALOG`` dict.

Each method receives a ``callback`` and invokes it with a list of row dicts,
mirroring Riffq's Rust ``LazyCatalogSource`` trait one method per level.
"""

def databases(self, callback):
callback(
[{"oid": stable_oid("db", name), "name": name} for name in CATALOG]
)

def schemas(self, database, callback):
callback(
[
{"oid": stable_oid("ns", database, schema), "name": schema}
for schema in CATALOG.get(database, {})
]
)

def relations(self, database, schema, callback):
tables = CATALOG.get(database, {}).get(schema, {})
callback(
[
{
"oid": stable_oid("rel", database, schema, name),
"reltype_oid": stable_oid("type", database, schema, name),
"name": name,
"kind": "table",
}
for name in tables
]
)

def columns(self, database, schema, relation, callback):
cols = CATALOG.get(database, {}).get(schema, {}).get(relation, [])
callback(
[
{"name": col, "type_oid": type_oid, "nullable": nullable}
for (col, type_oid, nullable) in cols
]
)


class Connection(riffq.BaseConnection):
"""Data path. Catalog queries are served by the lazy source above; everything
else lands here. We implement a toy ``CREATE TABLE`` so you can watch the
catalog update live, and echo a single row for any other query."""

def handle_auth(self, user, password, host, database=None, callback=callable):
# Accept any credentials in this example.
return callback(True)

def handle_query(self, sql, callback=callable, **kwargs):
text = sql.strip().rstrip(";")
low = text.lower()

if low.startswith("create table "):
# "create table public.orders(id int, total int)" (schema optional)
head, _, body = text[len("create table "):].partition("(")
qualified = head.strip()
schema, _, name = qualified.rpartition(".")
schema = schema or "public"
cols = []
for part in body.rstrip(")").split(","):
bits = part.split()
if len(bits) >= 2:
cols.append((bits[0].strip('"'), TYPE_OIDS.get(bits[1].lower(), 25), True))
CATALOG.setdefault("appdb", {}).setdefault(schema, {})[name] = cols

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Hardcoded database name breaks multi-database scenarios.

Line 127 always mutates CATALOG under the key "appdb", regardless of which database the client connected to. If a client connects with -d other_db and issues CREATE TABLE, the table will be incorrectly added to appdb in the catalog structure, breaking catalog queries for other_db.

Extract the database name from the connection context (likely available in **kwargs or via self) and use it instead of the hardcoded "appdb" literal.

🔧 Proposed fix
-            CATALOG.setdefault("appdb", {}).setdefault(schema, {})[name] = cols
+            # Extract database from connection context or default to "appdb"
+            database = kwargs.get("database", "appdb")
+            CATALOG.setdefault(database, {}).setdefault(schema, {})[name] = cols
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CATALOG.setdefault("appdb", {}).setdefault(schema, {})[name] = cols
# Extract database from connection context or default to "appdb"
database = kwargs.get("database", "appdb")
CATALOG.setdefault(database, {}).setdefault(schema, {})[name] = cols
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@example/lazy_catalog.py` at line 127, The assignment always uses the
hardcoded key "appdb" (CATALOG.setdefault("appdb", {}).setdefault(schema,
{})[name] = cols) which breaks multi-database use; change it to derive the
database name from the connection context (e.g., check kwargs for a "database"
or "db" key, or use an instance property like self.database or a connection
helper method) and use that variable as the top-level key (e.g., db_name =
kwargs.get("database") or self.database with a sensible fallback) before calling
CATALOG.setdefault(db_name, ...). Update the code path in the function/method
that contains that line so new tables are registered under the actual connected
database rather than the literal "appdb".

return callback("CREATE TABLE", is_tag=True)

# Any other statement: return a single dummy row so clients are happy.
batch = self.arrow_batch([pa.array([1])], ["?column?"])
return self.send_reader(batch, callback)


def main():
server = riffq.RiffqServer("127.0.0.1:5444", connection_cls=Connection)
# Install the lazy source instead of register_database/schema/table.
server.set_lazy_catalog(DictCatalogSource())
server.start(catalog_emulation=True)


if __name__ == "__main__":
main()
37 changes: 37 additions & 0 deletions pysrc/riffq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,43 @@ def set_tls(self, crt:str, key:str):

self._server.set_tls(crt, key)

def set_lazy_catalog(self, source: Any) -> None:
"""Install a lazy (callback-driven) catalog source for catalog emulation.

Instead of eagerly pre-registering every database/schema/table with
`register_database`/`register_schema`/`register_table`, supply one
``source`` object and the server pulls catalog metadata from it on every
``pg_catalog`` / ``information_schema`` scan — so the catalog always
reflects the source's live state.

``source`` must expose four methods, each of which receives a ``callback``
and invokes it with a list of row dicts:

- ``databases(callback)`` -> ``[{"oid": int, "name": str, "datdba"?: int}]``
- ``schemas(database, callback)`` -> ``[{"oid": int, "name": str, "owner_oid"?: int}]``
- ``relations(database, schema, callback)`` ->
``[{"oid": int, "reltype_oid": int, "name": str,
"kind"?: "table"|"view"|"materialized_view",
"owner_oid"?: int, "has_index"?: bool, "has_rules"?: bool,
"has_triggers"?: bool, "row_security"?: bool}]`` — the optional flags
populate ``pg_tables`` (``tableowner`` via ``owner_oid``, ``hasindexes``,
etc.); omit ``owner_oid`` for backends without ownership (it stays blank)
- ``columns(database, schema, relation, callback)`` ->
``[{"name": str, "type_oid": int, "nullable": bool}]``

OIDs are supplied by the source and written through verbatim; they must be
stable across calls (so catalog joins resolve) and unique among the
source's objects. A duplicate object (same name in the same scope) is an
error; a source object whose name collides with a built-in replaces it.

When a lazy source is set, the eager ``register_*`` registrations are
ignored. Requires ``start(catalog_emulation=True)``.

Args:
source: The lazy catalog source object described above.
"""
self._server.set_lazy_catalog(source)

def register_database(self, database_name: str) -> None:
"""Register a logical database for catalog emulation.

Expand Down
Loading
Loading