1+ package com.agui.client.chunks
2+
3+ import com.agui.core.types.*
4+ import kotlinx.coroutines.flow.*
5+ import co.touchlab.kermit.Logger
6+
7+ private val logger = Logger .withTag(" ChunkTransform" )
8+
9+ /* *
10+ * Transforms chunk events (TEXT_MESSAGE_CHUNK, TOOL_CALL_CHUNK) into structured event sequences.
11+ *
12+ * This transform handles automatic start/end sequences for chunk events:
13+ * - TEXT_MESSAGE_CHUNK events are converted into TEXT_MESSAGE_START/CONTENT/END sequences
14+ * - TOOL_CALL_CHUNK events are converted into TOOL_CALL_START/ARGS/END sequences
15+ *
16+ * The transform maintains state to track active sequences and only starts new sequences
17+ * when no active sequence exists or when IDs change. This allows chunk events to
18+ * integrate seamlessly with existing message/tool call flows.
19+ *
20+ * @param debug Whether to enable debug logging
21+ * @return Flow<BaseEvent> with chunk events transformed into structured sequences
22+ */
23+ fun Flow<BaseEvent>.transformChunks (debug : Boolean = false): Flow <BaseEvent > {
24+ // State tracking for active sequences
25+ var mode: String? = null // "text" or "tool"
26+ var textMessageId: String? = null
27+ var toolCallId: String? = null
28+ var toolCallName: String? = null
29+ var parentMessageId: String? = null
30+
31+ return transform { event ->
32+ if (debug) {
33+ logger.d { " [CHUNK_TRANSFORM]: Processing ${event.eventType} " }
34+ }
35+
36+ when (event) {
37+ is TextMessageChunkEvent -> {
38+ val messageId = event.messageId
39+ val delta = event.delta
40+
41+ // Determine if we need to start a new text message
42+ val needsNewTextMessage = mode != " text" ||
43+ (messageId != null && messageId != textMessageId)
44+
45+ if (needsNewTextMessage) {
46+ if (debug) {
47+ logger.d { " [CHUNK_TRANSFORM]: Starting new text message (id: $messageId )" }
48+ }
49+
50+ // Close any existing tool call sequence first
51+ if (mode == " tool" && toolCallId != null ) {
52+ emit(ToolCallEndEvent (
53+ toolCallId = toolCallId!! ,
54+ timestamp = event.timestamp,
55+ rawEvent = event.rawEvent
56+ ))
57+ }
58+
59+ // Require messageId for the first chunk of a new message
60+ if (messageId == null ) {
61+ throw IllegalArgumentException (" messageId is required for TEXT_MESSAGE_CHUNK when starting a new text message" )
62+ }
63+
64+ // Start new text message
65+ emit(TextMessageStartEvent (
66+ messageId = messageId,
67+ timestamp = event.timestamp,
68+ rawEvent = event.rawEvent
69+ ))
70+
71+ mode = " text"
72+ textMessageId = messageId
73+ }
74+
75+ // Generate content event if delta is present
76+ if (delta != null ) {
77+ val currentMessageId = textMessageId ? : messageId
78+ if (currentMessageId == null ) {
79+ throw IllegalArgumentException (" Cannot generate TEXT_MESSAGE_CONTENT without a messageId" )
80+ }
81+
82+ emit(TextMessageContentEvent (
83+ messageId = currentMessageId,
84+ delta = delta,
85+ timestamp = event.timestamp,
86+ rawEvent = event.rawEvent
87+ ))
88+ }
89+ }
90+
91+ is ToolCallChunkEvent -> {
92+ val toolId = event.toolCallId
93+ val toolName = event.toolCallName
94+ val delta = event.delta
95+ val parentMsgId = event.parentMessageId
96+
97+ // Determine if we need to start a new tool call
98+ val needsNewToolCall = mode != " tool" ||
99+ (toolId != null && toolId != toolCallId)
100+
101+ if (needsNewToolCall) {
102+ if (debug) {
103+ logger.d { " [CHUNK_TRANSFORM]: Starting new tool call (id: $toolId , name: $toolName )" }
104+ }
105+
106+ // Close any existing text message sequence first
107+ if (mode == " text" && textMessageId != null ) {
108+ emit(TextMessageEndEvent (
109+ messageId = textMessageId!! ,
110+ timestamp = event.timestamp,
111+ rawEvent = event.rawEvent
112+ ))
113+ }
114+
115+ // Require toolCallId and toolCallName for the first chunk of a new tool call
116+ if (toolId == null || toolName == null ) {
117+ throw IllegalArgumentException (" toolCallId and toolCallName are required for TOOL_CALL_CHUNK when starting a new tool call" )
118+ }
119+
120+ // Start new tool call
121+ emit(ToolCallStartEvent (
122+ toolCallId = toolId,
123+ toolCallName = toolName,
124+ parentMessageId = parentMsgId,
125+ timestamp = event.timestamp,
126+ rawEvent = event.rawEvent
127+ ))
128+
129+ mode = " tool"
130+ toolCallId = toolId
131+ toolCallName = toolName
132+ parentMessageId = parentMsgId
133+ }
134+
135+ // Generate args event if delta is present
136+ if (delta != null ) {
137+ val currentToolCallId = toolCallId ? : toolId
138+ if (currentToolCallId == null ) {
139+ throw IllegalArgumentException (" Cannot generate TOOL_CALL_ARGS without a toolCallId" )
140+ }
141+
142+ emit(ToolCallArgsEvent (
143+ toolCallId = currentToolCallId,
144+ delta = delta,
145+ timestamp = event.timestamp,
146+ rawEvent = event.rawEvent
147+ ))
148+ }
149+ }
150+
151+ // Track state changes from regular events to maintain consistency
152+ is TextMessageStartEvent -> {
153+ mode = " text"
154+ textMessageId = event.messageId
155+ emit(event)
156+ }
157+
158+ is TextMessageEndEvent -> {
159+ if (mode == " text" && textMessageId == event.messageId) {
160+ mode = null
161+ textMessageId = null
162+ }
163+ emit(event)
164+ }
165+
166+ is ToolCallStartEvent -> {
167+ mode = " tool"
168+ toolCallId = event.toolCallId
169+ toolCallName = event.toolCallName
170+ parentMessageId = event.parentMessageId
171+ emit(event)
172+ }
173+
174+ is ToolCallEndEvent -> {
175+ if (mode == " tool" && toolCallId == event.toolCallId) {
176+ mode = null
177+ toolCallId = null
178+ toolCallName = null
179+ parentMessageId = null
180+ }
181+ emit(event)
182+ }
183+
184+ else -> {
185+ // Pass through all other events unchanged
186+ emit(event)
187+ }
188+ }
189+ }
190+ }
0 commit comments