Skip to content

Commit ff46418

Browse files
authored
[ML] Handle Errors and pre-streaming exceptions (elastic#115868) (elastic#115967)
If we fail to establish a connection to bedrock, the error is returned in the client's CompletableFuture. We will forward it to the listener via the stream processor. Any Errors are thrown on another thread.
1 parent 00d8fd7 commit ff46418

File tree

4 files changed

+15
-3
lines changed

4 files changed

+15
-3
lines changed

docs/changelog/115868.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 115868
2+
summary: Forward bedrock connection errors to user
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/amazonbedrock/AmazonBedrockInferenceClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import software.amazon.awssdk.services.bedrockruntime.model.InvokeModelResponse;
2424

2525
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.ExceptionsHelper;
2627
import org.elasticsearch.SpecialPermission;
2728
import org.elasticsearch.action.ActionListener;
2829
import org.elasticsearch.common.xcontent.ChunkedToXContent;
@@ -93,11 +94,15 @@ public Flow.Publisher<? extends ChunkedToXContent> converseStream(ConverseStream
9394
internalClient.converseStream(
9495
request,
9596
ConverseStreamResponseHandler.builder().subscriber(() -> FlowAdapters.toSubscriber(awsResponseProcessor)).build()
96-
);
97+
).exceptionally(e -> {
98+
awsResponseProcessor.onError(e);
99+
return null; // Void
100+
});
97101
return awsResponseProcessor;
98102
}
99103

100104
private void onFailure(ActionListener<?> listener, Throwable t, String method) {
105+
ExceptionsHelper.maybeDieOnAnotherThread(t);
101106
var unwrappedException = t;
102107
if (t instanceof CompletionException || t instanceof ExecutionException) {
103108
unwrappedException = t.getCause() != null ? t.getCause() : t;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/amazonbedrock/AmazonBedrockStreamingChatProcessor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler;
1313

1414
import org.elasticsearch.ElasticsearchException;
15+
import org.elasticsearch.ExceptionsHelper;
1516
import org.elasticsearch.common.util.concurrent.EsExecutors;
1617
import org.elasticsearch.core.Strings;
1718
import org.elasticsearch.logging.LogManager;
@@ -89,14 +90,15 @@ private void sendDownstreamOnAnotherThread(ContentBlockDeltaEvent event) {
8990

9091
@Override
9192
public void onError(Throwable amazonBedrockRuntimeException) {
93+
ExceptionsHelper.maybeDieOnAnotherThread(amazonBedrockRuntimeException);
9294
error.set(
9395
new ElasticsearchException(
9496
Strings.format("AmazonBedrock StreamingChatProcessor failure: [%s]", amazonBedrockRuntimeException.getMessage()),
9597
amazonBedrockRuntimeException
9698
)
9799
);
98100
if (isDone.compareAndSet(false, true) && checkAndResetDemand() && onErrorCalled.compareAndSet(false, true)) {
99-
downstream.onError(error.get());
101+
runOnUtilityThreadPool(() -> downstream.onError(amazonBedrockRuntimeException));
100102
}
101103
}
102104

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public void onNext(ChunkedToXContent item) {
224224
@Override
225225
public void onError(Throwable throwable) {
226226
if (isLastPart.compareAndSet(false, true)) {
227-
logger.error("A failure occurred in ElasticSearch while streaming the response.", throwable);
227+
logger.warn("A failure occurred in ElasticSearch while streaming the response.", throwable);
228228
nextBodyPartListener().onResponse(new ServerSentEventResponseBodyPart(ServerSentEvents.ERROR, errorChunk(throwable)));
229229
}
230230
}

0 commit comments

Comments
 (0)