Skip to content

Run comms on the R thread#1075

Open
lionel- wants to merge 22 commits intomainfrom
task/sync-comms
Open

Run comms on the R thread#1075
lionel- wants to merge 22 commits intomainfrom
task/sync-comms

Conversation

@lionel-
Copy link
Contributor

@lionel- lionel- commented Mar 2, 2026

Progress towards #689
Progress towards #1074

Comm RPCs (comm_msg) and other comm messages currently run R code concurrently with the R interpreter. This happens because amalthea's handle_comm_msg sends the message to the comm's incoming_tx and returns immediately, freeing Shell to process the next message. Meanwhile, the comm's dedicated thread (e.g. ark-variables, ark-data-viewer) calls R code via r_task() at interrupt time. This is unsafe: R is not reentrant, and complex operations like loadNamespace() can corrupt state if preempted. Furthermore, this prevents strong sequential assertions in integration tests (#1074).

This PR introduces a blocking comm path where comm_msg and comm_close are forwarded from amalthea's Shell to the R thread (via ReadConsole), so comm handlers run synchronously with R. The migration is opt-in per comm type, with a fallback to the existing incoming_tx path for comms not yet migrated.

The Data Explorer is migrated as the first comm to illustrate the nice wins from this approach:

  • No more R reentrancy risk from r_task() at interrupt time
  • No more risk of omitting wrapping of R-related code in r_task(), which allows a much nicer/easier development experience (and easier code reviews).
  • Much simpler implementation (the dedicated thread, select! loop, and RThreadSafe wrapper all go away)
  • 10 data explorers used to spawn 10 comm threads. We now spawn 0 additional threads. In addition to reduced complexity, this will reduce memory usage since each thread allocates 2mb for its stack. This reduction will also apply to plot comms, so could be significant in real sessions.
  • Deterministic update ordering (environment change side effects land within the Busy/Idle window of the request that caused them)
  • The tests become deterministic
  • All the event buffering test infra goes away

Amalthea: CommHandled + ShellHandler trait extension

New handle_comm_msg and handle_comm_close methods on ShellHandler, with default NotHandled return. Amalthea's Shell calls these first and falls back to the existing incoming_tx path on NotHandled. Both methods receive comm_name from amalthea's open_comms lookup so the kernel can decide by comm type. Once all comms are migrated, NotHandled goes away.

Ark: blocking comm infrastructure

CommHandler trait for handlers that run on the R thread. Methods: handle_open, handle_msg, handle_close, handle_environment. All called from within ReadConsole, so R code can be safely invoked.

CommHandlerContext gives handlers access to Console state and to the outgoing channel. It also provides a close_on_exit() mechanism for backend-initiated closes. Console checks is_closed() after each dispatch and handles cleanup.

Console holds a HashMap<String, ConsoleComm> keyed by comm ID. New KernelRequest variants (CommOpen, CommMsg, CommClose) carry a done_tx for synchronous completion: ark's Shell sends the request and blocks until the R thread signals done.

Environment change notifications are dispatched to registered comm handlers after execute results go on IOPub but before the reply unblocks Shell (which sends Idle). This ensures side effects like data explorer updates deterministically arrive within the Busy/Idle window of the execute request that caused them.

Data Explorer migration

RDataExplorer becomes a plain struct implementing CommHandler. The dedicated "ark-data-viewer" thread, execution_thread(), select! loop, r_task() wrappers, and EVENTS.environment_changed listener are all removed.

handle_environment replaces the event listener. When update() detects the binding was removed, it calls ctx.close_on_exit().

Backend-initiated opens (from View() or the variables "View" button) register directly with Console via comm_register() since they're already on the R thread.

Table storage is simplified from RThreadSafe<RObject> to plain RObject since the handler now lives on the R thread exclusively.

@lionel- lionel- requested review from DavisVaughan and jmcphers March 2, 2026 15:53
Copy link
Contributor

@jmcphers jmcphers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't review this in detail, but the structure is sound. I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings. But this pattern feels much better for things like the Data Explorer that are primarily interacting with R state.

@lionel-
Copy link
Contributor Author

lionel- commented Mar 3, 2026

I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings.

Absolutely! I've kept a Shell thread on the Ark side as an intermediate between Amalthea Shell and the Ark Console for that reason. The Ark Shell thread will dispatch asynchronous messages to async comm threads. See also related discussion in posit-dev/positron#7447.

This setup will resemble how the DAP currently works, with a Console side running on the R thread and a server side living in its own thread. Both sides share common state via a mutex, and the server side is also able to run R code via idle tasks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants