Skip to content

Run comms on the R thread#1075

Open
lionel- wants to merge 22 commits intomainfrom
task/sync-comms
Open

Run comms on the R thread#1075
lionel- wants to merge 22 commits intomainfrom
task/sync-comms

Conversation

@lionel-
Copy link
Contributor

@lionel- lionel- commented Mar 2, 2026

Progress towards #689
Progress towards #1074

Comm RPCs (comm_msg) and other comm messages currently run R code concurrently with the R interpreter. This happens because amalthea's handle_comm_msg sends the message to the comm's incoming_tx and returns immediately, freeing Shell to process the next message. Meanwhile, the comm's dedicated thread (e.g. ark-variables, ark-data-viewer) calls R code via r_task() at interrupt time. This is unsafe: R is not reentrant, and complex operations like loadNamespace() can corrupt state if preempted. Furthermore, this prevents strong sequential assertions in integration tests (#1074).

This PR introduces a blocking comm path where comm_msg and comm_close are forwarded from amalthea's Shell to the R thread (via ReadConsole), so comm handlers run synchronously with R. The migration is opt-in per comm type, with a fallback to the existing incoming_tx path for comms not yet migrated.

The Data Explorer is migrated as the first comm to illustrate the nice wins from this approach:

  • No more R reentrancy risk from r_task() at interrupt time
  • No more risk of omitting wrapping of R-related code in r_task(), which allows a much nicer/easier development experience (and easier code reviews).
  • Much simpler implementation (the dedicated thread, select! loop, and RThreadSafe wrapper all go away)
  • 10 data explorers used to spawn 10 comm threads. We now spawn 0 additional threads. In addition to reduced complexity, this will reduce memory usage since each thread allocates 2mb for its stack. This reduction will also apply to plot comms, so could be significant in real sessions.
  • Deterministic update ordering (environment change side effects land within the Busy/Idle window of the request that caused them)
  • The tests become deterministic
  • All the event buffering test infra goes away

Amalthea: CommHandled + ShellHandler trait extension

New handle_comm_msg and handle_comm_close methods on ShellHandler, with default NotHandled return. Amalthea's Shell calls these first and falls back to the existing incoming_tx path on NotHandled. Both methods receive comm_name from amalthea's open_comms lookup so the kernel can decide by comm type. Once all comms are migrated, NotHandled goes away.

Ark: blocking comm infrastructure

CommHandler trait for handlers that run on the R thread. Methods: handle_open, handle_msg, handle_close, handle_environment. All called from within ReadConsole, so R code can be safely invoked.

CommHandlerContext gives handlers access to Console state and to the outgoing channel. It also provides a close_on_exit() mechanism for backend-initiated closes. Console checks is_closed() after each dispatch and handles cleanup.

Console holds a HashMap<String, ConsoleComm> keyed by comm ID. New KernelRequest variants (CommOpen, CommMsg, CommClose) carry a done_tx for synchronous completion: ark's Shell sends the request and blocks until the R thread signals done.

Environment change notifications are dispatched to registered comm handlers after execute results go on IOPub but before the reply unblocks Shell (which sends Idle). This ensures side effects like data explorer updates deterministically arrive within the Busy/Idle window of the execute request that caused them.

Data Explorer migration

RDataExplorer becomes a plain struct implementing CommHandler. The dedicated "ark-data-viewer" thread, execution_thread(), select! loop, r_task() wrappers, and EVENTS.environment_changed listener are all removed.

handle_environment replaces the event listener. When update() detects the binding was removed, it calls ctx.close_on_exit().

Backend-initiated opens (from View() or the variables "View" button) register directly with Console via comm_register() since they're already on the R thread.

Table storage is simplified from RThreadSafe<RObject> to plain RObject since the handler now lives on the R thread exclusively.

@lionel- lionel- requested review from DavisVaughan and jmcphers March 2, 2026 15:53
Copy link
Contributor

@jmcphers jmcphers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't review this in detail, but the structure is sound. I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings. But this pattern feels much better for things like the Data Explorer that are primarily interacting with R state.

@lionel-
Copy link
Contributor Author

lionel- commented Mar 3, 2026

I do think it is useful to have some async comms for cases wherein we don't actually need to talk to R at all and don't need to synchronize ourselves with the busy/idle groupings.

Absolutely! I've kept a Shell thread on the Ark side as an intermediate between Amalthea Shell and the Ark Console for that reason. The Ark Shell thread will dispatch asynchronous messages to async comm threads. See also related discussion in posit-dev/positron#7447.

This setup will resemble how the DAP currently works, with a Console side running on the R thread and a server side living in its own thread. Both sides share common state via a mutex, and the server side is also able to run R code via idle tasks.

async fn handle_comm_open(&self, target: Comm, comm: CommSocket) -> crate::Result<bool>;

/// Handle an incoming comm message (RPC or data) synchronously on the
/// kernel's main thread. Return `CommHandled::Handled` if the message was
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really want to be talking about "kernel main thread" in amalthea docs?

Comment on lines +487 to 496
// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
CommHandled::Handled => return Ok(()),
CommHandled::NotHandled => {},
}

// Fall back to old approach for compatibility while we migrate comms
log::trace!("Sending message to comm '{}'", comm.comm_name);
comm.incoming_tx.send(comm_msg).log_err();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
CommHandled::Handled => return Ok(()),
CommHandled::NotHandled => {},
}
// Fall back to old approach for compatibility while we migrate comms
log::trace!("Sending message to comm '{}'", comm.comm_name);
comm.incoming_tx.send(comm_msg).log_err();
// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
CommHandled::Handled => Ok(()),
CommHandled::NotHandled => {
// Fall back to old approach for compatibility while we migrate comms
log::trace!("Sending message to comm '{}'", comm.comm_name);
comm.incoming_tx.send(comm_msg).log_err();
Ok(())
},
}

idk, seems like a clearer use of match?

Comment on lines 219 to +220
/// Channel used to send along messages relayed on the open comms.
comm_event_tx: Sender<CommEvent>,
pub(crate) comm_event_tx: Sender<CommEvent>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought get_comm_event_tx() that doesn't expose this as pub was a nice abstraction :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd still need a pub crate if you call it from other files

}
// Safety: `Table` is only accessed on the R thread (or in R idle tasks,
// which also run on the R thread).
unsafe impl Send for Table {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very uneasy about this

We still ship a Table around via an r_task::spawn_idle() (as you mentioned).

I know that we:

  • Send the task from the main R thread
  • Pick the task up and run it on the main R thread

But I am still extremely nervous about providing anything outside of RThreadSafe that can send across threads. I just don't trust us to get it right every time.

I think I would prefer to keep this wrapped in RThreadSafe, because to me that is The Way to ship across threads, even if you end up running from the main R thread on both sides.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also like to note that this problem would very likely go away entirely if we had a variant of r_task::spawn_idle() that did not require a Send bound on T.

I believe we are stuck with that as long as we are using crossbeam channels, but everything happening here is all on one thread! The main R thread!

All we really want is to queue up a task within the same thread so that read_console can run it at the next idle iteration. Something like r_task::enque_idle() maybe, just spitballing.

That shouldn't require a crossbeam channel ideally (although compatibility with a crossbeam select! would make it challenging probably). We maybe just need some VecDeque to push and pop from.

Then you should be able to ship a closure around, even if it has an RObject inside, without a Send bound.

But until then, I still like RThreadSafe

Comment on lines +194 to 195
fn update(&mut self, ctx: &CommHandlerContext) -> anyhow::Result<bool> {
// No need to check for updates if we have no binding
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random thought. Would it be nice to have some kind of assert_r_thread!() macro we could put at the top of functions like this? Panic in debug mode and no-op in release mode? It would be self documenting and would help us with our invariants.

Comment on lines +72 to +80
/// Register a new comm handler on the R thread (frontend-initiated comms).
/// Uses a factory closure so the handler (which may hold `RObject`s) is
/// created on the R thread rather than sent across threads.
CommOpen {
comm_id: String,
comm_name: String,
factory: CommHandlerFactory,
ctx: CommHandlerContext,
done_tx: Sender<()>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what didn't look used to me, and the factory stuff just felt confusing if we don't have a use for it...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if you think you'll use it for Variables, I'd be interested in delaying the addition of this to that PR so we can see / justify that we really do need this weird factory thing

msg: CommMsg,
) -> amalthea::Result<CommHandled> {
match comm_name {
"positron.dataExplorer" => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have used the string "positron.dataExplorer" more times than I am comfortable with at this point.

Can we abstract this into a named static string that lives somewhere and reference it by name instead? A simple typo could screw it all up!

Comment on lines 41 to +42
// Data explorer should NOT have received any events
frontend.assert_no_data_explorer_events();
frontend.assert_iopub_empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad comment

Comment on lines +88 to +95
if reg.ctx.is_closed() {
closed_ids.push(comm_id.clone());
}
}
for comm_id in closed_ids {
if let Some(reg) = self.comms.remove(&comm_id) {
self.comm_notify_closed(&comm_id, &reg);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole is_closed() thing feels a bit wrong to me.

It feels like after any kind of generic message we have to check if the backend decided to close the comm? Like we do this here and in comm_handle_msg.

Should something else be handling this in a more consistent manner?

Otherwise it feels like if we add any other comm_notify_*() helper to this, then we are going to also need to check is_closed there too, and that feels so easy to forget

Comment on lines +81 to +86
pub enum EnvironmentChanged {
/// A top-level execution completed (user code, debug eval, etc.).
Execution,
/// The user selected a different frame in the call stack during debugging.
FrameSelected,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like this framing!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants