Skip to content

Commit ac735af

Browse files
authored
[ML] Update response parsing for streaming (#112244)
Update Request and ResponseHandler for streaming HttpResult and converting it to InferenceServiceResults. This interface will be implemented in a later change, but it defaults to non-streaming.
1 parent 212fe03 commit ac735af

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/retry/ResponseHandler.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.xpack.inference.external.request.Request;
1414
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
1515

16+
import java.util.concurrent.Flow;
17+
1618
/**
1719
* A contract for clients to specify behavior for handling http responses. Clients can pass this contract to the retry sender to parse
1820
* the response and help with logging.
@@ -47,4 +49,25 @@ public interface ResponseHandler {
4749
* @return a {@link String} indicating the request type that was sent (e.g. elser, elser hugging face etc)
4850
*/
4951
String getRequestType();
52+
53+
/**
54+
* Returns {@code true} if the response handler can handle streaming results, or {@code false} if can only parse the entire payload.
55+
* Defaults to {@code false}.
56+
*/
57+
default boolean canHandleStreamingResponses() {
58+
return false;
59+
}
60+
61+
/**
62+
* A method for parsing the streamed response from the server.
63+
* @param request The original request sent to the server
64+
* @param result The first result that initiated the stream. If the result is HTTP 200, this result will not contain content bytes
65+
* @param flow The remaining stream of results from the server. If the result is HTTP 200, these results will contain content bytes
66+
* @return an inference results with {@link InferenceServiceResults#publisher()} set and {@link InferenceServiceResults#isStreaming()}
67+
* set to true
68+
*/
69+
default InferenceServiceResults parseResult(Request request, HttpResult result, Flow.Publisher<HttpResult> flow) {
70+
assert canHandleStreamingResponses() == false : "This must be implemented when canHandleStreamingResponses() == true";
71+
throw new UnsupportedOperationException("This must be implemented when canHandleStreamingResponses() == true");
72+
}
5073
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/request/Request.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,12 @@ public interface Request {
3131
* @return the unique identifier for the inference entity configuration
3232
*/
3333
String getInferenceEntityId();
34+
35+
/**
36+
* Streams the result in bytes to the {@link org.elasticsearch.inference.InferenceServiceResults}.
37+
* Defaults to false.
38+
*/
39+
default boolean isStreaming() {
40+
return false;
41+
}
3442
}

0 commit comments

Comments
 (0)