7
7
8
8
package org .elasticsearch .xpack .inference .services .amazonbedrock .client ;
9
9
10
- import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
11
- import org .elasticsearch .xcontent .XContentFactory ;
12
- import org .elasticsearch .xcontent .XContentParser ;
13
- import org .elasticsearch .xcontent .XContentParserConfiguration ;
14
- import org .elasticsearch .xcontent .XContentType ;
15
- import org .elasticsearch .xpack .core .inference .results .StreamingUnifiedChatCompletionResults ;
16
-
17
10
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockDeltaEvent ;
18
11
import software .amazon .awssdk .services .bedrockruntime .model .ContentBlockStartEvent ;
19
12
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamMetadataEvent ;
20
13
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamOutput ;
21
14
import software .amazon .awssdk .services .bedrockruntime .model .ConverseStreamResponseHandler ;
15
+ import software .amazon .awssdk .services .bedrockruntime .model .MessageStartEvent ;
22
16
23
17
import org .elasticsearch .ElasticsearchException ;
24
18
import org .elasticsearch .ExceptionsHelper ;
25
19
import org .elasticsearch .common .util .concurrent .EsExecutors ;
20
+ import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
26
21
import org .elasticsearch .core .Strings ;
27
22
import org .elasticsearch .logging .LogManager ;
28
23
import org .elasticsearch .logging .Logger ;
29
24
import org .elasticsearch .threadpool .ThreadPool ;
30
-
31
- import software .amazon .awssdk .services .bedrockruntime .model .MessageStartEvent ;
25
+ import org .elasticsearch .xcontent .XContentFactory ;
26
+ import org .elasticsearch .xcontent .XContentParser ;
27
+ import org .elasticsearch .xcontent .XContentParserConfiguration ;
28
+ import org .elasticsearch .xcontent .XContentType ;
29
+ import org .elasticsearch .xpack .core .inference .results .StreamingUnifiedChatCompletionResults ;
32
30
33
31
import java .io .IOException ;
34
32
import java .util .ArrayDeque ;
45
43
import static org .elasticsearch .xpack .inference .external .response .XContentUtils .positionParserAtTokenAfterField ;
46
44
47
45
@ SuppressWarnings ("checkstyle:LineLength" )
48
- class AmazonBedrockUnifiedStreamingChatProcessor implements Flow .Processor <ConverseStreamOutput , StreamingUnifiedChatCompletionResults .Results > {
46
+ class AmazonBedrockUnifiedStreamingChatProcessor
47
+ implements
48
+ Flow .Processor <ConverseStreamOutput , StreamingUnifiedChatCompletionResults .Results > {
49
49
private static final Logger logger = LogManager .getLogger (AmazonBedrockStreamingChatProcessor .class );
50
50
private static final String FAILED_TO_FIND_FIELD_TEMPLATE = "Failed to find required field [%s] in Anthropic chat completions response" ;
51
51
@@ -94,32 +94,41 @@ public void onNext(ConverseStreamOutput item) {
94
94
switch (eventType ) {
95
95
case ConverseStreamOutput .EventType .MESSAGE_START -> {
96
96
demand .set (0 ); // reset demand before we fork to another thread
97
- item .accept (ConverseStreamResponseHandler .Visitor .builder ()
98
- .onMessageStart (event -> handleMessageStart (event , chunks , parserConfig )).build ());
97
+ item .accept (
98
+ ConverseStreamResponseHandler .Visitor .builder ()
99
+ .onMessageStart (event -> handleMessageStart (event , chunks , parserConfig ))
100
+ .build ()
101
+ );
99
102
return ;
100
103
}
101
104
case ConverseStreamOutput .EventType .CONTENT_BLOCK_START -> {
102
105
demand .set (0 ); // reset demand before we fork to another thread
103
- item .accept (ConverseStreamResponseHandler .Visitor .builder ()
104
- .onContentBlockStart (event -> handleContentBlockStart (event , chunks , parserConfig )).build ());
106
+ item .accept (
107
+ ConverseStreamResponseHandler .Visitor .builder ()
108
+ .onContentBlockStart (event -> handleContentBlockStart (event , chunks , parserConfig ))
109
+ .build ()
110
+ );
105
111
return ;
106
112
}
107
113
case ConverseStreamOutput .EventType .CONTENT_BLOCK_DELTA -> {
108
114
demand .set (0 ); // reset demand before we fork to another thread
109
- item .accept (ConverseStreamResponseHandler .Visitor .builder ()
110
- .onContentBlockDelta (event -> handleContentBlockDelta (event , chunks , parserConfig )).build ());
115
+ item .accept (
116
+ ConverseStreamResponseHandler .Visitor .builder ()
117
+ .onContentBlockDelta (event -> handleContentBlockDelta (event , chunks , parserConfig ))
118
+ .build ()
119
+ );
111
120
return ;
112
121
}
113
122
case ConverseStreamOutput .EventType .METADATA -> {
114
123
demand .set (0 ); // reset demand before we fork to another thread
115
- item .accept (ConverseStreamResponseHandler .Visitor .builder ()
116
- .onMetadata (event -> handleMetadata (event , chunks , parserConfig )).build ());
124
+ item .accept (
125
+ ConverseStreamResponseHandler .Visitor .builder ().onMetadata (event -> handleMetadata (event , chunks , parserConfig )).build ()
126
+ );
117
127
return ;
118
128
}
119
129
case ConverseStreamOutput .EventType .MESSAGE_STOP -> {
120
130
demand .set (0 ); // reset demand before we fork to another thread
121
- item .accept (ConverseStreamResponseHandler .Visitor .builder ()
122
- .onMessageStop (event -> Stream .empty ()).build ());
131
+ item .accept (ConverseStreamResponseHandler .Visitor .builder ().onMessageStop (event -> Stream .empty ()).build ());
123
132
return ;
124
133
}
125
134
default -> {
@@ -136,7 +145,9 @@ public void onNext(ConverseStreamOutput item) {
136
145
137
146
private void handleMessageStart (
138
147
MessageStartEvent event ,
139
- ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks , XContentParserConfiguration parserConfig ) {
148
+ ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks ,
149
+ XContentParserConfiguration parserConfig
150
+ ) {
140
151
runOnUtilityThreadPool (() -> {
141
152
var data = event .role ().name ();
142
153
try {
@@ -150,8 +161,11 @@ private void handleMessageStart(
150
161
});
151
162
}
152
163
153
- private void handleContentBlockStart (ContentBlockStartEvent event ,
154
- ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks , XContentParserConfiguration parserConfig ) {
164
+ private void handleContentBlockStart (
165
+ ContentBlockStartEvent event ,
166
+ ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks ,
167
+ XContentParserConfiguration parserConfig
168
+ ) {
155
169
var data = event .start ().toString ();
156
170
try {
157
171
var contentBlockStart = parseContentBlockStart (parserConfig , data );
@@ -163,8 +177,11 @@ private void handleContentBlockStart(ContentBlockStartEvent event,
163
177
downstream .onNext (results );
164
178
}
165
179
166
- private void handleContentBlockDelta (ContentBlockDeltaEvent event ,
167
- ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks , XContentParserConfiguration parserConfig ) {
180
+ private void handleContentBlockDelta (
181
+ ContentBlockDeltaEvent event ,
182
+ ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks ,
183
+ XContentParserConfiguration parserConfig
184
+ ) {
168
185
runOnUtilityThreadPool (() -> {
169
186
var data = event .delta ().toString ();
170
187
try {
@@ -180,7 +197,9 @@ private void handleContentBlockDelta(ContentBlockDeltaEvent event,
180
197
181
198
private void handleMetadata (
182
199
ConverseStreamMetadataEvent event ,
183
- ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks , XContentParserConfiguration parserConfig ) {
200
+ ArrayDeque <StreamingUnifiedChatCompletionResults .ChatCompletionChunk > chunks ,
201
+ XContentParserConfiguration parserConfig
202
+ ) {
184
203
runOnUtilityThreadPool (() -> {
185
204
var data = event .toString ();
186
205
try {
@@ -282,7 +301,6 @@ public void cancel() {
282
301
public static final String ROLE_FIELD = "role" ;
283
302
public static final String CONTENT_FIELD = "content" ;
284
303
285
-
286
304
public static final String INDEX_FIELD = "index" ;
287
305
public static final String NAME_FIELD = "name" ;
288
306
public static final String INPUT_TOKENS_FIELD = "input_tokens" ;
0 commit comments