Skip to content

Commit a518e29

Browse files
committed
changes conduit sender to tokio unbounded
1 parent dd12213 commit a518e29

File tree

2 files changed

+120
-118
lines changed

2 files changed

+120
-118
lines changed

crates/chat-cli-ui/src/conduit.rs

Lines changed: 115 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const CONTINUATION_LINE: &str = " ⋮ ";
2525
#[derive(thiserror::Error, Debug)]
2626
pub enum ConduitError {
2727
#[error(transparent)]
28-
Send(#[from] Box<std::sync::mpsc::SendError<Event>>),
28+
Send(#[from] Box<tokio::sync::mpsc::error::SendError<Event>>),
2929
#[error(transparent)]
3030
Utf8(#[from] std::string::FromUtf8Error),
3131
#[error("No event set")]
@@ -43,135 +43,139 @@ pub struct ViewEnd {
4343
// TODO: later on we will need replace this byte array with an actual event type from ACP
4444
pub sender: tokio::sync::mpsc::Sender<Vec<u8>>,
4545
/// To receive messages from control about state changes
46-
pub receiver: std::sync::mpsc::Receiver<Event>,
46+
pub receiver: tokio::sync::mpsc::UnboundedReceiver<Event>,
4747
}
4848

4949
impl ViewEnd {
5050
/// Method to facilitate in the interim
5151
/// It takes possible messages from the old even loop and queues write to the output provided
5252
/// This blocks the current thread and consumes the [ViewEnd]
5353
pub fn into_legacy_mode(
54-
self,
54+
mut self,
5555
theme_source: impl ThemeSource,
5656
mut stderr: std::io::Stderr,
5757
mut stdout: std::io::Stdout,
5858
) -> Result<(), ConduitError> {
59-
while let Ok(event) = self.receiver.recv() {
60-
match event {
61-
Event::LegacyPassThrough(content) => match content {
62-
LegacyPassThroughOutput::Stderr(content) => {
63-
stderr.write_all(&content)?;
64-
stderr.flush()?;
59+
tokio::spawn(async move {
60+
while let Some(event) = self.receiver.recv().await {
61+
match event {
62+
Event::LegacyPassThrough(content) => match content {
63+
LegacyPassThroughOutput::Stderr(content) => {
64+
stderr.write_all(&content)?;
65+
stderr.flush()?;
66+
},
67+
LegacyPassThroughOutput::Stdout(content) => {
68+
stdout.write_all(&content)?;
69+
stdout.flush()?;
70+
},
6571
},
66-
LegacyPassThroughOutput::Stdout(content) => {
67-
stdout.write_all(&content)?;
72+
Event::RunStarted(_run_started) => {},
73+
Event::RunFinished(_run_finished) => {},
74+
Event::RunError(_run_error) => {},
75+
Event::StepStarted(_step_started) => {},
76+
Event::StepFinished(_step_finished) => {},
77+
Event::TextMessageStart(_text_message_start) => {
78+
queue!(stdout, theme_source.success_fg(), Print("> "), theme_source.reset(),)?;
79+
},
80+
Event::TextMessageContent(text_message_content) => {
81+
stdout.write_all(&text_message_content.delta)?;
6882
stdout.flush()?;
6983
},
70-
},
71-
Event::RunStarted(_run_started) => {},
72-
Event::RunFinished(_run_finished) => {},
73-
Event::RunError(_run_error) => {},
74-
Event::StepStarted(_step_started) => {},
75-
Event::StepFinished(_step_finished) => {},
76-
Event::TextMessageStart(_text_message_start) => {
77-
queue!(stdout, theme_source.success_fg(), Print("> "), theme_source.reset(),)?;
78-
},
79-
Event::TextMessageContent(text_message_content) => {
80-
stdout.write_all(&text_message_content.delta)?;
81-
stdout.flush()?;
82-
},
83-
Event::TextMessageEnd(_text_message_end) => {
84-
queue!(stderr, theme_source.reset(), theme_source.reset_attributes())?;
85-
execute!(stdout, style::Print("\n"))?;
86-
},
87-
Event::TextMessageChunk(_text_message_chunk) => {},
88-
Event::ToolCallStart(tool_call_start) => {
89-
let ToolCallStart {
90-
tool_call_name,
91-
is_trusted,
92-
mcp_server_name,
93-
..
94-
} = tool_call_start;
95-
96-
queue!(
97-
stdout,
98-
theme_source.emphasis_fg(),
99-
Print(format!(
100-
"🛠️ Using tool: {}{}",
84+
Event::TextMessageEnd(_text_message_end) => {
85+
queue!(stderr, theme_source.reset(), theme_source.reset_attributes())?;
86+
execute!(stdout, style::Print("\n"))?;
87+
},
88+
Event::TextMessageChunk(_text_message_chunk) => {},
89+
Event::ToolCallStart(tool_call_start) => {
90+
let ToolCallStart {
10191
tool_call_name,
102-
if is_trusted {
103-
" (trusted)".dark_green()
104-
} else {
105-
"".reset()
106-
}
107-
)),
108-
theme_source.reset(),
109-
)?;
110-
111-
if let Some(server_name) = mcp_server_name {
92+
is_trusted,
93+
mcp_server_name,
94+
..
95+
} = tool_call_start;
96+
11297
queue!(
11398
stdout,
114-
theme_source.reset(),
115-
Print(" from mcp server "),
11699
theme_source.emphasis_fg(),
117-
Print(&server_name),
100+
Print(format!(
101+
"🛠️ Using tool: {}{}",
102+
tool_call_name,
103+
if is_trusted {
104+
" (trusted)".dark_green()
105+
} else {
106+
"".reset()
107+
}
108+
)),
118109
theme_source.reset(),
119110
)?;
120-
}
121-
122-
execute!(
123-
stdout,
124-
Print("\n"),
125-
Print(CONTINUATION_LINE),
126-
Print("\n"),
127-
Print(TOOL_BULLET)
128-
)?;
129-
},
130-
Event::ToolCallArgs(tool_call_args) => {
131-
if let serde_json::Value::String(content) = tool_call_args.delta {
132-
execute!(stdout, style::Print(content))?;
133-
} else {
134-
execute!(stdout, style::Print(tool_call_args.delta))?;
135-
}
136-
},
137-
Event::ToolCallEnd(_tool_call_end) => {
138-
// noop for now
139-
},
140-
Event::ToolCallResult(_tool_call_result) => {
141-
// noop for now (currently we don't show the tool call results to users)
142-
},
143-
Event::StateSnapshot(_state_snapshot) => {},
144-
Event::StateDelta(_state_delta) => {},
145-
Event::MessagesSnapshot(_messages_snapshot) => {},
146-
Event::Raw(_raw) => {},
147-
Event::Custom(_custom) => {},
148-
Event::ActivitySnapshotEvent(_activity_snapshot_event) => {},
149-
Event::ActivityDeltaEvent(_activity_delta_event) => {},
150-
Event::ReasoningStart(_reasoning_start) => {},
151-
Event::ReasoningMessageStart(_reasoning_message_start) => {},
152-
Event::ReasoningMessageContent(_reasoning_message_content) => {},
153-
Event::ReasoningMessageEnd(_reasoning_message_end) => {},
154-
Event::ReasoningMessageChunk(_reasoning_message_chunk) => {},
155-
Event::ReasoningEnd(_reasoning_end) => {},
156-
Event::MetaEvent(_meta_event) => {},
157-
Event::ToolCallRejection(tool_call_rejection) => {
158-
let ToolCallRejection { reason, name, .. } = tool_call_rejection;
159-
160-
execute!(
161-
stderr,
162-
theme_source.error_fg(),
163-
Print("Command "),
164-
theme_source.warning_fg(),
165-
Print(name),
166-
theme_source.error_fg(),
167-
Print(" is rejected because it matches one or more rules on the denied list:"),
168-
Print(reason),
169-
Print("\n"),
170-
theme_source.reset(),
171-
)?;
172-
},
111+
112+
if let Some(server_name) = mcp_server_name {
113+
queue!(
114+
stdout,
115+
theme_source.reset(),
116+
Print(" from mcp server "),
117+
theme_source.emphasis_fg(),
118+
Print(&server_name),
119+
theme_source.reset(),
120+
)?;
121+
}
122+
123+
execute!(
124+
stdout,
125+
Print("\n"),
126+
Print(CONTINUATION_LINE),
127+
Print("\n"),
128+
Print(TOOL_BULLET)
129+
)?;
130+
},
131+
Event::ToolCallArgs(tool_call_args) => {
132+
if let serde_json::Value::String(content) = tool_call_args.delta {
133+
execute!(stdout, style::Print(content))?;
134+
} else {
135+
execute!(stdout, style::Print(tool_call_args.delta))?;
136+
}
137+
},
138+
Event::ToolCallEnd(_tool_call_end) => {
139+
// noop for now
140+
},
141+
Event::ToolCallResult(_tool_call_result) => {
142+
// noop for now (currently we don't show the tool call results to users)
143+
},
144+
Event::StateSnapshot(_state_snapshot) => {},
145+
Event::StateDelta(_state_delta) => {},
146+
Event::MessagesSnapshot(_messages_snapshot) => {},
147+
Event::Raw(_raw) => {},
148+
Event::Custom(_custom) => {},
149+
Event::ActivitySnapshotEvent(_activity_snapshot_event) => {},
150+
Event::ActivityDeltaEvent(_activity_delta_event) => {},
151+
Event::ReasoningStart(_reasoning_start) => {},
152+
Event::ReasoningMessageStart(_reasoning_message_start) => {},
153+
Event::ReasoningMessageContent(_reasoning_message_content) => {},
154+
Event::ReasoningMessageEnd(_reasoning_message_end) => {},
155+
Event::ReasoningMessageChunk(_reasoning_message_chunk) => {},
156+
Event::ReasoningEnd(_reasoning_end) => {},
157+
Event::MetaEvent(_meta_event) => {},
158+
Event::ToolCallRejection(tool_call_rejection) => {
159+
let ToolCallRejection { reason, name, .. } = tool_call_rejection;
160+
161+
execute!(
162+
stderr,
163+
theme_source.error_fg(),
164+
Print("Command "),
165+
theme_source.warning_fg(),
166+
Print(name),
167+
theme_source.error_fg(),
168+
Print(" is rejected because it matches one or more rules on the denied list:"),
169+
Print(reason),
170+
Print("\n"),
171+
theme_source.reset(),
172+
)?;
173+
},
174+
}
173175
}
174-
}
176+
177+
Ok::<(), ConduitError>(())
178+
});
175179

176180
Ok(())
177181
}
@@ -192,7 +196,7 @@ pub type InputReceiver = tokio::sync::mpsc::Receiver<Vec<u8>>;
192196
pub struct ControlEnd<T> {
193197
pub current_event: Option<Event>,
194198
/// Used by the control to send state changes to the view
195-
pub sender: std::sync::mpsc::Sender<Event>,
199+
pub sender: tokio::sync::mpsc::UnboundedSender<Event>,
196200
/// Flag indicating whether structured events should be sent through the conduit.
197201
/// When true, the control end will send structured event data in addition to
198202
/// raw pass-through content, enabling richer communication between layers.
@@ -381,7 +385,7 @@ pub fn get_legacy_conduits(
381385
ControlEnd<DestinationStderr>,
382386
ControlEnd<DestinationStdout>,
383387
) {
384-
let (state_tx, state_rx) = std::sync::mpsc::channel::<Event>();
388+
let (state_tx, state_rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
385389
let (byte_tx, byte_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
386390

387391
(

crates/chat-cli/src/cli/chat/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -628,13 +628,11 @@ impl ChatSession {
628628
let (view_end, _byte_receiver, mut control_end_stderr, control_end_stdout) =
629629
get_legacy_conduits(should_send_structured_msg);
630630

631-
tokio::task::spawn_blocking(move || {
632-
let stderr = std::io::stderr();
633-
let stdout = std::io::stdout();
634-
if let Err(e) = view_end.into_legacy_mode(StyledText, stderr, stdout) {
635-
error!("Conduit view end legacy mode exited: {:?}", e);
636-
}
637-
});
631+
let stderr = std::io::stderr();
632+
let stdout = std::io::stdout();
633+
if let Err(e) = view_end.into_legacy_mode(StyledText, stderr, stdout) {
634+
error!("Conduit view end legacy mode exited: {:?}", e);
635+
}
638636

639637
let conversation = match resume_conversation {
640638
true => {

0 commit comments

Comments
 (0)