Skip to content

Commit 43f910f

Browse files
committed
move result back into the event- stream
1 parent 7a09aec commit 43f910f

File tree

27 files changed

+261
-249
lines changed

27 files changed

+261
-249
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

llm/anthropic/src/lib.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,10 @@ impl ExtendedGuest for AnthropicComponent {
304304
)
305305
}
306306

307-
fn retry_prompt(original_events: &[Event], partial_result: &[StreamDelta]) -> Vec<Event> {
307+
fn retry_prompt(
308+
original_events: &[Result<Event, Error>],
309+
partial_result: &[StreamDelta],
310+
) -> Vec<Event> {
308311
let mut extended_events = Vec::new();
309312
extended_events.push(Event::Message(Message {
310313
role: Role::System,
@@ -322,7 +325,11 @@ impl ExtendedGuest for AnthropicComponent {
322325
"Here is the original question:".to_string(),
323326
)],
324327
}));
325-
extended_events.extend_from_slice(original_events);
328+
extended_events.extend(
329+
original_events
330+
.iter()
331+
.filter_map(|e| e.as_ref().ok().cloned()),
332+
);
326333

327334
let mut partial_result_as_content = Vec::new();
328335
for delta in partial_result {

llm/anthropic/wit/deps/golem-llm/golem-llm.wit

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ interface llm {
265265
/// Represents an ongoing streaming LLM conversation
266266
resource chat-stream {
267267
/// Polls for the next chunk of stream events
268-
poll-next: func() -> result<option<list<stream-event>>, error>;
268+
poll-next: func() -> option<list<result<stream-event, error>>>;
269269
/// Blocks until the next chunk of stream events is available
270-
get-next: func() -> result<list<stream-event>, error>;
270+
get-next: func() -> list<result<stream-event, error>>;
271271
}
272272

273273
// --- Core Functions ---

llm/bedrock/src/lib.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ impl ExtendedGuest for BedrockComponent {
4747
})
4848
}
4949

50-
fn retry_prompt(original_events: &[Event], partial_result: &[llm::StreamDelta]) -> Vec<Event> {
50+
fn retry_prompt(
51+
original_events: &[Result<Event, Error>],
52+
partial_result: &[llm::StreamDelta],
53+
) -> Vec<Event> {
5154
let mut extended_events = Vec::new();
5255
extended_events.push(Event::Message(Message {
5356
role: llm::Role::System,
@@ -66,7 +69,11 @@ impl ExtendedGuest for BedrockComponent {
6669
"Here is the original question:".to_string(),
6770
)],
6871
}));
69-
extended_events.extend_from_slice(original_events);
72+
extended_events.extend(
73+
original_events
74+
.iter()
75+
.filter_map(|e| e.as_ref().ok().cloned()),
76+
);
7077

7178
let mut partial_result_as_content = Vec::new();
7279
for delta in partial_result {

llm/bedrock/src/stream.rs

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl BedrockChatStream {
5050
fn set_finished(&self) {
5151
*self.finished.borrow_mut() = true;
5252
}
53-
fn get_single_event(&self) -> Result<Option<llm::StreamEvent>, llm::Error> {
53+
fn get_single_event(&self) -> Option<Result<llm::StreamEvent, llm::Error>> {
5454
if let Some(stream) = self.stream_mut().as_mut() {
5555
let runtime = async_utils::get_async_runtime();
5656

@@ -61,61 +61,55 @@ impl BedrockChatStream {
6161
match token {
6262
Ok(Some(output)) => {
6363
log::trace!("Processing bedrock stream event: {output:?}");
64-
Ok(converse_stream_output_to_stream_event(output))
64+
converse_stream_output_to_stream_event(output).map(Ok)
6565
}
6666
Ok(None) => {
6767
log::trace!("running set_finished on stream due to None event received");
6868
self.set_finished();
69-
Ok(None)
69+
None
7070
}
7171
Err(error) => {
7272
log::trace!("running set_finished on stream due to error: {error:?}");
7373
self.set_finished();
74-
Err(custom_error(
74+
Some(Err(custom_error(
7575
llm::ErrorCode::InternalError,
7676
format!("An error occurred while reading event stream: {error}"),
77-
))
77+
)))
7878
}
7979
}
8080
})
8181
} else if let Some(error) = self.failure() {
8282
self.set_finished();
83-
Err(error.clone())
83+
Some(Err(error.clone()))
8484
} else {
85-
Ok(None)
85+
None
8686
}
8787
}
8888
}
8989

9090
impl llm::GuestChatStream for BedrockChatStream {
91-
fn poll_next(&self) -> Result<Option<Vec<llm::StreamEvent>>, llm::Error> {
91+
fn poll_next(&self) -> Option<Vec<Result<llm::StreamEvent, llm::Error>>> {
9292
if self.is_finished() {
93-
return Ok(Some(vec![]));
93+
return Some(vec![]);
9494
}
95-
96-
let event = self.get_single_event()?;
97-
match event {
98-
Some(event) => {
99-
if let llm::StreamEvent::Finish(metadata) = event.clone() {
100-
if let Some(llm::StreamEvent::Finish(final_metadata)) =
101-
self.get_single_event()?
102-
{
103-
return Ok(Some(vec![llm::StreamEvent::Finish(merge_metadata(
104-
metadata,
105-
final_metadata,
106-
))]));
107-
}
95+
self.get_single_event().map(|event| {
96+
if let Ok(llm::StreamEvent::Finish(metadata)) = &event {
97+
if let Some(Ok(llm::StreamEvent::Finish(final_metadata))) = self.get_single_event()
98+
{
99+
return vec![Ok(llm::StreamEvent::Finish(merge_metadata(
100+
metadata.clone(),
101+
final_metadata,
102+
)))];
108103
}
109-
Ok(Some(vec![event]))
110104
}
111-
None => Ok(None),
112-
}
105+
vec![event]
106+
})
113107
}
114108

115-
fn get_next(&self) -> Result<Vec<llm::StreamEvent>, llm::Error> {
109+
fn get_next(&self) -> Vec<Result<llm::StreamEvent, llm::Error>> {
116110
loop {
117-
if let Some(events) = self.poll_next()? {
118-
return Ok(events);
111+
if let Some(events) = self.poll_next() {
112+
return events;
119113
}
120114
}
121115
}

llm/bedrock/wit/deps/golem-llm/golem-llm.wit

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ interface llm {
265265
/// Represents an ongoing streaming LLM conversation
266266
resource chat-stream {
267267
/// Polls for the next chunk of stream events
268-
poll-next: func() -> result<option<list<stream-event>>, error>;
268+
poll-next: func() -> option<list<result<stream-event, error>>>;
269269
/// Blocks until the next chunk of stream events is available
270-
get-next: func() -> result<list<stream-event>, error>;
270+
get-next: func() -> list<result<stream-event, error>>;
271271
}
272272

273273
// --- Core Functions ---

llm/grok/wit/deps/golem-llm/golem-llm.wit

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ interface llm {
265265
/// Represents an ongoing streaming LLM conversation
266266
resource chat-stream {
267267
/// Polls for the next chunk of stream events
268-
poll-next: func() -> result<option<list<stream-event>>, error>;
268+
poll-next: func() -> option<list<result<stream-event, error>>>;
269269
/// Blocks until the next chunk of stream events is available
270-
get-next: func() -> result<list<stream-event>, error>;
270+
get-next: func() -> list<result<stream-event, error>>;
271271
}
272272

273273
// --- Core Functions ---

llm/llm/src/chat_stream.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,71 +32,75 @@ impl<T: LlmChatStreamState> LlmChatStream<T> {
3232
}
3333

3434
impl<T: LlmChatStreamState> GuestChatStream for LlmChatStream<T> {
35-
fn poll_next(&self) -> Result<Option<Vec<StreamEvent>>, Error> {
35+
fn poll_next(&self) -> Option<Vec<Result<StreamEvent, Error>>> {
3636
if self.implementation.is_finished() {
37-
return Ok(Some(vec![]));
37+
return Some(vec![]);
3838
}
3939

4040
let mut stream = self.implementation.stream_mut();
4141
if let Some(stream) = stream.as_mut() {
4242
match stream.poll_next() {
4343
Poll::Ready(None) => {
4444
self.implementation.set_finished();
45-
Ok(Some(vec![]))
45+
Some(vec![])
4646
}
4747
Poll::Ready(Some(Err(crate::event_source::error::Error::StreamEnded))) => {
4848
self.implementation.set_finished();
49-
Ok(Some(vec![]))
49+
Some(vec![])
50+
}
51+
Poll::Ready(Some(Err(error))) => {
52+
self.implementation.set_finished();
53+
Some(vec![Err(Error {
54+
code: ErrorCode::InternalError,
55+
message: error.to_string(),
56+
provider_error_json: None,
57+
})])
5058
}
51-
Poll::Ready(Some(Err(error))) => Err(Error {
52-
code: ErrorCode::InternalError,
53-
message: error.to_string(),
54-
provider_error_json: None,
55-
}),
5659
Poll::Ready(Some(Ok(event))) => {
5760
let mut events = vec![];
5861

5962
match event {
6063
Event::Open => {}
6164
Event::Message(MessageEvent { data, .. }) => {
6265
if data != "[DONE]" {
63-
match self.implementation.decode_message(&data)? {
64-
Some(stream_event) => {
66+
match self.implementation.decode_message(&data) {
67+
Ok(Some(stream_event)) => {
6568
if matches!(stream_event, StreamEvent::Finish(_)) {
6669
self.implementation.set_finished();
6770
}
68-
events.push(stream_event);
71+
events.push(Ok(stream_event));
6972
}
70-
None => {
73+
Ok(None) => {
7174
// Ignored event
7275
}
76+
Err(err) => events.push(Err(err)),
7377
}
7478
}
7579
}
7680
}
7781

7882
if events.is_empty() {
79-
Ok(None)
83+
None
8084
} else {
81-
Ok(Some(events))
85+
Some(events)
8286
}
8387
}
84-
Poll::Pending => Ok(None),
88+
Poll::Pending => None,
8589
}
8690
} else if let Some(error) = self.implementation.failure().clone() {
8791
self.implementation.set_finished();
88-
Err(error)
92+
Some(vec![Err(error)])
8993
} else {
90-
Ok(None)
94+
None
9195
}
9296
}
9397

94-
fn get_next(&self) -> Result<Vec<StreamEvent>, Error> {
98+
fn get_next(&self) -> Vec<Result<StreamEvent, Error>> {
9599
let pollable = self.subscribe();
96100
loop {
97101
pollable.block();
98-
if let Some(events) = self.poll_next()? {
99-
return Ok(events);
102+
if let Some(events) = self.poll_next() {
103+
return events;
100104
}
101105
}
102106
}

0 commit comments

Comments
 (0)