1
+ package com.agui.client.chunks
2
+
3
+ import com.agui.core.types.*
4
+ import kotlinx.coroutines.flow.*
5
+ import co.touchlab.kermit.Logger
6
+
7
+ private val logger = Logger .withTag(" ChunkTransform" )
8
+
9
+ /* *
10
+ * Transforms chunk events (TEXT_MESSAGE_CHUNK, TOOL_CALL_CHUNK) into structured event sequences.
11
+ *
12
+ * This transform handles automatic start/end sequences for chunk events:
13
+ * - TEXT_MESSAGE_CHUNK events are converted into TEXT_MESSAGE_START/CONTENT/END sequences
14
+ * - TOOL_CALL_CHUNK events are converted into TOOL_CALL_START/ARGS/END sequences
15
+ *
16
+ * The transform maintains state to track active sequences and only starts new sequences
17
+ * when no active sequence exists or when IDs change. This allows chunk events to
18
+ * integrate seamlessly with existing message/tool call flows.
19
+ *
20
+ * @param debug Whether to enable debug logging
21
+ * @return Flow<BaseEvent> with chunk events transformed into structured sequences
22
+ */
23
+ fun Flow<BaseEvent>.transformChunks (debug : Boolean = false): Flow <BaseEvent > {
24
+ // State tracking for active sequences
25
+ var mode: String? = null // "text" or "tool"
26
+ var textMessageId: String? = null
27
+ var toolCallId: String? = null
28
+ var toolCallName: String? = null
29
+ var parentMessageId: String? = null
30
+
31
+ return transform { event ->
32
+ if (debug) {
33
+ logger.d { " [CHUNK_TRANSFORM]: Processing ${event.eventType} " }
34
+ }
35
+
36
+ when (event) {
37
+ is TextMessageChunkEvent -> {
38
+ val messageId = event.messageId
39
+ val delta = event.delta
40
+
41
+ // Determine if we need to start a new text message
42
+ val needsNewTextMessage = mode != " text" ||
43
+ (messageId != null && messageId != textMessageId)
44
+
45
+ if (needsNewTextMessage) {
46
+ if (debug) {
47
+ logger.d { " [CHUNK_TRANSFORM]: Starting new text message (id: $messageId )" }
48
+ }
49
+
50
+ // Close any existing tool call sequence first
51
+ if (mode == " tool" && toolCallId != null ) {
52
+ emit(ToolCallEndEvent (
53
+ toolCallId = toolCallId!! ,
54
+ timestamp = event.timestamp,
55
+ rawEvent = event.rawEvent
56
+ ))
57
+ }
58
+
59
+ // Require messageId for the first chunk of a new message
60
+ if (messageId == null ) {
61
+ throw IllegalArgumentException (" messageId is required for TEXT_MESSAGE_CHUNK when starting a new text message" )
62
+ }
63
+
64
+ // Start new text message
65
+ emit(TextMessageStartEvent (
66
+ messageId = messageId,
67
+ timestamp = event.timestamp,
68
+ rawEvent = event.rawEvent
69
+ ))
70
+
71
+ mode = " text"
72
+ textMessageId = messageId
73
+ }
74
+
75
+ // Generate content event if delta is present
76
+ if (delta != null ) {
77
+ val currentMessageId = textMessageId ? : messageId
78
+ if (currentMessageId == null ) {
79
+ throw IllegalArgumentException (" Cannot generate TEXT_MESSAGE_CONTENT without a messageId" )
80
+ }
81
+
82
+ emit(TextMessageContentEvent (
83
+ messageId = currentMessageId,
84
+ delta = delta,
85
+ timestamp = event.timestamp,
86
+ rawEvent = event.rawEvent
87
+ ))
88
+ }
89
+ }
90
+
91
+ is ToolCallChunkEvent -> {
92
+ val toolId = event.toolCallId
93
+ val toolName = event.toolCallName
94
+ val delta = event.delta
95
+ val parentMsgId = event.parentMessageId
96
+
97
+ // Determine if we need to start a new tool call
98
+ val needsNewToolCall = mode != " tool" ||
99
+ (toolId != null && toolId != toolCallId)
100
+
101
+ if (needsNewToolCall) {
102
+ if (debug) {
103
+ logger.d { " [CHUNK_TRANSFORM]: Starting new tool call (id: $toolId , name: $toolName )" }
104
+ }
105
+
106
+ // Close any existing text message sequence first
107
+ if (mode == " text" && textMessageId != null ) {
108
+ emit(TextMessageEndEvent (
109
+ messageId = textMessageId!! ,
110
+ timestamp = event.timestamp,
111
+ rawEvent = event.rawEvent
112
+ ))
113
+ }
114
+
115
+ // Require toolCallId and toolCallName for the first chunk of a new tool call
116
+ if (toolId == null || toolName == null ) {
117
+ throw IllegalArgumentException (" toolCallId and toolCallName are required for TOOL_CALL_CHUNK when starting a new tool call" )
118
+ }
119
+
120
+ // Start new tool call
121
+ emit(ToolCallStartEvent (
122
+ toolCallId = toolId,
123
+ toolCallName = toolName,
124
+ parentMessageId = parentMsgId,
125
+ timestamp = event.timestamp,
126
+ rawEvent = event.rawEvent
127
+ ))
128
+
129
+ mode = " tool"
130
+ toolCallId = toolId
131
+ toolCallName = toolName
132
+ parentMessageId = parentMsgId
133
+ }
134
+
135
+ // Generate args event if delta is present
136
+ if (delta != null ) {
137
+ val currentToolCallId = toolCallId ? : toolId
138
+ if (currentToolCallId == null ) {
139
+ throw IllegalArgumentException (" Cannot generate TOOL_CALL_ARGS without a toolCallId" )
140
+ }
141
+
142
+ emit(ToolCallArgsEvent (
143
+ toolCallId = currentToolCallId,
144
+ delta = delta,
145
+ timestamp = event.timestamp,
146
+ rawEvent = event.rawEvent
147
+ ))
148
+ }
149
+ }
150
+
151
+ // Track state changes from regular events to maintain consistency
152
+ is TextMessageStartEvent -> {
153
+ mode = " text"
154
+ textMessageId = event.messageId
155
+ emit(event)
156
+ }
157
+
158
+ is TextMessageEndEvent -> {
159
+ if (mode == " text" && textMessageId == event.messageId) {
160
+ mode = null
161
+ textMessageId = null
162
+ }
163
+ emit(event)
164
+ }
165
+
166
+ is ToolCallStartEvent -> {
167
+ mode = " tool"
168
+ toolCallId = event.toolCallId
169
+ toolCallName = event.toolCallName
170
+ parentMessageId = event.parentMessageId
171
+ emit(event)
172
+ }
173
+
174
+ is ToolCallEndEvent -> {
175
+ if (mode == " tool" && toolCallId == event.toolCallId) {
176
+ mode = null
177
+ toolCallId = null
178
+ toolCallName = null
179
+ parentMessageId = null
180
+ }
181
+ emit(event)
182
+ }
183
+
184
+ else -> {
185
+ // Pass through all other events unchanged
186
+ emit(event)
187
+ }
188
+ }
189
+ }
190
+ }
0 commit comments