7
7
8
8
package org .elasticsearch .xpack .inference .services .amazonbedrock .client ;
9
9
10
- import org .elasticsearch .common .bytes .BytesReference ;
11
-
12
- import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
13
- import org .elasticsearch .common .xcontent .XContentHelper ;
14
- import org .elasticsearch .core .Nullable ;
15
- import org .elasticsearch .xcontent .ConstructingObjectParser ;
16
- import org .elasticsearch .xcontent .ParseField ;
17
- import org .elasticsearch .xcontent .XContentFactory ;
18
- import org .elasticsearch .xcontent .XContentParser ;
19
- import org .elasticsearch .xcontent .XContentParserConfiguration ;
20
- import org .elasticsearch .xcontent .XContentType ;
21
- import org .elasticsearch .xpack .core .inference .results .StreamingUnifiedChatCompletionResults ;
22
-
23
10
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockDeltaEvent ;
24
11
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockStartEvent ;
25
12
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamOutput ;
26
13
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamResponseHandler ;
27
14
28
15
import org .elasticsearch .ElasticsearchException ;
29
16
import org .elasticsearch .ExceptionsHelper ;
17
+ import org .elasticsearch .common .bytes .BytesReference ;
30
18
import org .elasticsearch .common .util .concurrent .EsExecutors ;
19
+ import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
20
+ import org .elasticsearch .common .xcontent .XContentHelper ;
21
+ import org .elasticsearch .core .Nullable ;
31
22
import org .elasticsearch .core .Strings ;
32
23
import org .elasticsearch .logging .LogManager ;
33
24
import org .elasticsearch .logging .Logger ;
34
25
import org .elasticsearch .threadpool .ThreadPool ;
26
+ import org .elasticsearch .xcontent .ConstructingObjectParser ;
27
+ import org .elasticsearch .xcontent .ParseField ;
28
+ import org .elasticsearch .xcontent .XContentFactory ;
29
+ import org .elasticsearch .xcontent .XContentParser ;
30
+ import org .elasticsearch .xcontent .XContentParserConfiguration ;
31
+ import org .elasticsearch .xcontent .XContentType ;
32
+ import org .elasticsearch .xpack .core .inference .results .StreamingUnifiedChatCompletionResults ;
35
33
36
34
import java .io .IOException ;
37
35
import java .util .ArrayDeque ;
51
49
import static org .elasticsearch .xpack .inference .external .response .XContentUtils .moveToFirstToken ;
52
50
53
51
@ SuppressWarnings ("checkstyle:LineLength" )
54
- class AmazonBedrockUnifiedStreamingChatProcessor implements Flow .Processor <ConverseStreamOutput , StreamingUnifiedChatCompletionResults .Results > {
52
+ class AmazonBedrockUnifiedStreamingChatProcessor
53
+ implements
54
+ Flow .Processor <ConverseStreamOutput , StreamingUnifiedChatCompletionResults .Results > {
55
55
private static final Logger logger = LogManager .getLogger (AmazonBedrockStreamingChatProcessor .class );
56
56
57
57
private final AtomicReference <Throwable > error = new AtomicReference <>(null );
@@ -98,19 +98,19 @@ public void onNext(ConverseStreamOutput item) {
98
98
demand .set (0 ); // reset demand before we fork to another thread
99
99
item .accept (ConverseStreamResponseHandler .Visitor .builder ().onContentBlockStart (this ::handleContentBlockStart ).build ());
100
100
101
- // role := unified.RoleType(msg.Value.Role)
102
- // chunk.Choices[0].Delta.Role = &role
101
+ // role := unified.RoleType(msg.Value.Role)
102
+ // chunk.Choices[0].Delta.Role = &role
103
103
104
- // item.accept(ConverseStreamResponseHandler.Visitor.builder()
105
- // .onMessageStart(this::sendDownstreamOnAnotherThread).build());
104
+ // item.accept(ConverseStreamResponseHandler.Visitor.builder()
105
+ // .onMessageStart(this::sendDownstreamOnAnotherThread).build());
106
106
return ;
107
107
}
108
108
109
109
case ConverseStreamOutput .EventType .CONTENT_BLOCK_DELTA -> {
110
110
demand .set (0 ); // reset demand before we fork to another thread
111
- item .accept (ConverseStreamResponseHandler . Visitor . builder ()
112
- . onContentBlockDelta (this ::sendDownstreamOnAnotherThread )
113
- . build () );
111
+ item .accept (
112
+ ConverseStreamResponseHandler . Visitor . builder (). onContentBlockDelta (this ::sendDownstreamOnAnotherThread ). build ( )
113
+ );
114
114
return ;
115
115
}
116
116
default -> {
@@ -137,7 +137,7 @@ private void handleContentBlockStart(ContentBlockStartEvent event) {
137
137
} catch (Exception e ) {
138
138
String eventString = event .start ().toString ();
139
139
logger .warn ("Failed to parse event from Amazon Bedrock AI provider: {}" , eventString );
140
- // throw errorParser.apply(eventString, e);
140
+ // throw errorParser.apply(eventString, e);
141
141
}
142
142
143
143
var results = new StreamingUnifiedChatCompletionResults .Results (result );
@@ -158,7 +158,7 @@ private void sendDownstreamOnAnotherThread(ContentBlockDeltaEvent event) {
158
158
} catch (Exception e ) {
159
159
String eventString = event .delta ().text ();
160
160
logger .warn ("Failed to parse event from Amazon Bedrock AI provider: {}" , eventString );
161
- // throw errorParser.apply(eventString, e);
161
+ // throw errorParser.apply(eventString, e);
162
162
}
163
163
164
164
var results = new StreamingUnifiedChatCompletionResults .Results (result );
@@ -245,16 +245,6 @@ public void cancel() {
245
245
}
246
246
}
247
247
248
-
249
-
250
-
251
-
252
-
253
-
254
-
255
-
256
-
257
-
258
248
private Iterator <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > parse (
259
249
XContentParserConfiguration parserConfig ,
260
250
String event
@@ -263,29 +253,12 @@ private Iterator<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> pars
263
253
moveToFirstToken (jsonParser );
264
254
ensureExpectedToken (XContentParser .Token .START_OBJECT , jsonParser .currentToken (), jsonParser );
265
255
266
- StreamingUnifiedChatCompletionResults .ChatCompletionChunk chunk = AmazonBedrockUnifiedStreamingChatProcessor .AmazonBedrockChatCompletionChunkParser .parse (jsonParser );
256
+ StreamingUnifiedChatCompletionResults .ChatCompletionChunk chunk =
257
+ AmazonBedrockUnifiedStreamingChatProcessor .AmazonBedrockChatCompletionChunkParser .parse (jsonParser );
267
258
return Collections .singleton (chunk ).iterator ();
268
259
}
269
260
}
270
261
271
-
272
-
273
-
274
-
275
-
276
-
277
-
278
-
279
-
280
-
281
-
282
-
283
-
284
-
285
-
286
-
287
-
288
-
289
262
private static final String CONTENT_BLOCK_START_FIELD = "start" ;
290
263
private static final String START_FIELD = "start" ;
291
264
private static final String TOOL_USE_FIELD = "toolUse" ;
@@ -308,7 +281,9 @@ private Iterator<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> pars
308
281
// --- Nested Parsers for Amazon Bedrock structure ---
309
282
310
283
private record ContentBlockStart (AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart toolUse , Type type ) {}
284
+
311
285
private record ToolUseBlockStart (String toolUseId , String name ) {}
286
+
312
287
private record Type (String name ) {}
313
288
314
289
private static class ContentBlockStartParser {
@@ -334,11 +309,12 @@ public static AmazonBedrockUnifiedStreamingChatProcessor.ContentBlockStart parse
334
309
335
310
private static class ToolUseBlockStartParser {
336
311
@ SuppressWarnings ("unchecked" )
337
- private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart , Void > PARSER = new ConstructingObjectParser <>(
338
- START_FIELD ,
339
- true ,
340
- args -> new AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart ((String ) args [0 ], (String ) args [1 ])
341
- );
312
+ private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart , Void > PARSER =
313
+ new ConstructingObjectParser <>(
314
+ START_FIELD ,
315
+ true ,
316
+ args -> new AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart ((String ) args [0 ], (String ) args [1 ])
317
+ );
342
318
343
319
static {
344
320
PARSER .declareObject (
@@ -359,11 +335,12 @@ public static AmazonBedrockUnifiedStreamingChatProcessor.ToolUseBlockStart parse
359
335
}
360
336
361
337
private static class ToolUseParser {
362
- private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart , Void > PARSER = new ConstructingObjectParser <>(
363
- "toolUse" ,
364
- true ,
365
- args -> new AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart ((String ) args [0 ], (String ) args [1 ])
366
- );
338
+ private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart , Void > PARSER =
339
+ new ConstructingObjectParser <>(
340
+ "toolUse" ,
341
+ true ,
342
+ args -> new AmazonBedrockUnifiedStreamingChatProcessor .ToolUseBlockStart ((String ) args [0 ], (String ) args [1 ])
343
+ );
367
344
368
345
static {
369
346
PARSER .declareString (ConstructingObjectParser .optionalConstructorArg (), new ParseField (TOOL_USE_ID_FIELD ));
@@ -378,11 +355,7 @@ public static AmazonBedrockUnifiedStreamingChatProcessor.ToolUseBlockStart parse
378
355
379
356
private static class TypeParser {
380
357
private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .Type , Void > PARSER =
381
- new ConstructingObjectParser <>(
382
- "type" ,
383
- true ,
384
- args -> new AmazonBedrockUnifiedStreamingChatProcessor .Type ((String ) args [0 ])
385
- );
358
+ new ConstructingObjectParser <>("type" , true , args -> new AmazonBedrockUnifiedStreamingChatProcessor .Type ((String ) args [0 ]));
386
359
387
360
static {
388
361
PARSER .declareString (ConstructingObjectParser .optionalConstructorArg (), new ParseField (TOOL_TYPE_FIELD ));
@@ -394,20 +367,6 @@ public static AmazonBedrockUnifiedStreamingChatProcessor.Type parse(XContentPars
394
367
}
395
368
}
396
369
397
-
398
-
399
-
400
-
401
-
402
-
403
-
404
-
405
-
406
-
407
-
408
-
409
-
410
-
411
370
public static class AmazonBedrockChatCompletionChunkParser {
412
371
private static @ Nullable StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Usage usageMetadataToChunk (
413
372
@ Nullable AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata usage
@@ -423,15 +382,17 @@ public static class AmazonBedrockChatCompletionChunkParser {
423
382
}
424
383
425
384
private static StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice blockStartToChoice (
426
- AmazonBedrockUnifiedStreamingChatProcessor .ContentBlockStart blockStart ) {
385
+ AmazonBedrockUnifiedStreamingChatProcessor .ContentBlockStart blockStart
386
+ ) {
427
387
StringBuilder contentTextBuilder = new StringBuilder ();
428
388
List <StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice .Delta .ToolCall > toolCalls = new ArrayList <>();
429
389
430
390
String toolUseId = null ;
431
391
String toolName = null ;
432
392
433
- var toolUseIdAndNameAreNotEmpty =
434
- blockStart .toolUse () != null && blockStart .toolUse ().toolUseId () != null && blockStart .toolUse ().name () != null ;
393
+ var toolUseIdAndNameAreNotEmpty = blockStart .toolUse () != null
394
+ && blockStart .toolUse ().toolUseId () != null
395
+ && blockStart .toolUse ().name () != null ;
435
396
436
397
if (toolUseIdAndNameAreNotEmpty ) {
437
398
toolUseId = blockStart .toolUse ().toolUseId ();
@@ -448,15 +409,19 @@ private static StreamingUnifiedChatCompletionResults.ChatCompletionChunk.Choice
448
409
@ SuppressWarnings ("unchecked" )
449
410
private static final ConstructingObjectParser <StreamingUnifiedChatCompletionResults .ChatCompletionChunk , Void > PARSER =
450
411
new ConstructingObjectParser <>("amazon_bedrock_chat_completion_chunk" , true , args -> {
451
- List <AmazonBedrockUnifiedStreamingChatProcessor .ContentBlockStart > blockStart = (List <AmazonBedrockUnifiedStreamingChatProcessor .ContentBlockStart >) args [0 ];
452
- AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata usage = (AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata ) args [1 ];
412
+ List <AmazonBedrockUnifiedStreamingChatProcessor .ContentBlockStart > blockStart = (List <
413
+ AmazonBedrockUnifiedStreamingChatProcessor .ContentBlockStart >) args [0 ];
414
+ AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata usage =
415
+ (AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata ) args [1 ];
453
416
String modelversion = (String ) args [2 ];
454
417
String responseId = (String ) args [3 ];
455
418
456
419
boolean blockStartIsEmpty = blockStart == null || blockStart .isEmpty ();
457
420
List <StreamingUnifiedChatCompletionResults .ChatCompletionChunk .Choice > choices = blockStartIsEmpty
458
- ? Collections .emptyList ()
459
- : blockStart .stream ().map (AmazonBedrockUnifiedStreamingChatProcessor .AmazonBedrockChatCompletionChunkParser ::blockStartToChoice ).toList ();
421
+ ? Collections .emptyList ()
422
+ : blockStart .stream ()
423
+ .map (AmazonBedrockUnifiedStreamingChatProcessor .AmazonBedrockChatCompletionChunkParser ::blockStartToChoice )
424
+ .toList ();
460
425
461
426
return new StreamingUnifiedChatCompletionResults .ChatCompletionChunk (
462
427
responseId ,
@@ -487,14 +452,11 @@ public static StreamingUnifiedChatCompletionResults.ChatCompletionChunk parse(XC
487
452
}
488
453
}
489
454
490
-
491
455
private record FunctionCall (String name , String args ) {}
492
456
493
457
private static class FunctionCallParser {
494
- private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .FunctionCall , Void > PARSER = new ConstructingObjectParser <>(
495
- FUNCTION_CALL_FIELD ,
496
- true ,
497
- args -> {
458
+ private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .FunctionCall , Void > PARSER =
459
+ new ConstructingObjectParser <>(FUNCTION_CALL_FIELD , true , args -> {
498
460
var name = (String ) args [0 ];
499
461
500
462
@ SuppressWarnings ("unchecked" )
@@ -510,8 +472,7 @@ private static class FunctionCallParser {
510
472
logger .warn ("Failed to parse and convert VertexAI function args to json" , e );
511
473
return new AmazonBedrockUnifiedStreamingChatProcessor .FunctionCall (name , null );
512
474
}
513
- }
514
- );
475
+ });
515
476
516
477
static {
517
478
PARSER .declareString (ConstructingObjectParser .constructorArg (), new ParseField (FUNCTION_NAME_FIELD ));
@@ -526,10 +487,8 @@ public static AmazonBedrockUnifiedStreamingChatProcessor.FunctionCall parse(XCon
526
487
private record UsageMetadata (int promptTokenCount , int candidatesTokenCount , int totalTokenCount ) {}
527
488
528
489
private static class UsageMetadataParser {
529
- private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata , Void > PARSER = new ConstructingObjectParser <>(
530
- USAGE_METADATA_FIELD ,
531
- true ,
532
- args -> {
490
+ private static final ConstructingObjectParser <AmazonBedrockUnifiedStreamingChatProcessor .UsageMetadata , Void > PARSER =
491
+ new ConstructingObjectParser <>(USAGE_METADATA_FIELD , true , args -> {
533
492
if (Objects .isNull (args [0 ]) && Objects .isNull (args [1 ]) && Objects .isNull (args [2 ])) {
534
493
return null ;
535
494
}
@@ -538,8 +497,7 @@ private static class UsageMetadataParser {
538
497
args [1 ] == null ? 0 : (int ) args [1 ],
539
498
args [2 ] == null ? 0 : (int ) args [2 ]
540
499
);
541
- }
542
- );
500
+ });
543
501
544
502
static {
545
503
PARSER .declareInt (ConstructingObjectParser .optionalConstructorArg (), new ParseField (PROMPT_TOKEN_COUNT_FIELD ));
0 commit comments