Skip to content

Commit 3801866

Browse files
prwhelanelasticsearchmachine
andauthored
[ML] Retry on streaming errors (#123076) (#124032)
* [ML] Retry on streaming errors (#123076) We now always retry based on the provider's configured retry logic rather than the HTTP status code. Some providers (e.g. Cohere, Anthropic) will return 200 status codes with error bodies, others (e.g. OpenAI, Azure) will return non-200 status codes with non-streaming bodies. Notes: - Refactored from HttpResult to StreamingHttpResult, the byte body is now the streaming element while the http response lives outside the stream. - Refactored StreamingHttpResultPublisher so that it only pushes byte body into a queue. - Tests all now have to wait for the response to be fully consumed before closing the service, otherwise the close method will shut down the mock web server and apache will throw an error. * [CI] Auto commit changes from spotless * Use old isSuccess API --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 017d7bf commit 3801866

File tree

16 files changed

+542
-547
lines changed

16 files changed

+542
-547
lines changed

docs/changelog/123076.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123076
2+
summary: Retry on streaming errors
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClient.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.io.IOException;
2828
import java.util.Objects;
2929
import java.util.concurrent.CancellationException;
30-
import java.util.concurrent.Flow;
3130
import java.util.concurrent.atomic.AtomicReference;
3231

3332
import static org.elasticsearch.core.Strings.format;
@@ -154,15 +153,15 @@ private void failUsingUtilityThread(Exception exception, ActionListener<?> liste
154153
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> listener.onFailure(exception));
155154
}
156155

157-
public void stream(HttpRequest request, HttpContext context, ActionListener<Flow.Publisher<HttpResult>> listener) throws IOException {
156+
public void stream(HttpRequest request, HttpContext context, ActionListener<StreamingHttpResult> listener) throws IOException {
158157
// The caller must call start() first before attempting to send a request
159158
assert status.get() == Status.STARTED : "call start() before attempting to send a request";
160159

161160
var streamingProcessor = new StreamingHttpResultPublisher(threadPool, settings, listener);
162161

163162
SocketAccess.doPrivileged(() -> client.execute(request.requestProducer(), streamingProcessor, context, new FutureCallback<>() {
164163
@Override
165-
public void completed(HttpResponse response) {
164+
public void completed(Void response) {
166165
streamingProcessor.close();
167166
}
168167

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.external.http;
9+
10+
import org.apache.http.HttpResponse;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.action.ActionListener;
13+
14+
import java.io.ByteArrayOutputStream;
15+
import java.util.concurrent.Flow;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
public record StreamingHttpResult(HttpResponse response, Flow.Publisher<byte[]> body) {
19+
public boolean isSuccessfulResponse() {
20+
var code = response.getStatusLine().getStatusCode();
21+
return code >= 200 && code < 300;
22+
}
23+
24+
public Flow.Publisher<HttpResult> toHttpResult() {
25+
return subscriber -> body().subscribe(new Flow.Subscriber<>() {
26+
@Override
27+
public void onSubscribe(Flow.Subscription subscription) {
28+
subscriber.onSubscribe(subscription);
29+
}
30+
31+
@Override
32+
public void onNext(byte[] item) {
33+
subscriber.onNext(new HttpResult(response(), item));
34+
}
35+
36+
@Override
37+
public void onError(Throwable throwable) {
38+
subscriber.onError(throwable);
39+
}
40+
41+
@Override
42+
public void onComplete() {
43+
subscriber.onComplete();
44+
}
45+
});
46+
}
47+
48+
public void readFullResponse(ActionListener<HttpResult> fullResponse) {
49+
var stream = new ByteArrayOutputStream();
50+
AtomicReference<Flow.Subscription> upstream = new AtomicReference<>(null);
51+
body.subscribe(new Flow.Subscriber<>() {
52+
@Override
53+
public void onSubscribe(Flow.Subscription subscription) {
54+
upstream.set(subscription);
55+
upstream.get().request(1);
56+
}
57+
58+
@Override
59+
public void onNext(byte[] item) {
60+
stream.writeBytes(item);
61+
upstream.get().request(1);
62+
}
63+
64+
@Override
65+
public void onError(Throwable throwable) {
66+
ExceptionsHelper.maybeError(throwable).ifPresent(ExceptionsHelper::maybeDieOnAnotherThread);
67+
fullResponse.onFailure(new RuntimeException("Fatal while fully consuming stream", throwable));
68+
}
69+
70+
@Override
71+
public void onComplete() {
72+
fullResponse.onResponse(new HttpResult(response, stream.toByteArray()));
73+
}
74+
});
75+
}
76+
}

0 commit comments

Comments
 (0)