Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/amalthea/src/comm/comm_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ pub enum Comm {
Other(String),
}

/// Messages exchanged on a comm channel.
///
/// The `Open` and `Close` variants are lifecycle signals rather than wire
/// content. They exist because the legacy comm path routes everything through
/// a single `incoming_tx`/`outgoing_tx` channel pair. Once all comms are
/// migrated to the blocking `CommHandler` path, these variants can be removed
/// and `CommMsg` can be reduced to just `Rpc` and `Data`. `Open` is a
/// special case: backend-initiated comms currently send it through
/// `outgoing_tx` to trigger the `comm_open` IOPub message. Removing it
/// would require a dedicated channel or method for that notification.
#[derive(Clone, Debug)]
pub enum CommMsg {
/// A message indicating that the comm channel is being opened.
Expand Down
44 changes: 43 additions & 1 deletion crates/amalthea/src/language/shell_handler.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/*
* shell_handler.rs
*
* Copyright (C) 2022 Posit Software, PBC. All rights reserved.
* Copyright (C) 2022-2026 Posit Software, PBC. All rights reserved.
*
*/

use async_trait::async_trait;

use crate::comm::comm_channel::Comm;
use crate::comm::comm_channel::CommMsg;
use crate::socket::comm::CommSocket;
use crate::wire::complete_reply::CompleteReply;
use crate::wire::complete_request::CompleteRequest;
Expand All @@ -21,6 +22,16 @@ use crate::wire::kernel_info_reply::KernelInfoReply;
use crate::wire::kernel_info_request::KernelInfoRequest;
use crate::wire::originator::Originator;

/// Result of a `handle_comm_msg` or `handle_comm_close` call on the
/// `ShellHandler`. `Handled` means the kernel processed the message
/// synchronously (blocking Shell until done). `NotHandled` means amalthea
/// should fall back to the historical `incoming_tx` path. This fallback is
/// temporary until all comms are migrated to the blocking path.
pub enum CommHandled {
Handled,
NotHandled,
}

#[async_trait]
pub trait ShellHandler: Send {
/// Handles a request for information about the kernel.
Expand Down Expand Up @@ -71,4 +82,35 @@ pub trait ShellHandler: Send {
/// * `target` - The target name of the comm, such as `positron.variables`
/// * `comm` - The comm channel to use to communicate with the frontend
async fn handle_comm_open(&self, target: Comm, comm: CommSocket) -> crate::Result<bool>;

/// Handle an incoming comm message (RPC or data) synchronously on the
/// kernel's main thread. Return `CommHandled::Handled` if the message was
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really want to be talking about "kernel main thread" in amalthea docs?

/// processed, or `CommHandled::NotHandled` to fall back to the existing
/// async `incoming_tx` path.
///
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name (e.g. `"positron.dataExplorer"`)
/// * `msg` - The parsed `CommMsg`
fn handle_comm_msg(
&mut self,
_comm_id: &str,
_comm_name: &str,
_msg: CommMsg,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
}

/// Handle a comm close synchronously on the kernel's main thread.
/// Return `CommHandled::Handled` if the close was processed, or
/// `CommHandled::NotHandled` to fall back to the historical path.
///
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name
fn handle_comm_close(
&mut self,
_comm_id: &str,
_comm_name: &str,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
}
}
2 changes: 1 addition & 1 deletion crates/amalthea/src/socket/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::socket::iopub::IOPubMessage;
/// This wrapper ensures comm messages go through the same channel as other
/// IOPub messages (like `ExecuteResult`), providing deterministic message
/// ordering when emitted from the same thread.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct CommOutgoingTx {
comm_id: String,
iopub_tx: Sender<IOPubMessage>,
Expand Down
31 changes: 24 additions & 7 deletions crates/amalthea/src/socket/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::comm::server_comm::ServerComm;
use crate::comm::server_comm::ServerStartedMessage;
use crate::error::Error;
use crate::language::server_handler::ServerHandler;
use crate::language::shell_handler::CommHandled;
use crate::language::shell_handler::ShellHandler;
use crate::socket::comm::CommInitiator;
use crate::socket::comm::CommSocket;
Expand Down Expand Up @@ -289,13 +290,13 @@ impl Shell {
let open_comms = &self.open_comms;
let header = req.header.clone();
Self::handle_notification(iopub_tx, req, |msg| {
Self::handle_comm_msg(open_comms, header, msg)
Self::handle_comm_msg(shell_handler, open_comms, header, msg)
})
},
Message::CommClose(req) => {
let open_comms = &mut self.open_comms;
Self::handle_notification(iopub_tx, req, |msg| {
Self::handle_comm_close(open_comms, msg)
Self::handle_comm_close(shell_handler, open_comms, msg)
})
},
Message::InspectRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| {
Expand Down Expand Up @@ -452,6 +453,7 @@ impl Shell {
/// request from the frontend to deliver a message to a backend, often as
/// the request side of a request/response pair.
fn handle_comm_msg(
shell_handler: &mut Box<dyn ShellHandler>,
open_comms: &[CommSocket],
header: JupyterHeader,
msg: &CommWireMsg,
Expand All @@ -474,7 +476,6 @@ impl Shell {
CommMsg::Data(msg.data.clone())
};

// Send the message to the comm
let Some(comm) = open_comms.iter().find(|c| c.comm_id == msg.comm_id) else {
log::warn!(
"Received message for unknown comm channel {}: {comm_msg:?}",
Expand All @@ -483,6 +484,13 @@ impl Shell {
return Ok(());
};

// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
CommHandled::Handled => return Ok(()),
CommHandled::NotHandled => {},
}

// Fall back to old approach for compatibility while we migrate comms
log::trace!("Sending message to comm '{}'", comm.comm_name);
comm.incoming_tx.send(comm_msg).log_err();

Comment on lines +487 to 496
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
CommHandled::Handled => return Ok(()),
CommHandled::NotHandled => {},
}
// Fall back to old approach for compatibility while we migrate comms
log::trace!("Sending message to comm '{}'", comm.comm_name);
comm.incoming_tx.send(comm_msg).log_err();
// Try to dispatch the message to the new handler API
match shell_handler.handle_comm_msg(&msg.comm_id, &comm.comm_name, comm_msg.clone())? {
CommHandled::Handled => Ok(()),
CommHandled::NotHandled => {
// Fall back to old approach for compatibility while we migrate comms
log::trace!("Sending message to comm '{}'", comm.comm_name);
comm.incoming_tx.send(comm_msg).log_err();
Ok(())
},
}

idk, seems like a clearer use of match?

Expand Down Expand Up @@ -667,7 +675,11 @@ impl Shell {
}

/// Handle a request to close a comm
fn handle_comm_close(open_comms: &mut Vec<CommSocket>, msg: &CommClose) -> crate::Result<()> {
fn handle_comm_close(
shell_handler: &mut Box<dyn ShellHandler>,
open_comms: &mut Vec<CommSocket>,
msg: &CommClose,
) -> crate::Result<()> {
let Some(idx) = open_comms.iter().position(|c| c.comm_id == msg.comm_id) else {
log::warn!(
"Received close message for unknown comm channel {}",
Expand All @@ -676,11 +688,16 @@ impl Shell {
return Ok(());
};

// Notify the comm that it's being closed
open_comms[idx].incoming_tx.send(CommMsg::Close).log_err();
// Try to dispatch the message to the new handler API.
// Fall back to notifying via `incoming_tx` for comms not yet migrated.
match shell_handler.handle_comm_close(&msg.comm_id, &open_comms[idx].comm_name)? {
CommHandled::Handled => {},
CommHandled::NotHandled => {
open_comms[idx].incoming_tx.send(CommMsg::Close).log_err();
},
}

open_comms.remove(idx);

log::info!(
"Comm channel closed; there are now {} open comms",
open_comms.len()
Expand Down
157 changes: 157 additions & 0 deletions crates/ark/src/comm_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
//
// comm_handler.rs
//
// Copyright (C) 2026 Posit Software, PBC. All rights reserved.
//
//

use std::cell::Cell;
use std::fmt::Debug;

use amalthea::comm::base_comm::json_rpc_error;
use amalthea::comm::base_comm::JsonRpcErrorCode;
use amalthea::comm::comm_channel::CommMsg;
use amalthea::comm::event::CommEvent;
use amalthea::socket::comm::CommOutgoingTx;
use crossbeam::channel::Sender;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stdext::result::ResultExt;

/// Context provided to `CommHandler` methods, giving access to the outgoing
/// channel and close-request mechanism. In the future, we'll provide access to
/// more of the Console state, such as the currently active environment.
#[derive(Debug)]
pub struct CommHandlerContext {
pub outgoing_tx: CommOutgoingTx,
pub comm_event_tx: Sender<CommEvent>,
closed: Cell<bool>,
}

impl CommHandlerContext {
pub fn new(outgoing_tx: CommOutgoingTx, comm_event_tx: Sender<CommEvent>) -> Self {
Self {
outgoing_tx,
comm_event_tx,
closed: Cell::new(false),
}
}

/// Request that Console close this comm after the current handler method
/// returns. The handler can still send responses or events before and
/// after calling this since cleanup is deferred.
pub fn close_on_exit(&self) {
self.closed.set(true);
}

pub fn is_closed(&self) -> bool {
self.closed.get()
}
}

/// Trait for comm handlers that run synchronously on the R thread.
///
/// All methods are called from the R thread within `ReadConsole`, so R code
/// can be safely called from these handlers.
pub trait CommHandler: Debug {
/// Metadata sent to the frontend in the `comm_open` message
/// (backend-initiated comms). Default is empty object.
fn open_metadata(&self) -> serde_json::Value {
serde_json::json!({})
}

/// Initialise handler state on the R thread (initial scan, first event,
/// etc.). Default is no-op.
fn handle_open(&mut self, _ctx: &CommHandlerContext) {}

/// Handle an incoming message (RPC or data).
fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext);

/// Handle comm close. Default is no-op.
fn handle_close(&mut self, _ctx: &CommHandlerContext) {}

/// Called when the environment changes. The `event` indicates what
/// triggered the change so handlers can decide whether to react.
/// Default is no-op.
fn handle_environment(&mut self, _event: EnvironmentChanged, _ctx: &CommHandlerContext) {}
}

/// Why the environment changed.
#[derive(Debug, Clone, Copy)]
pub enum EnvironmentChanged {
/// A top-level execution completed (user code, debug eval, etc.).
Execution,
/// The user selected a different frame in the call stack during debugging.
FrameSelected,
}
Comment on lines +81 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i like this framing!


/// A registered comm in the Console's comm table.
pub(crate) struct ConsoleComm {
pub(crate) handler: Box<dyn CommHandler>,
pub(crate) ctx: CommHandlerContext,
}

/// Handle an RPC request from a `CommMsg`.
///
/// Non-RPC messages are logged and ignored. Requests that could not be
/// handled cause an RPC error response.
pub fn handle_rpc_request<Reqs, Reps>(
outgoing_tx: &CommOutgoingTx,
comm_name: &str,
message: CommMsg,
request_handler: impl FnOnce(Reqs) -> anyhow::Result<Reps>,
) where
Reqs: DeserializeOwned + Debug,
Reps: Serialize,
{
let (id, parent_header, data) = match message {
CommMsg::Rpc {
id,
parent_header,
data,
} => (id, parent_header, data),
other => {
log::warn!("Expected RPC message for {comm_name}, got {other:?}");
return;
},
};

let json = match serde_json::from_value::<Reqs>(data.clone()) {
Ok(m) => {
let _span =
tracing::trace_span!("comm handler", name = comm_name, request = ?m).entered();
match request_handler(m) {
Ok(reply) => match serde_json::to_value(reply) {
Ok(value) => value,
Err(err) => {
let message = format!(
"Failed to serialise reply for {comm_name} request: {err} (request: {data})"
);
log::warn!("{message}");
json_rpc_error(JsonRpcErrorCode::InternalError, message)
},
},
Err(err) => {
let message =
format!("Failed to process {comm_name} request: {err} (request: {data})");
log::warn!("{message}");
json_rpc_error(JsonRpcErrorCode::InternalError, message)
},
}
},
Err(err) => {
let message = format!(
"No handler for {comm_name} request (method not found): {err} (request: {data})"
);
log::warn!("{message}");
json_rpc_error(JsonRpcErrorCode::MethodNotFound, message)
},
};

let response = CommMsg::Rpc {
id,
parent_header,
data: json,
};
outgoing_tx.send(response).log_err();
}
8 changes: 4 additions & 4 deletions crates/ark/src/connections/r_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ pub unsafe extern "C-unwind" fn ps_connection_opened(

if let Err(err) = RConnection::start(
metadata,
console.get_comm_event_tx().clone(),
console.comm_event_tx.clone(),
console.get_iopub_tx().clone(),
id,
) {
Expand All @@ -346,7 +346,7 @@ pub unsafe extern "C-unwind" fn ps_connection_closed(id: SEXP) -> Result<SEXP, a
let id = RObject::view(id).to::<String>()?;

Console::get()
.get_comm_event_tx()
.comm_event_tx
.send(CommEvent::Message(id, CommMsg::Close))?;

Ok(R_NilValue)
Expand All @@ -357,7 +357,7 @@ pub unsafe extern "C-unwind" fn ps_connection_updated(id: SEXP) -> Result<SEXP,
let comm_id: String = RObject::view(id).to::<String>()?;
let event = ConnectionsFrontendEvent::Update;

Console::get().get_comm_event_tx().send(CommEvent::Message(
Console::get().comm_event_tx.send(CommEvent::Message(
comm_id,
CommMsg::Data(serde_json::to_value(event)?),
))?;
Expand All @@ -374,7 +374,7 @@ pub unsafe extern "C-unwind" fn ps_connection_focus(id: SEXP) -> Result<SEXP, an
let comm_id: String = RObject::view(id).to::<String>()?;
let event = ConnectionsFrontendEvent::Focus;

Console::get().get_comm_event_tx().send(CommEvent::Message(
Console::get().comm_event_tx.send(CommEvent::Message(
comm_id,
CommMsg::Data(serde_json::to_value(event)?),
))?;
Expand Down
Loading