Skip to content

Commit 35c173c

Browse files
committed
[ML] Retry on streaming errors
We now always retry based on the provider's configured retry logic rather than on the HTTP status code. Some providers (e.g. Cohere, Anthropic) return 200 status codes with error bodies; others (e.g. OpenAI, Azure) return non-200 status codes with non-streaming bodies. Notes: - Refactored from HttpResult to StreamingHttpResult: the byte body is now the streaming element, while the HTTP response lives outside the stream. - Refactored StreamingHttpResultPublisher so that it only pushes the byte body into a queue. - All tests now have to wait for the response to be fully consumed before closing the service; otherwise the close method shuts down the mock web server and Apache throws an error.
1 parent 9c19538 commit 35c173c

File tree

15 files changed

+537
-547
lines changed

15 files changed

+537
-547
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.io.IOException;
2727
import java.util.Objects;
2828
import java.util.concurrent.CancellationException;
29-
import java.util.concurrent.Flow;
3029
import java.util.concurrent.atomic.AtomicReference;
3130

3231
import static org.elasticsearch.core.Strings.format;
@@ -149,15 +148,15 @@ private void failUsingUtilityThread(Exception exception, ActionListener<?> liste
149148
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> listener.onFailure(exception));
150149
}
151150

152-
public void stream(HttpRequest request, HttpContext context, ActionListener<Flow.Publisher<HttpResult>> listener) throws IOException {
151+
public void stream(HttpRequest request, HttpContext context, ActionListener<StreamingHttpResult> listener) throws IOException {
153152
// The caller must call start() first before attempting to send a request
154153
assert status.get() == Status.STARTED : "call start() before attempting to send a request";
155154

156155
var streamingProcessor = new StreamingHttpResultPublisher(threadPool, settings, listener);
157156

158157
SocketAccess.doPrivileged(() -> client.execute(request.requestProducer(), streamingProcessor, context, new FutureCallback<>() {
159158
@Override
160-
public void completed(HttpResponse response) {
159+
public void completed(Void response) {
161160
streamingProcessor.close();
162161
}
163162

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.external.http;
9+
10+
import org.apache.http.HttpResponse;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.rest.RestStatus;
14+
15+
import java.io.ByteArrayOutputStream;
16+
import java.util.concurrent.Flow;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
19+
public record StreamingHttpResult(HttpResponse response, Flow.Publisher<byte[]> body) {
20+
public boolean isSuccessfulResponse() {
21+
return RestStatus.isSuccessful(response.getStatusLine().getStatusCode());
22+
}
23+
24+
public Flow.Publisher<HttpResult> toHttpResult() {
25+
return subscriber -> body().subscribe(new Flow.Subscriber<>() {
26+
@Override
27+
public void onSubscribe(Flow.Subscription subscription) {
28+
subscriber.onSubscribe(subscription);
29+
}
30+
31+
@Override
32+
public void onNext(byte[] item) {
33+
subscriber.onNext(new HttpResult(response(), item));
34+
}
35+
36+
@Override
37+
public void onError(Throwable throwable) {
38+
subscriber.onError(throwable);
39+
}
40+
41+
@Override
42+
public void onComplete() {
43+
subscriber.onComplete();
44+
}
45+
});
46+
}
47+
48+
public void readFullResponse(ActionListener<HttpResult> fullResponse) {
49+
var stream = new ByteArrayOutputStream();
50+
AtomicReference<Flow.Subscription> upstream = new AtomicReference<>(null);
51+
body.subscribe(new Flow.Subscriber<>() {
52+
@Override
53+
public void onSubscribe(Flow.Subscription subscription) {
54+
upstream.set(subscription);
55+
upstream.get().request(1);
56+
}
57+
58+
@Override
59+
public void onNext(byte[] item) {
60+
stream.writeBytes(item);
61+
upstream.get().request(1);
62+
}
63+
64+
@Override
65+
public void onError(Throwable throwable) {
66+
ExceptionsHelper.maybeError(throwable).ifPresent(ExceptionsHelper::maybeDieOnAnotherThread);
67+
fullResponse.onFailure(new RuntimeException("Fatal while fully consuming stream", throwable));
68+
}
69+
70+
@Override
71+
public void onComplete() {
72+
fullResponse.onResponse(new HttpResult(response, stream.toByteArray()));
73+
}
74+
});
75+
}
76+
}

0 commit comments

Comments
 (0)