Conversation
jmcphers
left a comment
There was a problem hiding this comment.
I didn't review this in detail, but the structure is sound. I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings. But this pattern feels much better for things like the Data Explorer that are primarily interacting with R state.
Absolutely! I've kept a Shell thread on the Ark side as an intermediate between Amalthea Shell and the Ark Console for that reason. The Ark Shell thread will dispatch asynchronous messages to async comm threads. See also related discussion in posit-dev/positron#7447. This setup will resemble how the DAP currently works, with a Console side running on the R thread and a server side living in its own thread. Both sides share common state via a mutex, and the server side is also able to run R code via idle tasks. |
Progress towards #689
Progress towards #1074
Comm RPCs (
comm_msg) and other comm messages currently run R code concurrently with the R interpreter. This happens because amalthea'shandle_comm_msgsends the message to the comm'sincoming_txand returns immediately, freeing Shell to process the next message. Meanwhile, the comm's dedicated thread (e.g. ark-variables, ark-data-viewer) calls R code viar_task()at interrupt time. This is unsafe: R is not reentrant, and complex operations likeloadNamespace()can corrupt state if preempted. Furthermore, this prevents strong sequential assertions in integration tests (#1074).This PR introduces a blocking comm path where
comm_msgandcomm_closeare forwarded from amalthea's Shell to the R thread (viaReadConsole), so comm handlers run synchronously with R. The migration is opt-in per comm type, with a fallback to the existingincoming_txpath for comms not yet migrated.The Data Explorer is migrated as the first comm to illustrate the nice wins from this approach:
r_task()at interrupt timer_task(), which allows a much nicer/easier development experience (and easier code reviews).select!loop, andRThreadSafewrapper all go away)Amalthea:
CommHandled+ShellHandlertrait extensionNew
handle_comm_msgandhandle_comm_closemethods onShellHandler, with defaultNotHandledreturn. Amalthea's Shell calls these first and falls back to the existingincoming_txpath onNotHandled. Both methods receivecomm_namefrom amalthea'sopen_commslookup so the kernel can decide by comm type. Once all comms are migrated,NotHandledgoes away.Ark: blocking comm infrastructure
CommHandlertrait for handlers that run on the R thread. Methods:handle_open,handle_msg,handle_close,handle_environment. All called from withinReadConsole, so R code can be safely invoked.CommHandlerContextgives handlers access to Console state and to the outgoing channel. It also provides aclose_on_exit()mechanism for backend-initiated closes. Console checksis_closed()after each dispatch and handles cleanup.Console holds a
HashMap<String, ConsoleComm>keyed by comm ID. NewKernelRequestvariants (CommOpen,CommMsg,CommClose) carry adone_txfor synchronous completion: ark's Shell sends the request and blocks until the R thread signals done.Environment change notifications are dispatched to registered comm handlers after execute results go on IOPub but before the reply unblocks Shell (which sends Idle). This ensures side effects like data explorer updates deterministically arrive within the Busy/Idle window of the execute request that caused them.
Data Explorer migration
RDataExplorerbecomes a plain struct implementingCommHandler. The dedicated "ark-data-viewer" thread,execution_thread(),select!loop,r_task()wrappers, andEVENTS.environment_changedlistener are all removed.handle_environmentreplaces the event listener. Whenupdate()detects the binding was removed, it callsctx.close_on_exit().Backend-initiated opens (from
View()or the variables "View" button) register directly with Console viacomm_register()since they're already on the R thread.Tablestorage is simplified fromRThreadSafe<RObject>to plainRObjectsince the handler now lives on the R thread exclusively.