Skip to content

Commit a7ce9c9

Browse files
committed
defines conduit constructs
1 parent ac5496a commit a7ce9c9

File tree

4 files changed

+202
-39
lines changed

4 files changed

+202
-39
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/chat-cli-ui/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ tracing.workspace = true
1515
tracing-appender.workspace = true
1616
tracing-subscriber.workspace = true
1717
thiserror.workspace = true
18+
serde.workspace = true
19+
serde_json.workspace = true
20+
chrono.workspace = true
21+
crossterm.workspace = true
1822

1923
[target.'cfg(unix)'.dependencies]
2024
nix.workspace = true

crates/chat-cli-ui/src/conduit.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
use crossterm::{
2+
execute,
3+
style,
4+
};
5+
6+
use crate::protocol::Event;
7+
8+
#[derive(thiserror::Error, Debug)]
9+
pub enum ConduitError {
10+
#[error(transparent)]
11+
Send(#[from] Box<std::sync::mpsc::SendError<Event>>),
12+
#[error(transparent)]
13+
Utf8(#[from] std::string::FromUtf8Error),
14+
#[error("No event set")]
15+
NullState,
16+
#[error(transparent)]
17+
Io(#[from] std::io::Error),
18+
}
19+
20+
/// The view would own this struct.
21+
/// [ViewEnd] serves two purposes
22+
/// - To deliver user inputs to the control layer from the view layer
23+
/// - To deliver state changes from the control layer to the view layer
24+
pub struct ViewEnd {
25+
/// Used by the view to send input to the control
26+
// TODO: later on we will need replace this byte array with an actual event type from ACP
27+
pub sender: std::sync::mpsc::Sender<Vec<u8>>,
28+
/// To receive messages from control about state changes
29+
pub receiver: std::sync::mpsc::Receiver<Event>,
30+
}
31+
32+
impl ViewEnd {
33+
/// Method to facilitate in the interim
34+
/// It takes possible messages from the old even loop and queues write to the output provided
35+
/// This blocks the current thread and consumes the [ViewEnd]
36+
pub fn into_legacy_mode(self, mut output: impl std::io::Write) -> Result<(), ConduitError> {
37+
while let Ok(event) = self.receiver.recv() {
38+
let content = match event {
39+
Event::Custom(custom) => custom.value.to_string(),
40+
Event::TextMessageContent(content) => content.delta,
41+
Event::TextMessageChunk(chunk) => {
42+
if let Some(content) = chunk.delta {
43+
content
44+
} else {
45+
continue;
46+
}
47+
},
48+
_ => continue,
49+
};
50+
51+
execute!(&mut output, style::Print(content))?;
52+
}
53+
54+
Ok(())
55+
}
56+
}
57+
58+
/// This compliments the [ViewEnd]. It can be thought of as the "other end" of a pipe.
59+
/// The control would own this.
60+
pub struct ControlEnd {
61+
pub current_event: Option<Event>,
62+
/// Used by the control to send state changes to the view
63+
pub sender: std::sync::mpsc::Sender<Event>,
64+
/// To receive user input from the view
65+
// TODO: later on we will need replace this byte array with an actual event type from ACP
66+
pub receiver: std::sync::mpsc::Receiver<Vec<u8>>,
67+
}
68+
69+
impl ControlEnd {
70+
/// Primes the [ControlEnd] with the state passed in
71+
/// This api is intended to serve as an interim solution to bridge the gap between the current
72+
/// code base, which heavily relies on crossterm apis to print directly to the terminal and the
73+
/// refactor where the message passing paradigm is the norm
74+
pub fn prime(&mut self, event: Event) {
75+
self.current_event.replace(event);
76+
}
77+
78+
/// Sends an event to the view layer through the conduit
79+
pub fn send(&self, event: Event) -> Result<(), ConduitError> {
80+
Ok(self.sender.send(event).map_err(Box::new)?)
81+
}
82+
}
83+
84+
impl std::io::Write for ControlEnd {
85+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
86+
// We'll default to custom event
87+
// This hardly matters because in legacy mode we are simply extracting the bytes and
88+
// dumping it to output
89+
if self.current_event.is_none() {
90+
self.current_event.replace(Event::Custom(Default::default()));
91+
}
92+
93+
let current_event = self
94+
.current_event
95+
.as_mut()
96+
.ok_or(std::io::Error::other("No event set"))?;
97+
98+
current_event
99+
.insert_content(buf)
100+
.map_err(|_e| std::io::Error::other("Error inserting content"))?;
101+
102+
Ok(buf.len())
103+
}
104+
105+
fn flush(&mut self) -> std::io::Result<()> {
106+
let current_state = self.current_event.take().ok_or(std::io::Error::other("No state set"))?;
107+
108+
self.sender.send(current_state).map_err(std::io::Error::other)
109+
}
110+
}
111+
112+
/// Creates a bidirectional communication channel between view and control layers.
113+
///
114+
/// This function establishes a message-passing conduit that enables:
115+
/// - The view layer to send user input (as bytes) to the control layer
116+
/// - The control layer to send state changes to the view layer
117+
///
118+
/// # Returns
119+
/// A tuple containing:
120+
/// - `ViewEnd<S>`: The view-side endpoint for sending input and receiving state updates
121+
/// - `ControlEnd<S>`: The control-side endpoint for receiving input and sending state updates
122+
///
123+
/// # Type Parameters
124+
/// - `S`: The state type that implements `ViewState` trait
125+
pub fn get_conduit_pair() -> (ViewEnd, ControlEnd) {
126+
let (state_tx, state_rx) = std::sync::mpsc::channel::<Event>();
127+
let (byte_tx, byte_rx) = std::sync::mpsc::channel::<Vec<u8>>();
128+
129+
(
130+
ViewEnd {
131+
sender: byte_tx,
132+
receiver: state_rx,
133+
},
134+
ControlEnd {
135+
current_event: None,
136+
sender: state_tx,
137+
receiver: byte_rx,
138+
},
139+
)
140+
}
141+
142+
pub trait InterimEvent {
143+
type Error: std::error::Error;
144+
fn insert_content(&mut self, content: &[u8]) -> Result<(), Self::Error>;
145+
}
146+
147+
// It seems silly to implement a trait we have defined in the crate for a type we have also defined
148+
// in the same crate. But the plan is to move the Event type definition out of this crate (or use a
149+
// an external crate once AGUI has a rust crate)
150+
impl InterimEvent for Event {
151+
type Error = ConduitError;
152+
153+
fn insert_content(&mut self, content: &[u8]) -> Result<(), ConduitError> {
154+
debug_assert!(self.is_compatible_with_legacy_event_loop());
155+
156+
match self {
157+
Self::Custom(_custom) => {
158+
// custom events are defined in this UI crate
159+
// TODO: use an enum as implement AsRef for it
160+
// match custom.name.as_str() {
161+
// _ => {},
162+
// }
163+
},
164+
Self::TextMessageContent(msg_content) => {
165+
let str = String::from_utf8(content.to_vec())?;
166+
msg_content.delta.push_str(&str);
167+
},
168+
Self::TextMessageChunk(chunk) => {
169+
let str = String::from_utf8(content.to_vec())?;
170+
if let Some(d) = chunk.delta.as_mut() {
171+
d.push_str(&str);
172+
}
173+
},
174+
_ => unreachable!(),
175+
}
176+
177+
Ok(())
178+
}
179+
}

crates/chat-cli-ui/src/protocol.rs

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
//! This is largely based on https://docs.ag-ui.com/concepts/events
22
//! They do not have a rust SDK so for now we are handrolling these types
33
4-
use serde::{Deserialize, Serialize};
4+
use serde::{
5+
Deserialize,
6+
Serialize,
7+
};
58
use serde_json::Value;
6-
use std::collections::HashMap;
7-
use chrono::{DateTime, Utc};
89

910
/// Role of a message sender
1011
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -207,7 +208,7 @@ pub struct Raw {
207208
}
208209

209210
/// Used for application-specific custom events
210-
#[derive(Debug, Clone, Serialize, Deserialize)]
211+
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
211212
pub struct Custom {
212213
pub name: String,
213214
pub value: Value,
@@ -232,7 +233,7 @@ pub struct ActivitySnapshotEvent {
232233
pub struct ActivityDeltaEvent {
233234
pub message_id: String,
234235
pub activity_type: String, // e.g., "PLAN", "SEARCH", "SCRAPE"
235-
pub patch: Vec<Value>, // JSON Patch operations (RFC 6902)
236+
pub patch: Vec<Value>, // JSON Patch operations (RFC 6902)
236237
}
237238

238239
// ============================================================================
@@ -306,76 +307,49 @@ pub struct MetaEvent {
306307

307308
/// Main event enum that encompasses all event types in the Agent UI Protocol
308309
#[derive(Debug, Clone, Serialize, Deserialize)]
309-
#[serde(tag = "type")]
310+
#[serde(tag = "type", rename_all = "camelCase")]
310311
pub enum Event {
311312
// Lifecycle Events
312-
#[serde(rename = "runStarted")]
313313
RunStarted(RunStarted),
314-
#[serde(rename = "runFinished")]
315314
RunFinished(RunFinished),
316-
#[serde(rename = "runError")]
317315
RunError(RunError),
318-
#[serde(rename = "stepStarted")]
319316
StepStarted(StepStarted),
320-
#[serde(rename = "stepFinished")]
321317
StepFinished(StepFinished),
322318

323319
// Text Message Events
324-
#[serde(rename = "textMessageStart")]
325320
TextMessageStart(TextMessageStart),
326-
#[serde(rename = "textMessageContent")]
327321
TextMessageContent(TextMessageContent),
328-
#[serde(rename = "textMessageEnd")]
329322
TextMessageEnd(TextMessageEnd),
330-
#[serde(rename = "textMessageChunk")]
331323
TextMessageChunk(TextMessageChunk),
332324

333325
// Tool Call Events
334-
#[serde(rename = "toolCallStart")]
335326
ToolCallStart(ToolCallStart),
336-
#[serde(rename = "toolCallArgs")]
337327
ToolCallArgs(ToolCallArgs),
338-
#[serde(rename = "toolCallEnd")]
339328
ToolCallEnd(ToolCallEnd),
340-
#[serde(rename = "toolCallResult")]
341329
ToolCallResult(ToolCallResult),
342330

343331
// State Management Events
344-
#[serde(rename = "stateSnapshot")]
345332
StateSnapshot(StateSnapshot),
346-
#[serde(rename = "stateDelta")]
347333
StateDelta(StateDelta),
348-
#[serde(rename = "messagesSnapshot")]
349334
MessagesSnapshot(MessagesSnapshot),
350335

351336
// Special Events
352-
#[serde(rename = "raw")]
353337
Raw(Raw),
354-
#[serde(rename = "custom")]
355338
Custom(Custom),
356339

357340
// Draft Events - Activity Events
358-
#[serde(rename = "activitySnapshotEvent")]
359341
ActivitySnapshotEvent(ActivitySnapshotEvent),
360-
#[serde(rename = "activityDeltaEvent")]
361342
ActivityDeltaEvent(ActivityDeltaEvent),
362343

363344
// Draft Events - Reasoning Events
364-
#[serde(rename = "reasoningStart")]
365345
ReasoningStart(ReasoningStart),
366-
#[serde(rename = "reasoningMessageStart")]
367346
ReasoningMessageStart(ReasoningMessageStart),
368-
#[serde(rename = "reasoningMessageContent")]
369347
ReasoningMessageContent(ReasoningMessageContent),
370-
#[serde(rename = "reasoningMessageEnd")]
371348
ReasoningMessageEnd(ReasoningMessageEnd),
372-
#[serde(rename = "reasoningMessageChunk")]
373349
ReasoningMessageChunk(ReasoningMessageChunk),
374-
#[serde(rename = "reasoningEnd")]
375350
ReasoningEnd(ReasoningEnd),
376351

377352
// Draft Events - Meta Events
378-
#[serde(rename = "metaEvent")]
379353
MetaEvent(MetaEvent),
380354
}
381355

@@ -428,6 +402,13 @@ impl Event {
428402
}
429403
}
430404

405+
pub fn is_compatible_with_legacy_event_loop(&self) -> bool {
406+
matches!(
407+
self,
408+
Event::Custom(_) | Event::TextMessageContent(_) | Event::TextMessageChunk(_)
409+
)
410+
}
411+
431412
/// Check if this is a lifecycle event
432413
pub fn is_lifecycle_event(&self) -> bool {
433414
matches!(
@@ -455,10 +436,7 @@ impl Event {
455436
pub fn is_tool_call_event(&self) -> bool {
456437
matches!(
457438
self,
458-
Event::ToolCallStart(_)
459-
| Event::ToolCallArgs(_)
460-
| Event::ToolCallEnd(_)
461-
| Event::ToolCallResult(_)
439+
Event::ToolCallStart(_) | Event::ToolCallArgs(_) | Event::ToolCallEnd(_) | Event::ToolCallResult(_)
462440
)
463441
}
464442

@@ -486,5 +464,3 @@ impl Event {
486464
)
487465
}
488466
}
489-
490-

0 commit comments

Comments
 (0)