Skip to content

Commit 2dd28b8

Browse files
committed
feat(conversation): Introduce turn markers and stream sanitization
This commit introduces a structured turn model and robust stream sanitization to `jp_conversation`.

A new `TurnStart` event kind is used to group sequences of events within a single query invocation, enabling better navigation and filtering of conversation history. The `EventBuilder` is added to handle the accumulation of partial LLM stream chunks into complete `ConversationEvent` objects, supporting interleaved reasoning, message, and structured output parts.

To improve privacy and prevent sensitive data from leaking into workspace search results, a storage-level encoding layer has been implemented. Tool arguments, results, and metadata are now base64-encoded when persisted to disk, while remaining transparent to the rest of the application.

Additionally, the `ConversationStream` now includes a `sanitize` process to repair structural invariants that may be violated after filtering or interrupted tool executions. This includes injecting synthetic error responses for orphaned tool calls and re-indexing turn markers.

The `Thread` model has also been updated to replace the concept of "instructions" with "sections", providing a more flexible way to render context before sending it to providers.

Signed-off-by: Jean Mertz <git@jeanmertz.com>
1 parent 26d2926 commit 2dd28b8

18 files changed

+2465
-289
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/jp_conversation/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -16,17 +16,17 @@ version.workspace = true
1616
jp_attachment = { workspace = true }
1717
jp_config = { workspace = true }
1818
jp_id = { workspace = true }
19-
jp_serde = { workspace = true }
20-
19+
base64 = { workspace = true, features = ["std"] }
20+
chrono = { workspace = true }
2121
indexmap = { workspace = true }
2222
quick-xml = { workspace = true, features = ["serialize"] }
2323
serde = { workspace = true }
2424
serde_json = { workspace = true, features = ["preserve_order"] }
2525
thiserror = { workspace = true }
26-
chrono = { workspace = true }
2726
tracing = { workspace = true }
2827

2928
[dev-dependencies]
29+
assert_matches = { workspace = true }
3030
insta = { workspace = true, features = ["json"] }
3131
test-log = { workspace = true }
3232

crates/jp_conversation/src/conversation.rs

Lines changed: 3 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,8 @@ use std::{fmt, str::FromStr};
55
use chrono::{DateTime, Utc};
66
use jp_id::{
77
Id, NANOSECONDS_PER_DECISECOND,
8-
parts::{GlobalId, TargetId, Variant},
8+
parts::{TargetId, Variant},
99
};
10-
use jp_serde::skip_if;
1110
use serde::{Deserialize, Serialize};
1211

1312
use crate::error::{Error, Result};
@@ -28,7 +27,7 @@ pub struct Conversation {
2827

2928
/// Whether the conversation is stored in the user or workspace storage.
3029
// TODO: rename to `user_local`
31-
#[serde(default, rename = "local", skip_serializing_if = "skip_if::is_false")]
30+
#[serde(default, rename = "local", skip_serializing_if = "std::ops::Not::not")]
3231
pub user: bool,
3332

3433
/// When the conversation expires.
@@ -110,7 +109,7 @@ pub struct ConversationId(#[serde(with = "jp_id::serde")] DateTime<Utc>);
110109
impl fmt::Debug for ConversationId {
111110
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112111
f.debug_tuple("ConversationId")
113-
.field(&self.to_string())
112+
.field(&self.as_deciseconds())
114113
.finish()
115114
}
116115
}
@@ -236,10 +235,6 @@ impl Id for ConversationId {
236235
fn target_id(&self) -> TargetId {
237236
self.as_deciseconds().to_string().into()
238237
}
239-
240-
fn global_id(&self) -> GlobalId {
241-
jp_id::global::get().into()
242-
}
243238
}
244239

245240
impl fmt::Display for ConversationId {

crates/jp_conversation/src/event.rs

Lines changed: 66 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
mod chat;
44
mod inquiry;
55
mod tool_call;
6+
mod turn;
67

78
use std::fmt;
89

@@ -13,8 +14,12 @@ use serde_json::{Map, Value};
1314

1415
pub use self::{
1516
chat::{ChatRequest, ChatResponse},
16-
inquiry::{InquiryAnswerType, InquiryQuestion, InquiryRequest, InquiryResponse, InquirySource},
17+
inquiry::{
18+
InquiryAnswerType, InquiryId, InquiryQuestion, InquiryRequest, InquiryResponse,
19+
InquirySource, SelectOption,
20+
},
1721
tool_call::{ToolCallRequest, ToolCallResponse},
22+
turn::TurnStart,
1823
};
1924

2025
/// A single event in a conversation.
@@ -32,11 +37,7 @@ pub struct ConversationEvent {
3237
pub kind: EventKind,
3338

3439
/// Additional opaque metadata associated with the event.
35-
#[serde(
36-
default,
37-
skip_serializing_if = "Map::is_empty",
38-
with = "jp_serde::repr::base64_json_map"
39-
)]
40+
#[serde(default, skip_serializing_if = "Map::is_empty")]
4041
pub metadata: Map<String, Value>,
4142
}
4243

@@ -305,12 +306,43 @@ impl ConversationEvent {
305306
_ => None,
306307
}
307308
}
309+
310+
/// Returns `true` if the event is a [`TurnStart`].
311+
#[must_use]
312+
pub const fn is_turn_start(&self) -> bool {
313+
matches!(self.kind, EventKind::TurnStart(_))
314+
}
315+
316+
/// Returns a reference to the [`TurnStart`], if applicable.
317+
#[must_use]
318+
pub const fn as_turn_start(&self) -> Option<&TurnStart> {
319+
match &self.kind {
320+
EventKind::TurnStart(turn_start) => Some(turn_start),
321+
_ => None,
322+
}
323+
}
324+
325+
/// Consumes the event and returns the [`TurnStart`], if applicable.
326+
#[must_use]
327+
pub fn into_turn_start(self) -> Option<TurnStart> {
328+
match self.kind {
329+
EventKind::TurnStart(turn_start) => Some(turn_start),
330+
_ => None,
331+
}
332+
}
308333
}
309334

310335
/// A type of event in a conversation.
311336
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
312337
#[serde(tag = "type", rename_all = "snake_case")]
313338
pub enum EventKind {
339+
/// A turn start event.
340+
///
341+
/// This event marks the beginning of a new turn in the conversation. A turn
342+
/// groups together a user's chat request through the assistant's final
343+
/// response, including any intermediate tool calls.
344+
TurnStart(TurnStart),
345+
314346
/// A chat request event.
315347
///
316348
/// This event is usually triggered by the user, but can also be
@@ -353,6 +385,22 @@ pub enum EventKind {
353385
InquiryResponse(InquiryResponse),
354386
}
355387

388+
impl EventKind {
389+
/// Returns the name of the event kind.
390+
#[must_use]
391+
pub const fn as_str(&self) -> &str {
392+
match self {
393+
Self::TurnStart(_) => "TurnStart",
394+
Self::ChatRequest(_) => "ChatRequest",
395+
Self::ChatResponse(_) => "ChatResponse",
396+
Self::ToolCallRequest(_) => "ToolCallRequest",
397+
Self::ToolCallResponse(_) => "ToolCallResponse",
398+
Self::InquiryRequest(_) => "InquiryRequest",
399+
Self::InquiryResponse(_) => "InquiryResponse",
400+
}
401+
}
402+
}
403+
356404
impl From<ChatRequest> for EventKind {
357405
fn from(request: ChatRequest) -> Self {
358406
Self::ChatRequest(request)
@@ -389,6 +437,12 @@ impl From<InquiryResponse> for EventKind {
389437
}
390438
}
391439

440+
impl From<TurnStart> for EventKind {
441+
fn from(turn_start: TurnStart) -> Self {
442+
Self::TurnStart(turn_start)
443+
}
444+
}
445+
392446
impl From<ChatRequest> for ConversationEvent {
393447
fn from(request: ChatRequest) -> Self {
394448
Self::now(request)
@@ -424,3 +478,9 @@ impl From<InquiryResponse> for ConversationEvent {
424478
Self::now(response)
425479
}
426480
}
481+
482+
impl From<TurnStart> for ConversationEvent {
483+
fn from(turn_start: TurnStart) -> Self {
484+
Self::now(turn_start)
485+
}
486+
}

0 commit comments

Comments
 (0)