Skip to content

Commit cf61494

Browse files
authored
refactor(llm): Replace Vec<Event> with Reply wrapper (#137)
This change introduces a `Reply` struct that wraps `Vec<Event>` to provide a more semantic representation of LLM responses. The `Reply` type includes a `From` implementation for `AssistantMessage` conversion and maintains backward compatibility through `Deref` traits. All provider implementations have been updated to return `Reply` instead of raw event vectors, creating a cleaner abstraction for handling LLM response collections.

Signed-off-by: Jean Mertz <git@jeanmertz.com>
1 parent 91b7268 commit cf61494

File tree

6 files changed

+78
-20
lines changed

6 files changed

+78
-20
lines changed

crates/jp_llm/src/provider.rs

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use async_trait::async_trait;
1313
use futures::{Stream, StreamExt as _};
1414
use google::Google;
1515
use jp_config::llm::provider;
16-
use jp_conversation::{message::ToolCallRequest, model::ProviderId, Model};
16+
use jp_conversation::{message::ToolCallRequest, model::ProviderId, AssistantMessage, Model};
1717
use jp_query::query::{ChatQuery, StructuredQuery};
1818
use ollama::Ollama;
1919
use openai::Openai;
@@ -117,6 +117,56 @@ impl StreamEvent {
117117
}
118118
}
119119

120+
/// A collection of events in a single reply.
121+
#[derive(Debug, Clone, Default, PartialEq)]
122+
pub struct Reply(Vec<Event>);
123+
124+
impl Reply {
125+
/// Returns the list of events in the reply.
126+
#[must_use]
127+
pub fn into_inner(self) -> Vec<Event> {
128+
self.0
129+
}
130+
}
131+
132+
impl std::ops::Deref for Reply {
133+
type Target = Vec<Event>;
134+
135+
fn deref(&self) -> &Self::Target {
136+
&self.0
137+
}
138+
}
139+
140+
impl std::ops::DerefMut for Reply {
141+
fn deref_mut(&mut self) -> &mut Self::Target {
142+
&mut self.0
143+
}
144+
}
145+
146+
impl From<Reply> for AssistantMessage {
147+
fn from(reply: Reply) -> Self {
148+
let mut message = AssistantMessage::default();
149+
150+
for event in reply.0 {
151+
match event {
152+
Event::Content(content) => {
153+
message.content.get_or_insert_default().push_str(&content);
154+
}
155+
Event::Reasoning(reasoning) => message
156+
.reasoning
157+
.get_or_insert_default()
158+
.push_str(&reasoning),
159+
Event::ToolCall(call) => message.tool_calls.push(call),
160+
Event::Metadata(key, metadata) => {
161+
message.metadata.insert(key, metadata);
162+
}
163+
}
164+
}
165+
166+
message
167+
}
168+
}
169+
120170
/// Represents a completed event from the LLM.
121171
#[derive(Debug, Clone, PartialEq)]
122172
pub enum Event {
@@ -217,7 +267,7 @@ pub trait Provider: std::fmt::Debug + Send + Sync {
217267
/// Perform a non-streaming chat completion.
218268
///
219269
/// Default implementation collects results from the streaming version.
220-
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Vec<Event>> {
270+
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Reply> {
221271
let mut stream = self.chat_completion_stream(model, query).await?;
222272
let mut events = Vec::new();
223273
let mut reasoning = String::new();
@@ -252,7 +302,7 @@ pub trait Provider: std::fmt::Debug + Send + Sync {
252302
events.push(Event::Content(content));
253303
}
254304

255-
Ok(events)
305+
Ok(Reply(events))
256306
}
257307

258308
/// Perform a structured completion.
@@ -283,10 +333,13 @@ pub trait Provider: std::fmt::Debug + Send + Sync {
283333
}
284334
};
285335

286-
let data = events.into_iter().find_map(|event| match event {
287-
Event::ToolCall(call) if call.name == SCHEMA_TOOL_NAME => Some(call.arguments),
288-
_ => None,
289-
});
336+
let data = events
337+
.into_inner()
338+
.into_iter()
339+
.find_map(|event| match event {
340+
Event::ToolCall(call) if call.name == SCHEMA_TOOL_NAME => Some(call.arguments),
341+
_ => None,
342+
});
290343

291344
match data {
292345
Some(data) => return Ok(query.map(data)),

crates/jp_llm/src/provider/anthropic.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use serde_json::Value;
2020
use time::macros::date;
2121
use tracing::{trace, warn};
2222

23-
use super::{Event, EventStream, ModelDetails, Provider, ReasoningDetails, StreamEvent};
23+
use super::{Event, EventStream, ModelDetails, Provider, ReasoningDetails, Reply, StreamEvent};
2424
use crate::{
2525
error::{Error, Result},
2626
provider::{handle_delta, AccumulationState, Delta},
@@ -149,14 +149,15 @@ impl Provider for Anthropic {
149149
Ok(models.into_iter().map(map_model).collect())
150150
}
151151

152-
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Vec<Event>> {
152+
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Reply> {
153153
let request = self.create_request(model, query).await?;
154154
self.client
155155
.messages()
156156
.create(request)
157157
.await
158158
.map_err(Into::into)
159159
.and_then(map_response)
160+
.map(Reply)
160161
}
161162

162163
async fn chat_completion_stream(&self, model: &Model, query: ChatQuery) -> Result<EventStream> {

crates/jp_llm/src/provider/google.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use jp_query::query::ChatQuery;
1515
use serde_json::Value;
1616
use tracing::trace;
1717

18-
use super::{Event, EventStream, ModelDetails, Provider, ReasoningDetails};
18+
use super::{Event, EventStream, ModelDetails, Provider, ReasoningDetails, Reply};
1919
use crate::{
2020
error::{Error, Result},
2121
provider::Delta,
@@ -109,14 +109,15 @@ impl Provider for Google {
109109
.collect())
110110
}
111111

112-
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Vec<Event>> {
112+
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Reply> {
113113
let request = self.create_request(model, query).await?;
114114

115115
self.client
116116
.generate_content(model.id.slug(), &request)
117117
.await
118118
.map_err(Into::into)
119119
.and_then(map_response)
120+
.map(Reply)
120121
}
121122

122123
async fn chat_completion_stream(&self, model: &Model, query: ChatQuery) -> Result<EventStream> {

crates/jp_llm/src/provider/ollama.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use serde_json::Value;
2424
use tracing::trace;
2525
use url::Url;
2626

27-
use super::{handle_delta, Event, EventStream, ModelDetails, Provider, StreamEvent};
27+
use super::{handle_delta, Event, EventStream, ModelDetails, Provider, Reply, StreamEvent};
2828
use crate::{
2929
error::{Error, Result},
3030
provider::{AccumulationState, Delta},
@@ -44,13 +44,14 @@ impl Provider for Ollama {
4444
Ok(models.into_iter().map(map_model).collect())
4545
}
4646

47-
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Vec<Event>> {
47+
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Reply> {
4848
let request = create_request(model, query)?;
4949
self.client
5050
.send_chat_messages(request)
5151
.await
5252
.map_err(Into::into)
5353
.and_then(map_response)
54+
.map(Reply)
5455
}
5556

5657
async fn chat_completion_stream(&self, model: &Model, query: ChatQuery) -> Result<EventStream> {

crates/jp_llm/src/provider/openai.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use time::{macros::date, OffsetDateTime};
2222
use tracing::{debug, trace, warn};
2323

2424
use super::{
25-
handle_delta, Delta, Event, EventStream, ModelDetails, Provider, ReasoningDetails, StreamEvent,
25+
handle_delta, Delta, Event, EventStream, ModelDetails, Provider, ReasoningDetails, Reply,
26+
StreamEvent,
2627
};
2728
use crate::{
2829
error::{Error, Result},
@@ -94,14 +95,15 @@ impl Provider for Openai {
9495
.collect())
9596
}
9697

97-
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Vec<Event>> {
98+
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Reply> {
9899
let client = self.client.clone();
99100
let request = self.create_request(model, query).await?;
100101
client
101102
.create(request)
102103
.await?
103104
.map_err(Into::into)
104105
.and_then(map_response)
106+
.map(Reply)
105107
}
106108

107109
async fn chat_completion_stream(&self, model: &Model, query: ChatQuery) -> Result<EventStream> {

crates/jp_llm/src/provider/openrouter.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use serde::Serialize;
2727
use serde_json::Value;
2828
use tracing::{debug, trace, warn};
2929

30-
use super::{CompletionChunk, Delta, Event, EventStream, ModelDetails, StreamEvent};
30+
use super::{CompletionChunk, Delta, Event, EventStream, ModelDetails, Reply, StreamEvent};
3131
use crate::{
3232
error::Result,
3333
provider::{handle_delta, AccumulationState, Provider},
@@ -198,7 +198,7 @@ impl Provider for Openrouter {
198198
Ok(stream)
199199
}
200200

201-
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Vec<Event>> {
201+
async fn chat_completion(&self, model: &Model, query: ChatQuery) -> Result<Reply> {
202202
let request = self.build_request(query, model).await?;
203203
let completion =
204204
self.client.chat_completion(request).await.inspect_err(
@@ -210,12 +210,12 @@ impl Provider for Openrouter {
210210
let choice_data = completion.choices.into_iter().next();
211211
let Some(choice) = choice_data else {
212212
trace!("OpenRouter delta had no choices, skipping.");
213-
return Ok(vec![]);
213+
return Ok(Reply::default());
214214
};
215215

216216
let Choice::NonStreaming(choice) = choice else {
217217
warn!("Received streaming choice in non-streaming context, ignoring.");
218-
return Ok(vec![]);
218+
return Ok(Reply::default());
219219
};
220220

221221
if let Some(ErrorResponse { code, message, .. }) = choice.error {
@@ -240,7 +240,7 @@ impl Provider for Openrouter {
240240
}));
241241
}
242242

243-
Ok(events)
243+
Ok(Reply(events))
244244
}
245245
}
246246

0 commit comments

Comments
 (0)