On shutdown handler#111
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds Server.handle_shutdown (Rust + Python bindings), replaces Python signal handlers in Teleduck with a shutdown callback that checkpoints DuckDB, introduces shared test helpers (stop_server, wait_for_catalog, ensure_started), migrates tests to those helpers, and adjusts test-concurrency ports and Makefile/test runner. ChangesServer shutdown callback API and Teleduck integration
Test readiness utilities and suite migration
Build configuration and dependency updates
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning Review ran into problems🔥 ProblemsGit: Failed to clone repository. Please run the Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
Match the handle_ naming used by the other client handlers (handle_auth/handle_query/handle_connect/handle_disconnect). Renames the Server and RiffqServer method, the internal callback field, the test file and its cases, and the docs. Also drops the redundant "zero-argument callback" wording from the docstrings. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
Makefile (1)
4-4:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winInconsistent Python interpreter.
The
testtarget usespythonwhile theall-teststarget usespython3. For consistency, both targets should use the same interpreter.🔧 Proposed fix
- python -m unittest discover -s tests + python3 -m unittest discover -s tests🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Makefile` at line 4, The Makefile uses inconsistent interpreters: the test target runs "python -m unittest discover -s tests" while the all-tests target uses "python3"; update the test target to use the same interpreter as all-tests by replacing "python -m unittest discover -s tests" with "python3 -m unittest discover -s tests" (ensure the test target name "test" and the other target "all-tests" remain unchanged).src/lib.rs (1)
1894-1909:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftDrain active connection tasks before invoking
on_shutdown.
server_task.abort()only stops the accept loop. Every already-spawnedprocess_socket(...)task keeps running until the Tokio runtime itself is dropped, butrun_shutdown_callback(&shutdown_cb)runs before that. In this PR, Teleduck uses the callback toCHECKPOINTand close the shared DuckDB connection, so an in-flight query can race with shutdown and fail or close the database while it is still in use. Track and await the per-connectionJoinHandles (and await the aborted accept task) before running the callback.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib.rs` around lines 1894 - 1909, The accept loop currently tokio::spawn's per-connection tasks (calling process_socket) but never tracks their JoinHandle, so server_task.abort() only stops accepts while in-flight connections keep running and run_shutdown_callback(&shutdown_cb) may run while those tasks still use shared resources; modify the spawn logic around process_socket(...) to collect each JoinHandle (e.g., push into a shared Vec<JoinHandle<...>> or a FuturesUnordered protected by an Arc<Mutex<_>> or a tokio::sync::watch/Notify) so you can await them on shutdown; after wait_for_shutdown_signal().await, abort and await server_task (await its JoinHandle), then concurrently await/join all collected per-connection JoinHandle(s) (or drain the FuturesUnordered) before calling run_shutdown_callback(&shutdown_cb) to ensure all process_socket tasks finish or are awaited prior to running the shutdown callback.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@teleduck/src/teleduck/server.py`:
- Around line 178-191: The _checkpoint_and_close() hook currently calls
duckdb_con.close() inside the same try as duckdb_con.execute("CHECKPOINT"), so
if CHECKPOINT fails (e.g., when run_server(..., read_only=True) registers the
hook) the connection never closes; update _checkpoint_and_close to only attempt
the CHECKPOINT when writes are enabled (check the read_only flag or equivalent)
and always close duckdb_con in a finally block, ensuring shutdown_state["done"]
semantics remain, using the existing symbols _checkpoint_and_close,
shutdown_state, duckdb_con, and respecting run_server's read_only path so
server.start() does not return with the DB still open.
In `@teleduck/tests/server_readiness.py`:
- Around line 34-37: The code currently swallows psycopg.OperationalError;
change the retry logic around psycopg.connect to capture the caught exception
(use "except psycopg.OperationalError as last_exc") and store it, and when
retries exhaust raise the final RuntimeError chained from that last exception
(raise RuntimeError("...") from last_exc). Alternatively, if you prefer stricter
suppression, only catch the specific transient psycopg errors you expect
(inspect psycopg.errors.* names) instead of catching all OperationalError. Apply
the same change to the other identical blocks that currently catch
psycopg.OperationalError (the blocks around the psycopg.connect calls at the
other noted locations).
In `@teleduck/tests/test_duckdb_catalog.py`:
- Around line 53-62: In setUpClass(), replace the raw process cleanup (calls to
proc.terminate() and proc.join()) used on the startup-timeout path with the
existing stop_server(cls.proc) helper so the child process and port are cleaned
up consistently; update both the timeout branch after wait_for_catalog and the
similar branch around lines 65-66 to call stop_server(cls.proc) instead of
terminate()/join(), referencing setUpClass, cls.proc, wait_for_catalog, and
stop_server.
In `@teleduck/tests/test_sql_init.py`:
- Around line 55-72: The failed-startup branch currently calls
proc.terminate()/proc.join() directly instead of using the shared stop_server
helper; update that branch to call stop_server(cls.proc) (the same helper used
in tearDownClass) so process termination, cleanup and DuckDB shutdown handling
are consistent—locate the "Server did not start" branch where terminate() and
join() are invoked and replace those calls with stop_server(cls.proc).
In `@tests/test_multiple_databases.py`:
- Around line 57-60: Replace the fixed time.sleep(10) in
tests/test_multiple_databases.py with a deterministic readiness poll: loop with
a short interval up to a reasonable timeout that attempts a real readiness check
(e.g. TCP connect to the server port or run the catalog-readiness query used by
the app) and break when it succeeds; raise/assert failure if timeout elapses.
Use exponential/backoff or fixed small sleeps between retries and keep the
uniqueness tied to the existing call site (replace time.sleep(10) in that test),
so the test only proceeds once the catalog is actually ready.
---
Outside diff comments:
In `@Makefile`:
- Line 4: The Makefile uses inconsistent interpreters: the test target runs
"python -m unittest discover -s tests" while the all-tests target uses
"python3"; update the test target to use the same interpreter as all-tests by
replacing "python -m unittest discover -s tests" with "python3 -m unittest
discover -s tests" (ensure the test target name "test" and the other target
"all-tests" remain unchanged).
In `@src/lib.rs`:
- Around line 1894-1909: The accept loop currently tokio::spawn's per-connection
tasks (calling process_socket) but never tracks their JoinHandle, so
server_task.abort() only stops accepts while in-flight connections keep running
and run_shutdown_callback(&shutdown_cb) may run while those tasks still use
shared resources; modify the spawn logic around process_socket(...) to collect
each JoinHandle (e.g., push into a shared Vec<JoinHandle<...>> or a
FuturesUnordered protected by an Arc<Mutex<_>> or a tokio::sync::watch/Notify)
so you can await them on shutdown; after wait_for_shutdown_signal().await, abort
and await server_task (await its JoinHandle), then concurrently await/join all
collected per-connection JoinHandle(s) (or drain the FuturesUnordered) before
calling run_shutdown_callback(&shutdown_cb) to ensure all process_socket tasks
finish or are awaited prior to running the shutdown callback.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b1288564-7923-4bf5-b7a9-cc7dae955c10
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (25)
Makefiledocs/changelog.mddocs/getting-started.mdpysrc/riffq/connection.pyrequirements.txtsrc/lib.rsteleduck/src/teleduck/server.pyteleduck/tests/server_readiness.pyteleduck/tests/test_checkpoint_on_shutdown.pyteleduck/tests/test_duckdb_catalog.pyteleduck/tests/test_sql_init.pytests/helpers.pytests/test_connection_id.pytests/test_multiple_databases.pytests/test_on_authentication.pytests/test_on_authentication_custom_error.pytests/test_on_connect.pytests/test_on_connect_custom_error.pytests/test_on_disconnect.pytests/test_on_shutdown.pytests/test_pg_catalog_query.pytests/test_query_error.pytests/test_register_catalog.pytests/test_server.pytests/test_server_tls.py
💤 Files with no reviewable changes (12)
- tests/test_on_connect.py
- tests/test_pg_catalog_query.py
- tests/test_on_connect_custom_error.py
- tests/test_on_disconnect.py
- tests/test_connection_id.py
- tests/test_server.py
- tests/test_server_tls.py
- tests/test_register_catalog.py
- tests/test_on_authentication.py
- tests/test_query_error.py
- tests/helpers.py
- tests/test_on_authentication_custom_error.py
| try: | ||
| return psycopg.connect(dsn, connect_timeout=2) | ||
| except psycopg.OperationalError: | ||
| return None |
There was a problem hiding this comment.
Preserve the real psycopg failure.
OperationalError covers more than “server still starting”, and psycopg.errors.Error also includes permanent probe bugs like invalid SQL. Retrying both until a generic timeout makes broken tests look like slow startup. Capture the last exception and chain it from the final RuntimeError, or only suppress the specific transient states you expect here.
Also applies to: 46-49, 80-83
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@teleduck/tests/server_readiness.py` around lines 34 - 37, The code currently
swallows psycopg.OperationalError; change the retry logic around psycopg.connect
to capture the caught exception (use "except psycopg.OperationalError as
last_exc") and store it, and when retries exhaust raise the final RuntimeError
chained from that last exception (raise RuntimeError("...") from last_exc).
Alternatively, if you prefer stricter suppression, only catch the specific
transient psycopg errors you expect (inspect psycopg.errors.* names) instead of
catching all OperationalError. Apply the same change to the other identical
blocks that currently catch psycopg.OperationalError (the blocks around the
psycopg.connect calls at the other noted locations).
There was a problem hiding this comment.
♻️ Duplicate comments (1)
teleduck/src/teleduck/server.py (1)
186-191:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winClose DuckDB in a
finallyand skip CHECKPOINT in read-only mode.When
run_server(..., read_only=True)registers this hook,CHECKPOINTraises and theexceptswallows it, soduckdb_con.close()never runs and the handle leaks. Guard the checkpoint behindread_onlyand always close infinally.Proposed fix
shutdown_state["done"] = True try: - duckdb_con.execute("CHECKPOINT") - duckdb_con.close() - logging.info("Database checkpointed and closed.") + if not read_only: + duckdb_con.execute("CHECKPOINT") + logging.info("Database checkpointed.") except Exception as exc: logging.error("Checkpoint failed: %s", exc) + finally: + duckdb_con.close() + logging.info("Database closed.")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@teleduck/src/teleduck/server.py` around lines 186 - 191, The shutdown hook currently calls duckdb_con.execute("CHECKPOINT") and then duckdb_con.close() inside try/except which swallows CHECKPOINT errors and can leak the duckdb_con when run_server(..., read_only=True) is used; change the logic in the shutdown/cleanup block (the code referencing duckdb_con and CHECKPOINT in server.py) so that you only run CHECKPOINT when read_only is False, and always call duckdb_con.close() in a finally block (or after the try/except) to ensure the connection is closed even if CHECKPOINT or other errors occur.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@teleduck/src/teleduck/server.py`:
- Around line 186-191: The shutdown hook currently calls
duckdb_con.execute("CHECKPOINT") and then duckdb_con.close() inside try/except
which swallows CHECKPOINT errors and can leak the duckdb_con when
run_server(..., read_only=True) is used; change the logic in the
shutdown/cleanup block (the code referencing duckdb_con and CHECKPOINT in
server.py) so that you only run CHECKPOINT when read_only is False, and always
call duckdb_con.close() in a finally block (or after the try/except) to ensure
the connection is closed even if CHECKPOINT or other errors occur.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 71f2a9be-1eba-41cd-84e6-71255e4c0a34
📒 Files selected for processing (7)
docs/changelog.mddocs/getting-started.mdpysrc/riffq/connection.pysrc/lib.rsteleduck/src/teleduck/server.pyteleduck/tests/test_checkpoint_on_shutdown.pytests/test_handle_shutdown.py
✅ Files skipped from review due to trivial changes (1)
- docs/changelog.md
🚧 Files skipped from review as they are similar to previous changes (3)
- docs/getting-started.md
- src/lib.rs
- teleduck/tests/test_checkpoint_on_shutdown.py
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
teleduck/tests/test_duckdb_catalog.py (1)
27-28:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winLeaked file descriptor from
mkstemp.
tempfile.mkstempreturns an open fd that is never closed here;Path(cls.db_file).unlink()only removes the directory entry, leaving the descriptor open on the deleted inode for the lifetime of the test class.test_sql_init.pycloses it correctly viaos.close(fd)(line 28) — mirror that here.🛠️ Proposed fix
- fd, cls.db_file = tempfile.mkstemp(suffix=".db") - Path(cls.db_file).unlink() # remove so DuckDB can create it + fd, cls.db_file = tempfile.mkstemp(suffix=".db") + os.close(fd) + Path(cls.db_file).unlink() # remove so DuckDB can create itAdd
import osif not already present.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@teleduck/tests/test_duckdb_catalog.py` around lines 27 - 28, The tempfile.mkstemp call in the test leaves an open file descriptor (fd) when assigning cls.db_file; after calling tempfile.mkstemp(suffix=".db") you must close the returned fd (call os.close(fd)) before unlinking the path to avoid leaking the descriptor — update the setup in test_duckdb_catalog.py (the mkstemp/Path(cls.db_file).unlink() sequence) to close fd first and add an import os at top if not already present.
♻️ Duplicate comments (1)
pysrc/riffq/testing.py (1)
39-61:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winBroad exception suppression hides permanent test failures.
_connectswallows everypsycopg.OperationalErrorand_probe_valueswallows everypsycopg.errors.Error. Both cover far more than "server still starting" (e.g. auth failures, invalid probe SQL). With those retried until the generic timeout, a genuinely broken probe or credential mistake looks like slow startup and surfaces only as a 30scatalog not readyerror with no root cause. Consider capturing the last exception and chaining it into the finalRuntimeError, or narrowing the suppressed states to the transient ones you actually expect.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pysrc/riffq/testing.py` around lines 39 - 61, Both _connect and _probe_value currently catch broad psycopg exceptions and hide permanent failures; update _connect (the function returning psycopg.connect) to only suppress transient connection errors or, if you keep catching OperationalError, store the caught exception and re-raise or chain it into the final RuntimeError on timeout so the root cause is preserved, and update _probe_value to either narrow the except to specific transient errors (e.g., query-not-ready errors) or capture the last psycopg.errors.Error and include it when turning probe-timeouts into a RuntimeError; reference the functions _connect and _probe_value and ensure the last exception is attached (exception chaining) to any final error you raise so credential/SQL errors are visible.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@teleduck/tests/test_duckdb_catalog.py`:
- Around line 27-28: The tempfile.mkstemp call in the test leaves an open file
descriptor (fd) when assigning cls.db_file; after calling
tempfile.mkstemp(suffix=".db") you must close the returned fd (call
os.close(fd)) before unlinking the path to avoid leaking the descriptor — update
the setup in test_duckdb_catalog.py (the mkstemp/Path(cls.db_file).unlink()
sequence) to close fd first and add an import os at top if not already present.
---
Duplicate comments:
In `@pysrc/riffq/testing.py`:
- Around line 39-61: Both _connect and _probe_value currently catch broad
psycopg exceptions and hide permanent failures; update _connect (the function
returning psycopg.connect) to only suppress transient connection errors or, if
you keep catching OperationalError, store the caught exception and re-raise or
chain it into the final RuntimeError on timeout so the root cause is preserved,
and update _probe_value to either narrow the except to specific transient errors
(e.g., query-not-ready errors) or capture the last psycopg.errors.Error and
include it when turning probe-timeouts into a RuntimeError; reference the
functions _connect and _probe_value and ensure the last exception is attached
(exception chaining) to any final error you raise so credential/SQL errors are
visible.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9307c192-aa2c-40e1-865e-de1899f06643
📒 Files selected for processing (23)
pysrc/riffq/testing.pyrequirements.txtteleduck/src/teleduck.egg-info/SOURCES.txtteleduck/tests/server_readiness.pyteleduck/tests/test_duckdb_catalog.pyteleduck/tests/test_sql_init.pytest_concurrency/test_concurrency_duckdb.pytest_concurrency/test_concurrency_duckdb_arrow.pytest_concurrency/test_concurrency_polars.pytest_concurrency/utils.pytests/helpers.pytests/test_connection_id.pytests/test_multiple_databases.pytests/test_on_authentication.pytests/test_on_authentication_custom_error.pytests/test_on_connect.pytests/test_on_connect_custom_error.pytests/test_on_disconnect.pytests/test_pg_catalog_query.pytests/test_query_error.pytests/test_register_catalog.pytests/test_server.pytests/test_server_tls.py
💤 Files with no reviewable changes (1)
- requirements.txt
✅ Files skipped from review due to trivial changes (2)
- test_concurrency/utils.py
- teleduck/src/teleduck.egg-info/SOURCES.txt
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/test_query_error.py
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lib.rs (1)
1907-1911:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftShutdown callback can race active client tasks.
After
server_task.abort(), previously spawnedprocess_sockettasks keep running, butrun_shutdown_callbackexecutes immediately. That can run checkpoint/flush logic while queries are still in flight.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/lib.rs` around lines 1907 - 1911, The shutdown callback can run while per-connection tasks (spawned in process_socket) are still active; change the shutdown path so that after wait_for_shutdown_signal().await and server_task.abort() you first wait for all active connection tasks to finish (or at least reach a safe checkpoint) before calling run_shutdown_callback. Concretely, stop spawning process_socket fire-and-forget: collect their JoinHandles (or use a tokio::task::JoinSet / FuturesUnordered) when you spawn them, expose a way to signal shutdown to those tasks (reuse wait_for_shutdown_signal or a broadcast) and then await their completion (with an optional timeout) in the same scope where you currently call server_task.abort() — only call run_shutdown_callback(&shutdown_cb) after those handles have been awaited/cleaned up.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/lib.rs`:
- Around line 1907-1911: The shutdown callback can run while per-connection
tasks (spawned in process_socket) are still active; change the shutdown path so
that after wait_for_shutdown_signal().await and server_task.abort() you first
wait for all active connection tasks to finish (or at least reach a safe
checkpoint) before calling run_shutdown_callback. Concretely, stop spawning
process_socket fire-and-forget: collect their JoinHandles (or use a
tokio::task::JoinSet / FuturesUnordered) when you spawn them, expose a way to
signal shutdown to those tasks (reuse wait_for_shutdown_signal or a broadcast)
and then await their completion (with an optional timeout) in the same scope
where you currently call server_task.abort() — only call
run_shutdown_callback(&shutdown_cb) after those handles have been
awaited/cleaned up.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f63c3e41-eb80-45ce-9ac1-c4f1937d3b03
📒 Files selected for processing (8)
src/lib.rstest_concurrency/server_duckdb.pytest_concurrency/server_duckdb_arrow.pytest_concurrency/server_polars.pytest_concurrency/test_concurrency_duckdb.pytest_concurrency/test_concurrency_duckdb_arrow.pytest_concurrency/test_concurrency_polars.pytest_concurrency/utils.py
Summary by CodeRabbit
New Features
Documentation
Tests
Chores