Skip to content

Commit 4ad7ee1

Browse files
committed
[WIP] SSE partially working
1 parent 67ab96d commit 4ad7ee1

File tree

9 files changed

+689
-105
lines changed

9 files changed

+689
-105
lines changed

rust-sdk/Cargo.lock

Lines changed: 182 additions & 80 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust-sdk/crates/ag-ui-client/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ futures = "0.3.31"
1414
json-patch = "4.0.0"
1515
log = "0.4.27"
1616
reqwest = { version = "0.12.22" , features = ["json", "stream"]}
17-
sse-client = "1.1.1"
17+
bytes = "1.5.0"
18+
tokio = "1.36.0"
1819

1920
[dev-dependencies]
21+
env_logger = "0.11.8"
2022
tokio = { version = "1.36.0", features = ["full"] }
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use ag_ui_client::sse::SseResponseExt;
2+
use ag_ui_core::event::Event;
3+
use futures::StreamExt;
4+
use serde::Deserialize;
5+
use std::error::Error;
6+
7+
#[derive(Debug, Deserialize)]
8+
#[serde(rename_all = "lowercase")]
9+
enum EventType {
10+
Ping,
11+
Update,
12+
Message,
13+
}
14+
15+
#[tokio::main]
16+
async fn main() -> Result<(), Box<dyn Error>> {
17+
// Create a client
18+
let client = reqwest::Client::new();
19+
20+
// Example 1: Stream with custom event and data types
21+
println!("Example 1: Typed events with custom event and data types");
22+
let response = client.get("https://httpbun.org/sse").send().await?;
23+
let mut stream = response.event_source().await;
24+
25+
while let Some(result) = stream.next().await {
26+
match result {
27+
Ok(sse_event) => {
28+
if let Some(event_type) = &sse_event.event {
29+
match event_type.as_str() {
30+
"ping" => println!("Ping: {}", sse_event.data),
31+
&_ => panic!("Unknown event type {event_type}")
32+
}
33+
}
34+
},
35+
Err(err) => eprintln!("Error: {}", err),
36+
}
37+
}
38+
Ok(())
39+
}
40+

rust-sdk/crates/ag-ui-client/src/http.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
use std::string::ParseError;
21
use crate::Agent;
32
use crate::agent::AgentError;
3+
use crate::sse::SseResponseExt;
44
use crate::stream::EventStream;
5-
use ag_ui_core::event::Event;
5+
use ag_ui_core::event::{Event, EventType};
66
use ag_ui_core::types::input::RunAgentInput;
77
use ag_ui_core::{AgentState, FwdProps};
88
use async_trait::async_trait;
99
use futures::StreamExt;
10+
use log::{debug, trace};
1011
use reqwest::header::HeaderMap;
1112
use reqwest::{Client as HttpClient, Url};
12-
use sse_client::EventSource;
1313

1414
pub struct HttpAgent {
1515
http_client: HttpClient,
@@ -42,30 +42,31 @@ impl<StateT: AgentState, FwdPropsT: FwdProps> Agent<StateT, FwdPropsT> for HttpA
4242
&self,
4343
input: &RunAgentInput<StateT, FwdPropsT>,
4444
) -> Result<EventStream<'async_trait, StateT>, AgentError> {
45-
let event_source = EventSource::new(&self.base_url.as_str())
46-
.map_err(|e| AgentError::ExecutionError {
47-
message: e.to_string(),
48-
})?;
49-
50-
let stream = self
45+
// Send the request and get the response
46+
let response = self
5147
.http_client
5248
.post(self.base_url.clone())
5349
.json(input)
5450
.headers(self.header_map.clone())
5551
.send()
56-
.await?
57-
.bytes_stream()
58-
.map(|result| {
59-
result.map_err(Into::into).and_then(|bytes| {
60-
serde_json::from_slice::<Event<StateT>>(&bytes).map_err(|e| {
61-
AgentError::ExecutionError {
62-
message: e.to_string(),
63-
}
64-
})
65-
})
52+
.await?;
53+
54+
// Convert the response to an SSE event stream
55+
let stream = response
56+
.event_source()
57+
.await
58+
.map(|result| match result {
59+
Ok(event) => {
60+
trace!("Received event: {event:?}");
61+
let event_data: Event<StateT> = serde_json::from_str(&event.data)?;
62+
trace!("Deserialized event: {event_data:?}");
63+
Ok(event_data)
64+
}
65+
Err(err) => Err(AgentError::ExecutionError {
66+
message: err.to_string(),
67+
}),
6668
})
6769
.boxed();
68-
6970
Ok(stream)
7071
}
7172
}

rust-sdk/crates/ag-ui-client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ pub mod event_handler;
33
pub mod http;
44
mod stream;
55
mod subscriber;
6+
pub mod sse;
67

78
pub use agent::Agent;
89
pub use http::HttpAgent;
10+
pub use sse::SseResponseExt;

0 commit comments

Comments
 (0)