Skip to content

Commit d2eebff

Browse files
authored
[ML] Mitigate IOSession timeouts (#115414) (#115526)
We are seeing exceptions ~0.03% of the time in our integration tests: ``` org.apache.http.ConnectionClosedException: Connection closed unexpectedly ``` The `contentDecoder` does not always fully consume the body within `SimpleInputBuffer.consumeContent`. When we return back to Apache, the rest of the body is never delivered, and the IOSession eventually times out and gets cleaned up. During that cleanup process, Apache calls our Consumer with the above exception. If we read 0 bytes and return back immediately, Apache has a better chance to load the rest of the body/footer, and it will call `consumeContent` again. This reduces the exception rate down to ~0.001%. Fix #114105 Fix #114232 Fix #114327 Fix #114385
1 parent 5021d06 commit d2eebff

File tree

8 files changed

+27
-28
lines changed

8 files changed

+27
-28
lines changed

docs/changelog/115414.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
pr: 115414
2+
summary: Mitigate IOSession timeouts
3+
area: Machine Learning
4+
type: bug
5+
issues:
6+
- 114385
7+
- 114327
8+
- 114105
9+
- 114232

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/StreamingHttpResultPublisher.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,10 @@ public void consumeContent(ContentDecoder contentDecoder, IOControl ioControl) t
9696

9797
try {
9898
var consumed = inputBuffer.consumeContent(contentDecoder);
99-
var allBytes = new byte[consumed];
100-
inputBuffer.read(allBytes);
101-
102-
// we can have empty bytes, don't bother sending them
103-
if (allBytes.length > 0) {
99+
// we could have read 0 bytes if the body was delayed getting in, we need to return out so apache can load the body/footer
100+
if (consumed > 0) {
101+
var allBytes = new byte[consumed];
102+
inputBuffer.read(allBytes);
104103
queue.offer(() -> {
105104
subscriber.onNext(new HttpResult(response, allBytes));
106105
var currentBytesInQueue = bytesInQueue.updateAndGet(current -> Long.max(0, current - allBytes.length));
@@ -111,18 +110,17 @@ public void consumeContent(ContentDecoder contentDecoder, IOControl ioControl) t
111110
}
112111
}
113112
});
114-
}
115113

116-
// always check if totalByteSize > the configured setting in case the settings change
117-
if (bytesInQueue.accumulateAndGet(allBytes.length, Long::sum) >= settings.getMaxResponseSize().getBytes()) {
118-
pauseProducer(ioControl);
119-
}
114+
// always check if totalByteSize > the configured setting in case the settings change
115+
if (bytesInQueue.accumulateAndGet(allBytes.length, Long::sum) >= settings.getMaxResponseSize().getBytes()) {
116+
pauseProducer(ioControl);
117+
}
120118

121-
// always run in case we're waking up from a pause and need to start a new thread
122-
taskRunner.requestNextRun();
119+
taskRunner.requestNextRun();
123120

124-
if (listenerCalled.compareAndSet(false, true)) {
125-
listener.onResponse(this);
121+
if (listenerCalled.compareAndSet(false, true)) {
122+
listener.onResponse(this);
123+
}
126124
}
127125
} finally {
128126
inputBuffer.reset();

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/InferenceEventsAssertion.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.common.xcontent.ChunkedToXContent;
1313
import org.elasticsearch.common.xcontent.XContentHelper;
1414
import org.elasticsearch.inference.InferenceServiceResults;
15-
import org.elasticsearch.test.ESTestCase;
1615
import org.elasticsearch.xcontent.XContentFactory;
1716
import org.hamcrest.MatcherAssert;
1817
import org.hamcrest.Matchers;
@@ -26,6 +25,7 @@
2625
import java.util.concurrent.atomic.AtomicInteger;
2726
import java.util.stream.Stream;
2827

28+
import static org.elasticsearch.test.ESTestCase.fail;
2929
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
3030
import static org.hamcrest.CoreMatchers.is;
3131

@@ -47,7 +47,9 @@ public InferenceEventsAssertion hasFinishedStream() {
4747
}
4848

4949
public InferenceEventsAssertion hasNoErrors() {
50-
MatcherAssert.assertThat("Expected no errors from stream.", error, Matchers.nullValue());
50+
if (error != null) {
51+
fail(error, "Expected no errors from stream.");
52+
}
5153
return this;
5254
}
5355

@@ -66,7 +68,7 @@ public InferenceEventsAssertion hasErrorWithStatusCode(int statusCode) {
6668
}
6769
t = t.getCause();
6870
}
69-
ESTestCase.fail(error, "Expected an underlying ElasticsearchStatusException.");
71+
fail(error, "Expected an underlying ElasticsearchStatusException.");
7072
return this;
7173
}
7274

@@ -79,7 +81,7 @@ public InferenceEventsAssertion hasErrorContaining(String message) {
7981
}
8082
t = t.getCause();
8183
}
82-
ESTestCase.fail(error, "Expected exception to contain string: " + message);
84+
fail(error, "Expected exception to contain string: " + message);
8385
return this;
8486
}
8587

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/anthropic/AnthropicServiceTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,6 @@ public void testInfer_SendsCompletionRequest() throws IOException {
532532
}
533533
}
534534

535-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
536535
public void testInfer_StreamRequest() throws Exception {
537536
String responseJson = """
538537
data: {"type": "message_start", "message": {"model": "claude, probably"}}
@@ -578,7 +577,6 @@ private InferenceServiceResults streamChatCompletion() throws IOException {
578577
}
579578
}
580579

581-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
582580
public void testInfer_StreamRequest_ErrorResponse() throws Exception {
583581
String responseJson = """
584582
data: {"type": "error", "error": {"type": "request_too_large", "message": "blah"}}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/azureaistudio/AzureAiStudioServiceTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,7 +1197,6 @@ public void testInfer_UnauthorisedResponse() throws IOException {
11971197
}
11981198
}
11991199

1200-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
12011200
public void testInfer_StreamRequest() throws Exception {
12021201
String responseJson = """
12031202
data: {\
@@ -1253,7 +1252,6 @@ private InferenceServiceResults streamChatCompletion() throws IOException, URISy
12531252
}
12541253
}
12551254

1256-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
12571255
public void testInfer_StreamRequest_ErrorResponse() throws Exception {
12581256
String responseJson = """
12591257
{

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/azureopenai/AzureOpenAiServiceTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,7 +1322,6 @@ private void testChunkedInfer(AzureOpenAiEmbeddingsModel model) throws IOExcepti
13221322
}
13231323
}
13241324

1325-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
13261325
public void testInfer_StreamRequest() throws Exception {
13271326
String responseJson = """
13281327
data: {\
@@ -1381,7 +1380,6 @@ private InferenceServiceResults streamChatCompletion() throws IOException, URISy
13811380
}
13821381
}
13831382

1384-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
13851383
public void testInfer_StreamRequest_ErrorResponse() throws Exception {
13861384
String responseJson = """
13871385
{

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/cohere/CohereServiceTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,6 @@ public void testDefaultSimilarity() {
15321532
assertEquals(SimilarityMeasure.DOT_PRODUCT, CohereService.defaultSimilarity());
15331533
}
15341534

1535-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
15361535
public void testInfer_StreamRequest() throws Exception {
15371536
String responseJson = """
15381537
{"event_type":"text-generation", "text":"hello"}
@@ -1566,7 +1565,6 @@ private InferenceServiceResults streamChatCompletion() throws IOException {
15661565
}
15671566
}
15681567

1569-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
15701568
public void testInfer_StreamRequest_ErrorResponse() throws Exception {
15711569
String responseJson = """
15721570
{ "event_type":"stream-end", "finish_reason":"ERROR", "response":{ "text": "how dare you" } }

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/openai/OpenAiServiceTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,6 @@ public void testInfer_SendsRequest() throws IOException {
914914
}
915915
}
916916

917-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
918917
public void testInfer_StreamRequest() throws Exception {
919918
String responseJson = """
920919
data: {\
@@ -964,7 +963,6 @@ private InferenceServiceResults streamChatCompletion() throws IOException {
964963
}
965964
}
966965

967-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
968966
public void testInfer_StreamRequest_ErrorResponse() throws Exception {
969967
String responseJson = """
970968
{

0 commit comments

Comments
 (0)