Skip to content

Commit 43de566

Browse files
committed
address comments
1 parent 9134c19 commit 43de566

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/openai/OpenAiUnifiedChatCompletionResponseHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,10 @@ protected Exception buildError(String message, Request request, HttpResult resul
6868
}
6969

7070
private static Exception buildMidStreamError(Request request, String message, Exception e) {
71-
var restStatus = RestStatus.OK;
7271
var errorResponse = OpenAiErrorResponse.fromString(message);
7372
if (errorResponse instanceof OpenAiErrorResponse oer) {
7473
return new UnifiedChatCompletionException(
75-
restStatus,
74+
RestStatus.INTERNAL_SERVER_ERROR,
7675
format(
7776
"%s for request from inference entity id [%s]. Error message: [%s]",
7877
SERVER_ERROR_OBJECT,
@@ -87,7 +86,7 @@ private static Exception buildMidStreamError(Request request, String message, Ex
8786
return UnifiedChatCompletionException.fromThrowable(e);
8887
} else {
8988
return new UnifiedChatCompletionException(
90-
restStatus,
89+
RestStatus.INTERNAL_SERVER_ERROR,
9190
format("%s for request from inference entity id [%s]", SERVER_ERROR_OBJECT, request.getInferenceEntityId()),
9291
errorResponse != null ? errorResponse.getClass().getSimpleName() : "unknown",
9392
"stream_error"

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/openai/OpenAiUnifiedStreamingProcessor.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class OpenAiUnifiedStreamingProcessor extends DelegatingProcessor<Deque<S
6161

6262
private final BiFunction<String, Exception, Exception> errorParser;
6363
private final Deque<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> buffer = new LinkedBlockingDeque<>();
64-
private volatile boolean previousEventWasNotError = true;
64+
private volatile boolean previousEventWasError = false;
6565

6666
public OpenAiUnifiedStreamingProcessor(BiFunction<String, Exception, Exception> errorParser) {
6767
this.errorParser = errorParser;
@@ -83,19 +83,19 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
8383
var results = new ArrayDeque<StreamingUnifiedChatCompletionResults.ChatCompletionChunk>(item.size());
8484
for (var event : item) {
8585
if (ServerSentEventField.EVENT == event.name() && "error".equals(event.value())) {
86-
previousEventWasNotError = false;
86+
previousEventWasError = true;
8787
} else if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
88-
if (previousEventWasNotError) {
89-
try {
90-
var delta = parse(parserConfig, event);
91-
delta.forEachRemaining(results::offer);
92-
} catch (Exception e) {
93-
logger.warn("Failed to parse event from inference provider: {}", event);
94-
throw errorParser.apply(event.value(), e);
95-
}
96-
} else {
88+
if (previousEventWasError) {
9789
throw errorParser.apply(event.value(), null);
9890
}
91+
92+
try {
93+
var delta = parse(parserConfig, event);
94+
delta.forEachRemaining(results::offer);
95+
} catch (Exception e) {
96+
logger.warn("Failed to parse event from inference provider: {}", event);
97+
throw errorParser.apply(event.value(), e);
98+
}
9999
}
100100
}
101101

0 commit comments

Comments
 (0)