11
11
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockDeltaEvent ;
12
12
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockStart ;
13
13
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockStartEvent ;
14
+ import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockStopEvent ;
14
15
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamMetadataEvent ;
15
16
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamOutput ;
16
17
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamResponseHandler ;
17
18
import software .amazon .awssdk .services .bedrockruntime .model .MessageStartEvent ;
19
+ import software .amazon .awssdk .services .bedrockruntime .model .MessageStopEvent ;
20
+ import software .amazon .awssdk .services .bedrockruntime .model .StopReason ;
18
21
19
22
import org .elasticsearch .ElasticsearchException ;
20
23
import org .elasticsearch .ExceptionsHelper ;
@@ -90,6 +93,13 @@ public void onNext(ConverseStreamOutput item) {
90
93
);
91
94
return ;
92
95
}
96
+ case ConverseStreamOutput .EventType .MESSAGE_STOP -> {
97
+ demand .set (0 ); // reset demand before we fork to another thread
98
+ item .accept (
99
+ ConverseStreamResponseHandler .Visitor .builder ().onMessageStop (event -> handleMessageStop (event , chunks )).build ()
100
+ );
101
+ return ;
102
+ }
93
103
case ConverseStreamOutput .EventType .CONTENT_BLOCK_START -> {
94
104
demand .set (0 ); // reset demand before we fork to another thread
95
105
item .accept (
@@ -108,14 +118,18 @@ public void onNext(ConverseStreamOutput item) {
108
118
);
109
119
return ;
110
120
}
111
- case ConverseStreamOutput .EventType .METADATA -> {
121
+ case ConverseStreamOutput .EventType .CONTENT_BLOCK_STOP -> {
112
122
demand .set (0 ); // reset demand before we fork to another thread
113
- item .accept (ConverseStreamResponseHandler .Visitor .builder ().onMetadata (event -> handleMetadata (event , chunks )).build ());
123
+ item .accept (
124
+ ConverseStreamResponseHandler .Visitor .builder ()
125
+ .onContentBlockStop (event -> handleContentBlockStop (event , chunks ))
126
+ .build ()
127
+ );
114
128
return ;
115
129
}
116
- case ConverseStreamOutput .EventType .MESSAGE_STOP -> {
130
+ case ConverseStreamOutput .EventType .METADATA -> {
117
131
demand .set (0 ); // reset demand before we fork to another thread
118
- item .accept (ConverseStreamResponseHandler .Visitor .builder ().onMessageStop (event -> Stream . empty ( )).build ());
132
+ item .accept (ConverseStreamResponseHandler .Visitor .builder ().onMetadata (event -> handleMetadata ( event , chunks )).build ());
119
133
return ;
120
134
}
121
135
default -> {
@@ -146,6 +160,22 @@ private void handleMessageStart(MessageStartEvent event, ArrayDeque<StreamingUni
146
160
});
147
161
}
148
162
163
+ private void handleMessageStop (MessageStopEvent event , ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks ) {
164
+ runOnUtilityThreadPool (() -> {
165
+ try {
166
+ var messageStop = handleMessageStop (event );
167
+ messageStop .forEach (chunks ::offer );
168
+ } catch (Exception e ) {
169
+ logger .warn ("Failed to parse message stop event from Amazon Bedrock provider: {}" , event );
170
+ }
171
+ if (chunks .isEmpty ()) {
172
+ upstream .request (1 );
173
+ } else {
174
+ downstream .onNext (new StreamingUnifiedChatCompletionResults .Results (chunks ));
175
+ }
176
+ });
177
+ }
178
+
149
179
private void handleContentBlockStart (
150
180
ContentBlockStartEvent event ,
151
181
ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks
@@ -176,6 +206,22 @@ private void handleContentBlockDelta(
176
206
});
177
207
}
178
208
209
+ private void handleContentBlockStop (
210
+ ContentBlockStopEvent event ,
211
+ ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks
212
+ ) {
213
+ runOnUtilityThreadPool (() -> {
214
+ try {
215
+ var contentBlockStop = handleContentBlockStop (event );
216
+ contentBlockStop .forEach (chunks ::offer );
217
+ } catch (Exception e ) {
218
+ logger .warn ("Failed to parse content block stop event from Amazon Bedrock provider: {}" , event );
219
+ }
220
+ var results = new StreamingUnifiedChatCompletionResults .Results (chunks );
221
+ downstream .onNext (results );
222
+ });
223
+ }
224
+
179
225
private void handleMetadata (
180
226
ConverseStreamMetadataEvent event ,
181
227
ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks
@@ -283,6 +329,56 @@ public static Stream<StreamingUnifiedChatCompletionResults.ChatCompletionChunk>
283
329
return Stream .of (chunk );
284
330
}
285
331
332
+ /**
333
+ * Parse a MessageStopEvent into a ChatCompletionChunk stream
334
+ * @param event the MessageStopEvent data
335
+ * @return a stream of ChatCompletionChunk
336
+ */
337
+ public static Stream <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > handleMessageStop (MessageStopEvent event ) {
338
+ var finishReason = handleFinishReason (event .stopReason ());
339
+ var choice = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice (null , finishReason , 0 );
340
+ var chunk = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk (null , List .of (choice ), null , null , null );
341
+ return Stream .of (chunk );
342
+ }
343
+
344
+ public static Stream <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > processEvent (MessageStopEvent event ) {
345
+ var finishReason = handleFinishReason (event .stopReason ());
346
+ var choice = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice (null , finishReason , 0 );
347
+ var chunk = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk (null , List .of (choice ), null , null , null );
348
+ return Stream .of (chunk );
349
+ }
350
+
351
+ /**
352
+ * This ensures consistent handling of completion termination across different providers.
353
+ * For example, both "stop_sequence" and "end_turn" from Bedrock map to the unified "stop" reason.
354
+ * @param stopReason the stop reason
355
+ * @return a stop reason
356
+ */
357
+ public static String handleFinishReason (StopReason stopReason ) {
358
+ switch (stopReason ) {
359
+ case StopReason .TOOL_USE -> {
360
+ return "FinishReasonToolCalls" ;
361
+ }
362
+ case StopReason .MAX_TOKENS -> {
363
+ return "FinishReasonLength" ;
364
+ }
365
+ case StopReason .CONTENT_FILTERED , StopReason .GUARDRAIL_INTERVENED -> {
366
+ return "FinishReasonContentFilter" ;
367
+ }
368
+ case StopReason .END_TURN , StopReason .STOP_SEQUENCE -> {
369
+ return "FinishReasonStop" ;
370
+ }
371
+ default -> {
372
+ logger .debug ("unhandled stop reason [{}]." , stopReason );
373
+ return "FinishReasonStop" ;
374
+ }
375
+ }
376
+ }
377
+
378
+ public StreamingUnifiedChatCompletionResults .ChatCompletionChunk createBaseChunk () {
379
+ return new StreamingUnifiedChatCompletionResults .ChatCompletionChunk (null , null , null , "chat.completion.chunk" , null );
380
+ }
381
+
286
382
/**
287
383
* processes a tool initialization event from Bedrock
288
384
* This occurs when the model first decides to use a tool, providing its name and ID.
@@ -326,11 +422,20 @@ private static StreamingUnifiedChatCompletionResults.ChatCompletionChunk.Choice.
326
422
* @return a stream of ChatCompletionChunk
327
423
*/
328
424
public static Stream <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > handleContentBlockStart (ContentBlockStartEvent event ) {
329
- var toolCall = handleToolUseStart (event .start ());
330
- var role = "assistant" ;
331
-
332
- var delta = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta (null , null , role , List .of (toolCall ));
333
- var choice = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice (delta , null , 0 );
425
+ StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta delta = null ;
426
+ var index = event .contentBlockIndex ();
427
+ var type = event .start ().type ();
428
+
429
+ switch (type ) {
430
+ case ContentBlockStart .Type .TOOL_USE -> {
431
+ var toolCall = handleToolUseStart (event .start ());
432
+ var role = "assistant" ;
433
+ delta = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta (null , null , role , List .of (toolCall ));
434
+ }
435
+ default -> logger .debug ("unhandled content block start type [{}]." , type );
436
+ }
437
+ delta = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta (null , null , null , null );
438
+ var choice = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice (delta , null , index );
334
439
var chunk = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk (null , List .of (choice ), null , null , null );
335
440
return Stream .of (chunk );
336
441
}
@@ -342,14 +447,35 @@ public static Stream<StreamingUnifiedChatCompletionResults.ChatCompletionChunk>
342
447
* @return a stream of ChatCompletionChunk
343
448
*/
344
449
public static Stream <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > handleContentBlockDelta (ContentBlockDeltaEvent event ) {
345
- var text = event .delta ().text ();
346
- var toolCall = handleToolUseDelta (event .delta ());
347
- var delta = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta (text , null , null , List .of (toolCall ));
348
- var choice = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice (delta , null , 0 );
450
+ StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta delta = null ;
451
+ var type = event .delta ().type ();
452
+
453
+ switch (type ) {
454
+ case ContentBlockDelta .Type .TEXT -> {
455
+ var content = event .delta ().text ();
456
+ delta = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta (content , null , null , null );
457
+ }
458
+ case ContentBlockDelta .Type .TOOL_USE -> {
459
+ var toolCall = handleToolUseDelta (event .delta ());
460
+ delta = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta (null , null , null , List .of (toolCall ));
461
+ }
462
+ default -> logger .debug ("unknown content block delta type [{}]." , type );
463
+ }
464
+ var choice = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice (delta , null , event .contentBlockIndex ());
349
465
var chunk = new StreamingUnifiedChatCompletionResults .ChatCompletionChunk (null , List .of (choice ), null , null , null );
350
466
return Stream .of (chunk );
351
467
}
352
468
469
+ /**
470
+ * processes incremental content updates
471
+ * Parse a ContentBlockStopEvent into a ChatCompletionChunk stream
472
+ * @param event the event data
473
+ * @return a stream of ChatCompletionChunk
474
+ */
475
+ public static Stream <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > handleContentBlockStop (ContentBlockStopEvent event ) {
476
+ return Stream .empty ();
477
+ }
478
+
353
479
/**
354
480
* processes usage statistics
355
481
* Parse a ConverseStreamMetadataEvent into a ChatCompletionChunk stream
0 commit comments