Skip to content

Commit 9c7d7fa

Browse files
[ML] Checking for presence of error object when validating response (#118375) (#118795)
* Refactoring error handling logic * Refactoring base response handler to remove duplication * Update docs/changelog/118375.yaml * Addressing feedback --------- Co-authored-by: Max Hniebergall <[email protected]> (cherry picked from commit f4dc716) # Conflicts: # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/alibabacloudsearch/AlibabaCloudSearchResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/anthropic/AnthropicResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/cohere/CohereResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/elastic/ElasticInferenceServiceResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/googleaistudio/GoogleAiStudioResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/googlevertexai/GoogleVertexAiResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/huggingface/HuggingFaceResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/ibmwatsonx/IbmWatsonxResponseHandler.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/openai/OpenAiResponseHandler.java
1 parent 6ff46da commit 9c7d7fa

File tree

32 files changed

+348
-403
lines changed

32 files changed

+348
-403
lines changed

docs/changelog/118375.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118375
2+
summary: Check for presence of error object when validating streaming responses from integrations in the inference API
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/alibabacloudsearch/AlibabaCloudSearchResponseHandler.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@
77

88
package org.elasticsearch.xpack.inference.external.alibabacloudsearch;
99

10-
import org.apache.logging.log4j.Logger;
1110
import org.elasticsearch.xpack.inference.external.http.HttpResult;
1211
import org.elasticsearch.xpack.inference.external.http.retry.BaseResponseHandler;
1312
import org.elasticsearch.xpack.inference.external.http.retry.ResponseParser;
1413
import org.elasticsearch.xpack.inference.external.http.retry.RetryException;
1514
import org.elasticsearch.xpack.inference.external.request.Request;
1615
import org.elasticsearch.xpack.inference.external.response.alibabacloudsearch.AlibabaCloudSearchErrorResponseEntity;
17-
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
18-
19-
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
2016

2117
/**
2218
* Defines how to handle various errors returned from the AlibabaCloudSearch integration.
@@ -27,21 +23,15 @@ public AlibabaCloudSearchResponseHandler(String requestType, ResponseParser pars
2723
super(requestType, parseFunction, AlibabaCloudSearchErrorResponseEntity::fromResponse);
2824
}
2925

30-
@Override
31-
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result)
32-
throws RetryException {
33-
checkForFailureStatusCode(request, result);
34-
checkForEmptyBody(throttlerManager, logger, request, result);
35-
}
36-
3726
/**
3827
* Validates the status code throws an RetryException if not in the range [200, 300).
3928
*
4029
* @param request The http request
4130
* @param result The http response and body
4231
* @throws RetryException Throws if status code is {@code >= 300 or < 200 }
4332
*/
44-
void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
33+
@Override
34+
protected void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
4535
int statusCode = result.response().getStatusLine().getStatusCode();
4636
if (statusCode >= 200 && statusCode < 300) {
4737
return;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/anthropic/AnthropicResponseHandler.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.inference.external.anthropic;
99

10-
import org.apache.logging.log4j.Logger;
1110
import org.elasticsearch.common.Strings;
1211
import org.elasticsearch.inference.InferenceServiceResults;
1312
import org.elasticsearch.xpack.core.inference.results.StreamingChatCompletionResults;
@@ -19,11 +18,9 @@
1918
import org.elasticsearch.xpack.inference.external.response.ErrorMessageResponseEntity;
2019
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventParser;
2120
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventProcessor;
22-
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
2321

2422
import java.util.concurrent.Flow;
2523

26-
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
2724
import static org.elasticsearch.xpack.inference.external.http.retry.ResponseHandlerUtils.getFirstHeaderOrUnknown;
2825

2926
public class AnthropicResponseHandler extends BaseResponseHandler {
@@ -54,13 +51,6 @@ public AnthropicResponseHandler(String requestType, ResponseParser parseFunction
5451
this.canHandleStreamingResponses = canHandleStreamingResponses;
5552
}
5653

57-
@Override
58-
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result)
59-
throws RetryException {
60-
checkForFailureStatusCode(request, result);
61-
checkForEmptyBody(throttlerManager, logger, request, result);
62-
}
63-
6454
@Override
6555
public boolean canHandleStreamingResponses() {
6656
return canHandleStreamingResponses;
@@ -83,7 +73,8 @@ public InferenceServiceResults parseResult(Request request, Flow.Publisher<HttpR
8373
* @param result The http response and body
8474
* @throws RetryException Throws if status code is {@code >= 300 or < 200 }
8575
*/
86-
void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
76+
@Override
77+
protected void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
8778
int statusCode = result.response().getStatusLine().getStatusCode();
8879
if (statusCode >= 200 && statusCode < 300) {
8980
return;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/cohere/CohereResponseHandler.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.inference.external.cohere;
99

10-
import org.apache.logging.log4j.Logger;
1110
import org.elasticsearch.inference.InferenceServiceResults;
1211
import org.elasticsearch.xpack.core.inference.results.StreamingChatCompletionResults;
1312
import org.elasticsearch.xpack.inference.external.http.HttpResult;
@@ -17,12 +16,9 @@
1716
import org.elasticsearch.xpack.inference.external.request.Request;
1817
import org.elasticsearch.xpack.inference.external.response.cohere.CohereErrorResponseEntity;
1918
import org.elasticsearch.xpack.inference.external.response.streaming.NewlineDelimitedByteProcessor;
20-
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
2119

2220
import java.util.concurrent.Flow;
2321

24-
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
25-
2622
/**
2723
* Defines how to handle various errors returned from the Cohere integration.
2824
*
@@ -45,13 +41,6 @@ public CohereResponseHandler(String requestType, ResponseParser parseFunction, b
4541
this.canHandleStreamingResponse = canHandleStreamingResponse;
4642
}
4743

48-
@Override
49-
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result)
50-
throws RetryException {
51-
checkForFailureStatusCode(request, result);
52-
checkForEmptyBody(throttlerManager, logger, request, result);
53-
}
54-
5544
@Override
5645
public boolean canHandleStreamingResponses() {
5746
return canHandleStreamingResponse;
@@ -73,7 +62,8 @@ public InferenceServiceResults parseResult(Request request, Flow.Publisher<HttpR
7362
* @param result The http response and body
7463
* @throws RetryException Throws if status code is {@code >= 300 or < 200 }
7564
*/
76-
void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
65+
@Override
66+
protected void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
7767
int statusCode = result.response().getStatusLine().getStatusCode();
7868
if (statusCode >= 200 && statusCode < 300) {
7969
return;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/elastic/ElasticInferenceServiceResponseHandler.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,13 @@
77

88
package org.elasticsearch.xpack.inference.external.elastic;
99

10-
import org.apache.logging.log4j.Logger;
1110
import org.elasticsearch.xpack.inference.external.http.HttpResult;
1211
import org.elasticsearch.xpack.inference.external.http.retry.BaseResponseHandler;
1312
import org.elasticsearch.xpack.inference.external.http.retry.ContentTooLargeException;
1413
import org.elasticsearch.xpack.inference.external.http.retry.ResponseParser;
1514
import org.elasticsearch.xpack.inference.external.http.retry.RetryException;
1615
import org.elasticsearch.xpack.inference.external.request.Request;
1716
import org.elasticsearch.xpack.inference.external.response.elastic.ElasticInferenceServiceErrorResponseEntity;
18-
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
19-
20-
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
2117

2218
public class ElasticInferenceServiceResponseHandler extends BaseResponseHandler {
2319

@@ -26,13 +22,7 @@ public ElasticInferenceServiceResponseHandler(String requestType, ResponseParser
2622
}
2723

2824
@Override
29-
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result)
30-
throws RetryException {
31-
checkForFailureStatusCode(request, result);
32-
checkForEmptyBody(throttlerManager, logger, request, result);
33-
}
34-
35-
void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
25+
protected void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
3626
int statusCode = result.response().getStatusLine().getStatusCode();
3727
if (statusCode >= 200 && statusCode < 300) {
3828
return;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/googleaistudio/GoogleAiStudioResponseHandler.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.inference.external.googleaistudio;
99

10-
import org.apache.logging.log4j.Logger;
1110
import org.elasticsearch.core.CheckedFunction;
1211
import org.elasticsearch.inference.InferenceServiceResults;
1312
import org.elasticsearch.xcontent.XContentParser;
@@ -20,13 +19,11 @@
2019
import org.elasticsearch.xpack.inference.external.response.googleaistudio.GoogleAiStudioErrorResponseEntity;
2120
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventParser;
2221
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventProcessor;
23-
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
2422

2523
import java.io.IOException;
2624
import java.util.concurrent.Flow;
2725

2826
import static org.elasticsearch.core.Strings.format;
29-
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
3027

3128
public class GoogleAiStudioResponseHandler extends BaseResponseHandler {
3229

@@ -52,13 +49,6 @@ public GoogleAiStudioResponseHandler(
5249
this.content = content;
5350
}
5451

55-
@Override
56-
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result)
57-
throws RetryException {
58-
checkForFailureStatusCode(request, result);
59-
checkForEmptyBody(throttlerManager, logger, request, result);
60-
}
61-
6252
/**
6353
* Validates the status code and throws a RetryException if not in the range [200, 300).
6454
*
@@ -67,7 +57,8 @@ public void validateResponse(ThrottlerManager throttlerManager, Logger logger, R
6757
* @param result The http response and body
6858
* @throws RetryException Throws if status code is {@code >= 300 or < 200 }
6959
*/
70-
void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
60+
@Override
61+
protected void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
7162
int statusCode = result.response().getStatusLine().getStatusCode();
7263
if (statusCode >= 200 && statusCode < 300) {
7364
return;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/googlevertexai/GoogleVertexAiResponseHandler.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,14 @@
77

88
package org.elasticsearch.xpack.inference.external.googlevertexai;
99

10-
import org.apache.logging.log4j.Logger;
1110
import org.elasticsearch.xpack.inference.external.http.HttpResult;
1211
import org.elasticsearch.xpack.inference.external.http.retry.BaseResponseHandler;
1312
import org.elasticsearch.xpack.inference.external.http.retry.ResponseParser;
1413
import org.elasticsearch.xpack.inference.external.http.retry.RetryException;
1514
import org.elasticsearch.xpack.inference.external.request.Request;
1615
import org.elasticsearch.xpack.inference.external.response.googlevertexai.GoogleVertexAiErrorResponseEntity;
17-
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
1816

1917
import static org.elasticsearch.core.Strings.format;
20-
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
2118

2219
public class GoogleVertexAiResponseHandler extends BaseResponseHandler {
2320

@@ -28,13 +25,7 @@ public GoogleVertexAiResponseHandler(String requestType, ResponseParser parseFun
2825
}
2926

3027
@Override
31-
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result)
32-
throws RetryException {
33-
checkForFailureStatusCode(request, result);
34-
checkForEmptyBody(throttlerManager, logger, request, result);
35-
}
36-
37-
void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
28+
protected void checkForFailureStatusCode(Request request, HttpResult result) throws RetryException {
3829
int statusCode = result.response().getStatusLine().getStatusCode();
3930
if (statusCode >= 200 && statusCode < 300) {
4031
return;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/retry/BaseResponseHandler.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,20 @@
77

88
package org.elasticsearch.xpack.inference.external.http.retry;
99

10+
import org.apache.logging.log4j.Logger;
1011
import org.elasticsearch.ElasticsearchStatusException;
12+
import org.elasticsearch.common.Strings;
1113
import org.elasticsearch.inference.InferenceServiceResults;
1214
import org.elasticsearch.rest.RestStatus;
1315
import org.elasticsearch.xpack.inference.external.http.HttpResult;
1416
import org.elasticsearch.xpack.inference.external.request.Request;
17+
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
1518

1619
import java.util.Objects;
1720
import java.util.function.Function;
1821

1922
import static org.elasticsearch.core.Strings.format;
23+
import static org.elasticsearch.xpack.inference.external.http.HttpUtils.checkForEmptyBody;
2024

2125
public abstract class BaseResponseHandler implements ResponseHandler {
2226

@@ -27,14 +31,15 @@ public abstract class BaseResponseHandler implements ResponseHandler {
2731
public static final String REDIRECTION = "Unhandled redirection";
2832
public static final String CONTENT_TOO_LARGE = "Received a content too large status code";
2933
public static final String UNSUCCESSFUL = "Received an unsuccessful status code";
34+
public static final String SERVER_ERROR_OBJECT = "Received an error response";
3035
public static final String BAD_REQUEST = "Received a bad request status code";
3136
public static final String METHOD_NOT_ALLOWED = "Received a method not allowed status code";
3237

3338
protected final String requestType;
3439
private final ResponseParser parseFunction;
35-
private final Function<HttpResult, ErrorMessage> errorParseFunction;
40+
private final Function<HttpResult, ErrorResponse> errorParseFunction;
3641

37-
public BaseResponseHandler(String requestType, ResponseParser parseFunction, Function<HttpResult, ErrorMessage> errorParseFunction) {
42+
public BaseResponseHandler(String requestType, ResponseParser parseFunction, Function<HttpResult, ErrorResponse> errorParseFunction) {
3843
this.requestType = Objects.requireNonNull(requestType);
3944
this.parseFunction = Objects.requireNonNull(parseFunction);
4045
this.errorParseFunction = Objects.requireNonNull(errorParseFunction);
@@ -54,11 +59,42 @@ public String getRequestType() {
5459
return requestType;
5560
}
5661

62+
@Override
63+
public void validateResponse(ThrottlerManager throttlerManager, Logger logger, Request request, HttpResult result) {
64+
checkForFailureStatusCode(request, result);
65+
checkForEmptyBody(throttlerManager, logger, request, result);
66+
67+
// When the response is streamed the status code could be 200 but the error object will be set
68+
// so we need to check for that specifically
69+
checkForErrorObject(request, result);
70+
}
71+
72+
protected abstract void checkForFailureStatusCode(Request request, HttpResult result);
73+
74+
private void checkForErrorObject(Request request, HttpResult result) {
75+
var errorEntity = errorParseFunction.apply(result);
76+
77+
if (errorEntity.errorStructureFound()) {
78+
// We don't really know what happened because the status code was 200 so we'll return a failure and let the
79+
// client retry if necessary
80+
// If we did want to retry here, we'll need to determine if this was a streaming request, if it was
81+
// we shouldn't retry because that would replay the entire streaming request and the client would get
82+
// duplicate chunks back
83+
throw new RetryException(false, buildError(SERVER_ERROR_OBJECT, request, result, errorEntity));
84+
}
85+
}
86+
5787
protected Exception buildError(String message, Request request, HttpResult result) {
5888
var errorEntityMsg = errorParseFunction.apply(result);
89+
return buildError(message, request, result, errorEntityMsg);
90+
}
91+
92+
protected Exception buildError(String message, Request request, HttpResult result, ErrorResponse errorResponse) {
5993
var responseStatusCode = result.response().getStatusLine().getStatusCode();
6094

61-
if (errorEntityMsg == null) {
95+
if (errorResponse == null
96+
|| errorResponse.errorStructureFound() == false
97+
|| Strings.isNullOrEmpty(errorResponse.getErrorMessage())) {
6298
return new ElasticsearchStatusException(
6399
format(
64100
"%s for request from inference entity id [%s] status [%s]",
@@ -76,7 +112,7 @@ protected Exception buildError(String message, Request request, HttpResult resul
76112
message,
77113
request.getInferenceEntityId(),
78114
responseStatusCode,
79-
errorEntityMsg.getErrorMessage()
115+
errorResponse.getErrorMessage()
80116
),
81117
toRestStatus(responseStatusCode)
82118
);

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/retry/ErrorMessage.java

Lines changed: 0 additions & 12 deletions
This file was deleted.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.external.http.retry;
9+
10+
import java.util.Objects;
11+
12+
public class ErrorResponse {
13+
14+
// Denotes an error object that was not found
15+
public static final ErrorResponse UNDEFINED_ERROR = new ErrorResponse(false);
16+
17+
private final String errorMessage;
18+
private final boolean errorStructureFound;
19+
20+
public ErrorResponse(String errorMessage) {
21+
this.errorMessage = Objects.requireNonNull(errorMessage);
22+
this.errorStructureFound = true;
23+
}
24+
25+
private ErrorResponse(boolean errorStructureFound) {
26+
this.errorMessage = "";
27+
this.errorStructureFound = errorStructureFound;
28+
}
29+
30+
public String getErrorMessage() {
31+
return errorMessage;
32+
}
33+
34+
public boolean errorStructureFound() {
35+
return errorStructureFound;
36+
}
37+
}

0 commit comments

Comments
 (0)