Skip to content

Commit 7220855

Browse files
nikomatsakisclaude
andcommitted
docs: fix doctests for API refactoring
- Export JrConnection from lib.rs public API - Rename ViaBytes to ByteStreams in all doctests - Update JrConnection::new() to JrHandlerChain::new() - Fix .with_client() examples to use .connect_to() first - Add comprehensive docs and examples for spawn_connection() - Update forward_to_request_cx doctest to show proper proxy pattern - Add missing documentation for JrConnection struct - Fix describe_chain() implementation in conductor (was todo!()) All workspace doctests now pass. Co-authored-by: Claude <claude@anthropic.com>
1 parent 7d5a774 commit 7220855

20 files changed

+514
-276
lines changed

src/sacp-conductor/src/component.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use sacp;
22
use sacp_tokio::AcpAgent;
33
use std::pin::Pin;
4-
use tokio_util::compat::{FuturesAsyncReadCompatExt as _, FuturesAsyncWriteCompatExt};
4+
use tokio_util::compat::{
5+
FuturesAsyncReadCompatExt as _, FuturesAsyncWriteCompatExt, TokioAsyncReadCompatExt as _,
6+
TokioAsyncWriteCompatExt as _,
7+
};
58

9+
use futures::channel::mpsc;
610
use futures::{AsyncRead, AsyncWrite};
711

8-
use sacp::JrConnectionCx;
12+
use sacp::{IntoJrTransport, JrConnectionCx};
913
use tokio::process::Child;
1014
use tracing::debug;
1115

@@ -121,6 +125,35 @@ impl ComponentProvider for CommandComponentProvider {
121125
}
122126
}
123127

128+
impl IntoJrTransport for Box<dyn ComponentProvider> {
129+
fn into_jr_transport(
130+
self: Box<Self>,
131+
cx: &JrConnectionCx,
132+
outgoing_rx: mpsc::UnboundedReceiver<jsonrpcmsg::Message>,
133+
incoming_tx: mpsc::UnboundedSender<jsonrpcmsg::Message>,
134+
) -> Result<(), sacp::Error> {
135+
use tokio::io::duplex;
136+
137+
// Create byte streams using tokio duplex channels
138+
let (outgoing_write, outgoing_read) = duplex(8192);
139+
let (incoming_write, incoming_read) = duplex(8192);
140+
141+
// Create the component with the byte streams
142+
let _cleanup = self.create(
143+
cx,
144+
Box::pin(outgoing_write.compat_write()),
145+
Box::pin(incoming_read.compat()),
146+
)?;
147+
148+
// Delegate to ByteStreams for the transport layer
149+
Box::new(sacp::ByteStreams::new(
150+
incoming_write.compat_write(),
151+
outgoing_read.compat(),
152+
))
153+
.into_jr_transport(cx, outgoing_rx, incoming_tx)
154+
}
155+
}
156+
124157
impl ComponentProvider for AcpAgent {
125158
fn create(
126159
&self,

src/sacp-conductor/src/conductor.rs

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl Conductor {
117117
}
118118
}
119119

120-
pub fn into_handler_chain(self) -> JrHandlerChain<impl JrMessageHandler> {
120+
pub fn into_handler_chain(self) -> JrHandlerChain<ConductorMessageHandler> {
121121
let (mut conductor_tx, mut conductor_rx) = mpsc::channel(128 /* chosen arbitrarily */);
122122

123123
let conductor_command = self.conductor_command.unwrap_or_else(|| {
@@ -128,58 +128,33 @@ impl Conductor {
128128
vec![argv0]
129129
});
130130

131-
let mut handler = ConductorHandler {
131+
let mut state = ConductorHandlerState {
132132
components: Default::default(),
133133
bridge_listeners: Default::default(),
134134
bridge_connections: Default::default(),
135135
conductor_command,
136136
proxy_mode: AtomicBool::new(false),
137137
};
138138

139-
let len_components = self.providers.len();
140-
141-
JrHandlerChain::new()
142-
.name(self.name)
143-
.on_receive_message_from_successor({
144-
let mut conductor_tx = conductor_tx.clone();
145-
async move |message: MessageAndCx| {
146-
// If we receive a message from our successor, we must be in proxy mode or else something odd is going on.
147-
conductor_tx
148-
.send(ConductorMessage::AgentToClient {
149-
source_component_index: len_components,
150-
message,
151-
})
152-
.await
153-
.map_err(sacp::util::internal_error)
154-
}
155-
})
156-
// Any incoming messages from the client are client-to-agent messages targeting the first component.
157-
.on_receive_message({
158-
let mut conductor_tx = conductor_tx.clone();
159-
async move |message: MessageAndCx| {
160-
conductor_tx
161-
.send(ConductorMessage::ClientToAgent {
162-
target_component_index: 0,
163-
message,
164-
})
165-
.await
166-
.map_err(sacp::util::internal_error)
167-
}
168-
})
169-
.with_spawned(async move |cx| {
170-
for provider in self.providers {
171-
handler.launch_proxy(&cx, &mut conductor_tx, provider)?;
172-
}
139+
JrHandlerChain::new_with(ConductorMessageHandler {
140+
len_components: self.providers.len(),
141+
conductor_tx: conductor_tx.clone(),
142+
})
143+
.name(self.name)
144+
.with_spawned(async move |cx| {
145+
for provider in self.providers {
146+
state.launch_proxy(&cx, &mut conductor_tx, provider)?;
147+
}
173148

174-
// This is the "central actor" of the conductor. Most other things forward messages
175-
// via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
176-
while let Some(message) = conductor_rx.next().await {
177-
handler
178-
.handle_conductor_message(&cx, message, &mut conductor_tx)
179-
.await?;
180-
}
181-
Ok(())
182-
})
149+
// This is the "central actor" of the conductor. Most other things forward messages
150+
// via `conductor_tx` into this loop. This lets us serialize the conductor's activity.
151+
while let Some(message) = conductor_rx.next().await {
152+
state
153+
.handle_conductor_message(&cx, message, &mut conductor_tx)
154+
.await?;
155+
}
156+
Ok(())
157+
})
183158
}
184159

185160
/// Convenience method to run the conductor with a transport.
@@ -199,12 +174,17 @@ impl Conductor {
199174
}
200175
}
201176

177+
pub struct ConductorMessageHandler {
178+
len_components: usize,
179+
conductor_tx: mpsc::Sender<ConductorMessage>,
180+
}
181+
202182
/// The conductor manages the proxy chain lifecycle and message routing.
203183
///
204184
/// It maintains connections to all components in the chain and routes messages
205185
/// bidirectionally between the editor, components, and agent.
206186
///
207-
pub struct ConductorHandler {
187+
struct ConductorHandlerState {
208188
/// Manages the TCP listeners for MCP connections that will be proxied over ACP.
209189
bridge_listeners: McpBridgeListeners,
210190

@@ -225,7 +205,49 @@ pub struct ConductorHandler {
225205
proxy_mode: AtomicBool,
226206
}
227207

228-
impl ConductorHandler {
208+
impl JrMessageHandler for ConductorMessageHandler {
209+
async fn handle_message(
210+
&mut self,
211+
message: MessageAndCx,
212+
) -> Result<sacp::Handled<MessageAndCx>, sacp::Error> {
213+
JrHandlerChain::new()
214+
.on_receive_message_from_successor({
215+
let len_components = self.len_components;
216+
let mut conductor_tx = self.conductor_tx.clone();
217+
async move |message: MessageAndCx| {
218+
// If we receive a message from our successor, we must be in proxy mode or else something odd is going on.
219+
conductor_tx
220+
.send(ConductorMessage::AgentToClient {
221+
source_component_index: len_components,
222+
message,
223+
})
224+
.await
225+
.map_err(sacp::util::internal_error)
226+
}
227+
})
228+
// Any incoming messages from the client are client-to-agent messages targeting the first component.
229+
.on_receive_message({
230+
let mut conductor_tx = self.conductor_tx.clone();
231+
async move |message: MessageAndCx| {
232+
conductor_tx
233+
.send(ConductorMessage::ClientToAgent {
234+
target_component_index: 0,
235+
message,
236+
})
237+
.await
238+
.map_err(sacp::util::internal_error)
239+
}
240+
})
241+
.apply(message)
242+
.await
243+
}
244+
245+
fn describe_chain(&self) -> impl std::fmt::Debug {
246+
"ConductorMessageHandler"
247+
}
248+
}
249+
250+
impl ConductorHandlerState {
229251
/// Recursively spawns components and builds the proxy chain.
230252
///
231253
/// This function implements the recursive chain building pattern:

src/sacp-conductor/tests/arrow_proxy_eliza.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,20 @@ async fn test_conductor_with_arrow_proxy_and_eliza() -> Result<(), sacp::Error>
2424

2525
// Spawn the conductor
2626
let conductor_handle = tokio::spawn(async move {
27-
Conductor::run(
27+
Conductor::new(
2828
"conductor".to_string(),
29+
vec![arrow_proxy_agent, eliza_agent],
30+
None,
31+
)
32+
.run(sacp::ByteStreams::new(
2933
conductor_write.compat_write(),
3034
conductor_read.compat(),
31-
vec![Box::new(arrow_proxy_agent), Box::new(eliza_agent)],
32-
)
35+
))
3336
.await
3437
});
3538

36-
// Editor side: connect and send a prompt using helper
37-
let editor_handle = tokio::spawn(async move {
39+
// Wait for editor to complete and get the result
40+
let result = tokio::time::timeout(std::time::Duration::from_secs(30), async move {
3841
let result =
3942
yolo_prompt(editor_write.compat_write(), editor_read.compat(), "Hello").await?;
4043

@@ -47,14 +50,10 @@ async fn test_conductor_with_arrow_proxy_and_eliza() -> Result<(), sacp::Error>
4750
);
4851

4952
Ok::<String, sacp::Error>(result)
50-
});
51-
52-
// Wait for editor to complete and get the result
53-
let result = tokio::time::timeout(std::time::Duration::from_secs(30), editor_handle)
54-
.await
55-
.expect("Test timed out")
56-
.expect("Editor task panicked")
57-
.expect("Editor failed");
53+
})
54+
.await
55+
.expect("Test timed out")
56+
.expect("Editor failed");
5857

5958
tracing::info!(
6059
?result,

src/sacp-conductor/tests/initialization_sequence.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@
66
//! 3. Proxy components must accept the capability or initialization fails
77
//! 4. Last component (agent) never receives proxy capability offer
88
9-
use futures::{AsyncRead, AsyncWrite};
9+
use futures::channel::mpsc;
1010
use sacp::schema::{AgentCapabilities, InitializeRequest, InitializeResponse};
11-
use sacp::{JrConnectionCx, JrHandlerChain, MetaCapabilityExt, Proxy};
12-
use sacp_conductor::component::{Cleanup, ComponentProvider};
11+
use sacp::{IntoJrTransport, JrConnectionCx, JrHandlerChain, MetaCapabilityExt, Proxy};
1312
use sacp_conductor::conductor::Conductor;
1413
use sacp_proxy::JrCxExt;
15-
use std::pin::Pin;
1614
use std::sync::Arc;
1715
use std::sync::Mutex;
1816

@@ -64,23 +62,26 @@ struct InitComponentProvider {
6462
}
6563

6664
impl InitComponentProvider {
67-
fn new(config: &Arc<InitConfig>) -> Box<dyn ComponentProvider> {
65+
fn new(config: &Arc<InitConfig>) -> Box<dyn IntoJrTransport> {
6866
Box::new(Self {
6967
config: config.clone(),
7068
})
7169
}
7270
}
7371

74-
impl ComponentProvider for InitComponentProvider {
75-
fn create(
76-
&self,
72+
impl IntoJrTransport for InitComponentProvider {
73+
fn into_jr_transport(
74+
self: Box<Self>,
7775
cx: &JrConnectionCx,
78-
outgoing_bytes: Pin<Box<dyn AsyncWrite + Send>>,
79-
incoming_bytes: Pin<Box<dyn AsyncRead + Send>>,
80-
) -> Result<Cleanup, sacp::Error> {
76+
outgoing_rx: mpsc::UnboundedReceiver<jsonrpcmsg::Message>,
77+
incoming_tx: mpsc::UnboundedSender<jsonrpcmsg::Message>,
78+
) -> Result<(), sacp::Error> {
8179
let config = Arc::clone(&self.config);
80+
8281
cx.spawn(async move {
83-
let transport = sacp::ByteStreams::new(outgoing_bytes, incoming_bytes);
82+
// Create the channel-based transport
83+
let transport = sacp::Channels::new(outgoing_rx, incoming_tx);
84+
8485
JrHandlerChain::new()
8586
.name("init-component-provider")
8687
.on_receive_request(async move |mut request: InitializeRequest, request_cx| {
@@ -117,12 +118,12 @@ impl ComponentProvider for InitComponentProvider {
117118
.await
118119
})?;
119120

120-
Ok(Cleanup::None)
121+
Ok(())
121122
}
122123
}
123124

124125
async fn run_test_with_components(
125-
components: Vec<Box<dyn ComponentProvider>>,
126+
components: Vec<Box<dyn IntoJrTransport>>,
126127
editor_task: impl AsyncFnOnce(JrConnectionCx) -> Result<(), sacp::Error>,
127128
) -> Result<(), sacp::Error> {
128129
// Set up editor <-> conductor communication
@@ -133,16 +134,15 @@ async fn run_test_with_components(
133134

134135
JrHandlerChain::new()
135136
.name("editor-to-connector")
136-
.with_spawned(async move {
137-
Conductor::run(
138-
"conductor".to_string(),
139-
conductor_out.compat_write(),
140-
conductor_in.compat(),
141-
components,
142-
)
143-
.await
137+
.with_spawned(|_cx| async move {
138+
Conductor::new("conductor".to_string(), components, None)
139+
.run(sacp::ByteStreams::new(
140+
conductor_out.compat_write(),
141+
conductor_in.compat(),
142+
))
143+
.await
144144
})
145-
.serve_with(transport, editor_task)
145+
.with_client(transport, editor_task)
146146
.await
147147
}
148148

src/sacp-conductor/tests/mcp-integration.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,19 @@ async fn run_test_with_components(
5353

5454
JrHandlerChain::new()
5555
.name("editor-to-connector")
56-
.with_spawned(async move {
57-
Conductor::run_with_command(
56+
.with_spawned(|_cx| async move {
57+
Conductor::new(
5858
"conductor".to_string(),
59-
conductor_out.compat_write(),
60-
conductor_in.compat(),
6159
components,
6260
Some(conductor_command()),
6361
)
62+
.run(sacp::ByteStreams::new(
63+
conductor_out.compat_write(),
64+
conductor_in.compat(),
65+
))
6466
.await
6567
})
66-
.serve_with(transport, editor_task)
68+
.with_client(transport, editor_task)
6769
.await
6870
}
6971

@@ -139,20 +141,22 @@ async fn test_agent_handles_prompt() -> Result<(), sacp::Error> {
139141
.map_err(|_| sacp::Error::internal_error())
140142
}
141143
})
142-
.with_spawned(async move {
143-
Conductor::run_with_command(
144+
.with_spawned(|_cx| async move {
145+
Conductor::new(
144146
"mcp-integration-conductor".to_string(),
145-
conductor_out.compat_write(),
146-
conductor_in.compat(),
147147
vec![
148148
mcp_integration::proxy::create(),
149149
mcp_integration::agent::create(),
150150
],
151151
Some(conductor_command()),
152152
)
153+
.run(sacp::ByteStreams::new(
154+
conductor_out.compat_write(),
155+
conductor_in.compat(),
156+
))
153157
.await
154158
})
155-
.serve_with(transport, async |editor_cx| {
159+
.with_client(transport, async |editor_cx| {
156160
// Initialize
157161
recv(editor_cx.send_request(InitializeRequest {
158162
protocol_version: Default::default(),

0 commit comments

Comments
 (0)