Skip to content

Commit 39fe811

Browse files
authored
Merge branch 'main' into esql-reranker-boostrap
2 parents 109a3dc + 69180ea commit 39fe811

File tree

16 files changed

+212
-158
lines changed

16 files changed

+212
-158
lines changed

muted-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,9 @@ tests:
389389
- class: org.elasticsearch.xpack.esql.inference.RerankOperatorTests
390390
method: testSimpleCircuitBreaking
391391
issue: https://github.com/elastic/elasticsearch/issues/124337
392+
- class: org.elasticsearch.index.engine.ThreadPoolMergeSchedulerTests
393+
method: testSchedulerCloseWaitsForRunningMerge
394+
issue: https://github.com/elastic/elasticsearch/issues/125236
392395

393396
# Examples:
394397
#

x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -299,17 +299,13 @@ public void testUnsupportedStream() throws Exception {
299299

300300
try {
301301
var events = streamInferOnMockService(modelId, TaskType.SPARSE_EMBEDDING, List.of(randomUUID()), null);
302-
assertThat(events.size(), equalTo(2));
302+
assertThat(events.size(), equalTo(1));
303303
events.forEach(event -> {
304-
switch (event.name()) {
305-
case EVENT -> assertThat(event.value(), equalToIgnoringCase("error"));
306-
case DATA -> assertThat(
307-
event.value(),
308-
containsString(
309-
"Streaming is not allowed for service [streaming_completion_test_service] and task [sparse_embedding]"
310-
)
311-
);
312-
}
304+
assertThat(event.type(), equalToIgnoringCase("error"));
305+
assertThat(
306+
event.data(),
307+
containsString("Streaming is not allowed for service [streaming_completion_test_service] and task [sparse_embedding]")
308+
);
313309
});
314310
} finally {
315311
deleteModel(modelId);
@@ -331,12 +327,10 @@ public void testSupportedStream() throws Exception {
331327
input.stream().map(s -> s.toUpperCase(Locale.ROOT)).map(str -> "{\"completion\":[{\"delta\":\"" + str + "\"}]}"),
332328
Stream.of("[DONE]")
333329
).iterator();
334-
assertThat(events.size(), equalTo((input.size() + 1) * 2));
330+
assertThat(events.size(), equalTo(input.size() + 1));
335331
events.forEach(event -> {
336-
switch (event.name()) {
337-
case EVENT -> assertThat(event.value(), equalToIgnoringCase("message"));
338-
case DATA -> assertThat(event.value(), equalTo(expectedResponses.next()));
339-
}
332+
assertThat(event.type(), equalToIgnoringCase("message"));
333+
assertThat(event.data(), equalTo(expectedResponses.next()));
340334
});
341335
} finally {
342336
deleteModel(modelId);
@@ -359,12 +353,10 @@ public void testUnifiedCompletionInference() throws Exception {
359353
VALIDATE_ELASTIC_PRODUCT_HEADER_CONSUMER
360354
);
361355
var expectedResponses = expectedResultsIterator(input);
362-
assertThat(events.size(), equalTo((input.size() + 1) * 2));
356+
assertThat(events.size(), equalTo(input.size() + 1));
363357
events.forEach(event -> {
364-
switch (event.name()) {
365-
case EVENT -> assertThat(event.value(), equalToIgnoringCase("message"));
366-
case DATA -> assertThat(event.value(), equalTo(expectedResponses.next()));
367-
}
358+
assertThat(event.type(), equalToIgnoringCase("message"));
359+
assertThat(event.data(), equalTo(expectedResponses.next()));
368360
});
369361
} finally {
370362
deleteModel(modelId);

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.elasticsearch.xpack.core.inference.action.InferenceAction;
5151
import org.elasticsearch.xpack.core.inference.results.XContentFormattedException;
5252
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
53-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
5453
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventParser;
5554

5655
import java.io.IOException;
@@ -353,9 +352,8 @@ private static class RandomStringCollector {
353352
private void collect(String str) throws IOException {
354353
sseParser.parse(str.getBytes(StandardCharsets.UTF_8))
355354
.stream()
356-
.filter(event -> event.name() == ServerSentEventField.DATA)
357-
.filter(ServerSentEvent::hasValue)
358-
.map(ServerSentEvent::value)
355+
.filter(ServerSentEvent::hasData)
356+
.map(ServerSentEvent::data)
359357
.forEach(stringsVerified::offer);
360358
}
361359
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/DelegatingProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.xcontent.XContentParserConfiguration;
1313
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
14-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
1514

1615
import java.io.IOException;
1716
import java.util.ArrayDeque;
@@ -40,7 +39,7 @@ public static <ParsedChunk> Deque<ParsedChunk> parseEvent(
4039
) throws Exception {
4140
var results = new ArrayDeque<ParsedChunk>(item.size());
4241
for (ServerSentEvent event : item) {
43-
if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
42+
if (event.hasData()) {
4443
try {
4544
var delta = parseFunction.apply(parserConfig, event);
4645
delta.forEachRemaining(results::offer);

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/response/streaming/ServerSentEvent.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,26 @@
99

1010
/**
1111
* Server-Sent Event message: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
12-
* Messages always contain a {@link ServerSentEventField} and a non-null payload value.
13-
* When the stream is parsed and there is no value associated with a {@link ServerSentEventField}, an empty-string is set as the value.
1412
*/
15-
public record ServerSentEvent(ServerSentEventField name, String value) {
13+
public record ServerSentEvent(String type, String data) {
1614

1715
private static final String EMPTY = "";
16+
private static final String MESSAGE = "message";
1817

19-
public ServerSentEvent(ServerSentEventField name) {
20-
this(name, EMPTY);
18+
public static ServerSentEvent empty() {
19+
return new ServerSentEvent(EMPTY, EMPTY);
2120
}
2221

23-
// treat null value as an empty string, don't break parsing
24-
public ServerSentEvent(ServerSentEventField name, String value) {
25-
this.name = name;
26-
this.value = value != null ? value : EMPTY;
22+
public ServerSentEvent(String data) {
23+
this(MESSAGE, data);
2724
}
2825

29-
public boolean hasValue() {
30-
return value.isBlank() == false;
26+
public ServerSentEvent {
27+
data = data != null ? data : EMPTY;
28+
type = type != null && type.isBlank() == false ? type : MESSAGE;
29+
}
30+
31+
public boolean hasData() {
32+
return data.isBlank() == false;
3133
}
3234
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/response/streaming/ServerSentEventField.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/response/streaming/ServerSentEventParser.java

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import java.nio.charset.StandardCharsets;
1111
import java.util.ArrayDeque;
1212
import java.util.Deque;
13+
import java.util.Locale;
1314
import java.util.Optional;
14-
import java.util.regex.Pattern;
1515

1616
/**
1717
* https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
@@ -20,11 +20,14 @@
2020
* If the line starts with a colon, we discard this event.
2121
* If the line contains a colon, we process it into {@link ServerSentEvent} with a non-empty value.
2222
* If the line does not contain a colon, we process it into {@link ServerSentEvent}with an empty string value.
23-
* If the line's field is not one of {@link ServerSentEventField}, we discard this event.
23+
* If the line's field is not one of (data, event), we discard this event. `id` and `retry` are not implemented, because we do not use them
24+
* and have no plans to use them.
2425
*/
2526
public class ServerSentEventParser {
26-
private static final Pattern END_OF_LINE_REGEX = Pattern.compile("\\n|\\r|\\r\\n");
2727
private static final String BOM = "\uFEFF";
28+
private static final String TYPE_FIELD = "event";
29+
private static final String DATA_FIELD = "data";
30+
private final EventBuffer eventBuffer = new EventBuffer();
2831
private volatile String previousTokens = "";
2932

3033
public Deque<ServerSentEvent> parse(byte[] bytes) {
@@ -33,43 +36,101 @@ public Deque<ServerSentEvent> parse(byte[] bytes) {
3336
}
3437

3538
var body = previousTokens + new String(bytes, StandardCharsets.UTF_8);
36-
var lines = END_OF_LINE_REGEX.split(body, -1); // -1 because we actually want trailing empty strings
39+
var lines = body.lines();
3740

38-
var collector = new ArrayDeque<ServerSentEvent>(lines.length);
39-
for (var i = 0; i < lines.length - 1; i++) {
40-
var line = lines[i].replace(BOM, "");
41+
var collector = new ArrayDeque<ServerSentEvent>();
42+
lines.reduce((previousLine, nextLine) -> {
43+
var line = previousLine.replace(BOM, "");
4144

42-
if (line.isBlank() == false && line.startsWith(":") == false) {
45+
if (line.isEmpty()) {
46+
eventBuffer.dispatch().ifPresent(collector::offer);
47+
} else if (line.startsWith(":") == false) {
4348
if (line.contains(":")) {
44-
fieldValueEvent(line).ifPresent(collector::offer);
45-
} else {
46-
ServerSentEventField.oneOf(line).map(ServerSentEvent::new).ifPresent(collector::offer);
49+
fieldValueEvent(line);
50+
} else if (DATA_FIELD.equals(line.toLowerCase(Locale.ROOT))) {
51+
eventBuffer.data("");
4752
}
4853
}
49-
}
50-
51-
// we can sometimes get bytes for incomplete messages, so we save them for the next onNext invocation
52-
// if we get an onComplete before we clear this cache, we follow the spec to treat it as an incomplete event and discard it since
53-
// it was not followed by a blank line
54-
previousTokens = lines[lines.length - 1];
54+
return nextLine;
55+
}).ifPresent(lastLine -> {
56+
if (lastLine.isEmpty()) {
57+
// if the last line is an empty line, then we dispatch the event and clear the previousToken cache
58+
eventBuffer.dispatch().ifPresent(collector::offer);
59+
previousTokens = "";
60+
} else {
61+
// we can sometimes get bytes for incomplete messages, so we save them for the next onNext invocation
62+
// if we get an onComplete before we clear this cache, we follow the spec to treat it as an incomplete event and discard it
63+
// since it was not followed by a blank line
64+
previousTokens = lastLine;
65+
}
66+
});
5567
return collector;
5668
}
5769

58-
private Optional<ServerSentEvent> fieldValueEvent(String lineWithColon) {
70+
private void fieldValueEvent(String lineWithColon) {
5971
var firstColon = lineWithColon.indexOf(":");
60-
var fieldStr = lineWithColon.substring(0, firstColon);
61-
var serverSentField = ServerSentEventField.oneOf(fieldStr);
62-
63-
if ((firstColon + 1) != lineWithColon.length()) {
64-
var value = lineWithColon.substring(firstColon + 1);
65-
if (value.equals(" ") == false) {
66-
var trimmedValue = value.charAt(0) == ' ' ? value.substring(1) : value;
67-
return serverSentField.map(field -> new ServerSentEvent(field, trimmedValue));
72+
var fieldStr = lineWithColon.substring(0, firstColon).toLowerCase(Locale.ROOT);
73+
74+
var value = lineWithColon.substring(firstColon + 1);
75+
// "If value starts with a U+0020 SPACE character, remove it from value."
76+
var trimmedValue = value.length() > 0 && value.charAt(0) == ' ' ? value.substring(1) : value;
77+
78+
if (DATA_FIELD.equals(fieldStr)) {
79+
eventBuffer.data(trimmedValue);
80+
} else if (TYPE_FIELD.equals(fieldStr)) {
81+
eventBuffer.type(trimmedValue);
82+
}
83+
}
84+
85+
private static class EventBuffer {
86+
private static final char LINE_FEED = '\n';
87+
private static final String MESSAGE = "message";
88+
private StringBuilder type = new StringBuilder();
89+
private StringBuilder data = new StringBuilder();
90+
private boolean appendLineFeed = false;
91+
92+
private void type(String type) {
93+
this.type.append(type);
94+
}
95+
96+
private void data(String data) {
97+
// "Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer."
98+
// But then we're told "If the data buffer's last character is a U+000A LINE FEED (LF) character,
99+
// then remove the last character from the data buffer."
100+
// Rather than add + remove for single-line data fields, we only append a LINE FEED on subsequent data lines.
101+
if (appendLineFeed) {
102+
this.data.append(LINE_FEED);
103+
} else {
104+
appendLineFeed = true;
68105
}
106+
this.data.append(data);
69107
}
70108

71-
// if we have "data:" or "data: ", treat it like a no-value line
72-
return serverSentField.map(ServerSentEvent::new);
109+
private Optional<ServerSentEvent> dispatch() {
110+
// "If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string and return."
111+
// We don't process empty events anywhere, so we just drop the events.
112+
if (data.isEmpty()) {
113+
reset();
114+
return Optional.empty();
115+
}
116+
var dataValue = data.toString();
117+
118+
// "Initialize event's type attribute to "message""
119+
// "If the event type buffer has a value other than the empty string,
120+
// change the type of the newly created event to equal the value of the event type buffer."
121+
var typeValue = type.toString();
122+
typeValue = typeValue.isBlank() ? MESSAGE : typeValue;
123+
124+
reset();
125+
126+
return Optional.of(new ServerSentEvent(typeValue, dataValue));
127+
}
128+
129+
private void reset() {
130+
type = new StringBuilder();
131+
data = new StringBuilder();
132+
appendLineFeed = false;
133+
}
73134
}
74135

75136
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/anthropic/AnthropicStreamingProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.xpack.core.inference.results.StreamingChatCompletionResults;
1919
import org.elasticsearch.xpack.inference.common.DelegatingProcessor;
2020
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
21-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
2221

2322
import java.io.IOException;
2423
import java.util.ArrayDeque;
@@ -42,8 +41,8 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
4241

4342
var results = new ArrayDeque<StreamingChatCompletionResults.Result>(item.size());
4443
for (var event : item) {
45-
if (event.name() == ServerSentEventField.DATA && event.hasValue()) {
46-
try (var parser = parser(event.value())) {
44+
if (event.hasData()) {
45+
try (var parser = parser(event.data())) {
4746
var eventType = eventType(parser);
4847
switch (eventType) {
4948
case "error" -> {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/googleaistudio/GoogleAiStudioStreamingProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.xpack.core.inference.results.StreamingChatCompletionResults;
1919
import org.elasticsearch.xpack.inference.common.DelegatingProcessor;
2020
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
21-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
2221

2322
import java.io.IOException;
2423
import java.util.ArrayDeque;
@@ -37,8 +36,8 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
3736
var parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE);
3837
var results = new ArrayDeque<StreamingChatCompletionResults.Result>(item.size());
3938
for (ServerSentEvent event : item) {
40-
if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
41-
try (XContentParser jsonParser = XContentFactory.xContent(XContentType.JSON).createParser(parserConfig, event.value())) {
39+
if (event.hasData()) {
40+
try (XContentParser jsonParser = XContentFactory.xContent(XContentType.JSON).createParser(parserConfig, event.data())) {
4241
var delta = content.apply(jsonParser);
4342
results.offer(new StreamingChatCompletionResults.Result(delta));
4443
} catch (Exception e) {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/openai/OpenAiStreamingProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,11 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
124124

125125
private static Iterator<StreamingChatCompletionResults.Result> parse(XContentParserConfiguration parserConfig, ServerSentEvent event)
126126
throws IOException {
127-
if (DONE_MESSAGE.equalsIgnoreCase(event.value())) {
127+
if (DONE_MESSAGE.equalsIgnoreCase(event.data())) {
128128
return Collections.emptyIterator();
129129
}
130130

131-
try (XContentParser jsonParser = XContentFactory.xContent(XContentType.JSON).createParser(parserConfig, event.value())) {
131+
try (XContentParser jsonParser = XContentFactory.xContent(XContentType.JSON).createParser(parserConfig, event.data())) {
132132
moveToFirstToken(jsonParser);
133133

134134
XContentParser.Token token = jsonParser.currentToken();

0 commit comments

Comments
 (0)