Skip to content

Commit 9ddac90

Browse files
authored
Merge branch 'main' into do-not-add-use-case-header-twice
2 parents c07cb29 + 44a74f9 commit 9ddac90

File tree

17 files changed

+239
-166
lines changed

17 files changed

+239
-166
lines changed

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
import com.amazonaws.DnsResolver;
1515
import com.amazonaws.SdkClientException;
1616
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
17-
import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
1817
import com.amazonaws.services.s3.model.AmazonS3Exception;
19-
import com.amazonaws.util.Base16;
2018
import com.sun.net.httpserver.HttpExchange;
2119
import com.sun.net.httpserver.HttpHandler;
2220

@@ -30,7 +28,9 @@
3028
import org.elasticsearch.common.blobstore.BlobPath;
3129
import org.elasticsearch.common.blobstore.OperationPurpose;
3230
import org.elasticsearch.common.blobstore.OptionalBytesReference;
31+
import org.elasticsearch.common.bytes.BytesArray;
3332
import org.elasticsearch.common.bytes.BytesReference;
33+
import org.elasticsearch.common.hash.MessageDigests;
3434
import org.elasticsearch.common.io.Streams;
3535
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
3636
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -377,13 +377,12 @@ public void testWriteLargeBlob() throws Exception {
377377
}
378378
} else if (s3Request.isUploadPartRequest()) {
379379
// upload part request
380-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
381-
BytesReference bytes = Streams.readFully(md5);
380+
BytesReference bytes = Streams.readFully(exchange.getRequestBody());
382381
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
383382
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
384383

385384
if (countDownUploads.decrementAndGet() % 2 == 0) {
386-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
385+
exchange.getResponseHeaders().add("ETag", getBase16MD5Digest(bytes));
387386
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
388387
exchange.close();
389388
return;
@@ -475,12 +474,11 @@ public void testWriteLargeBlobStreaming() throws Exception {
475474
}
476475
} else if (s3Request.isUploadPartRequest()) {
477476
// upload part request
478-
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
479-
BytesReference bytes = Streams.readFully(md5);
477+
BytesReference bytes = Streams.readFully(exchange.getRequestBody());
480478

481479
if (counterUploads.incrementAndGet() % 2 == 0) {
482480
bytesReceived.addAndGet(bytes.length());
483-
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
481+
exchange.getResponseHeaders().add("ETag", getBase16MD5Digest(bytes));
484482
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
485483
exchange.close();
486484
return;
@@ -1105,6 +1103,21 @@ public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException
11051103
}
11061104
}
11071105

1106+
private static String getBase16MD5Digest(BytesReference bytesReference) {
1107+
return MessageDigests.toHexString(MessageDigests.digest(bytesReference, MessageDigests.md5()));
1108+
}
1109+
1110+
public void testGetBase16MD5Digest() {
1111+
// from Wikipedia, see also org.elasticsearch.common.hash.MessageDigestsTests.testMd5
1112+
assertBase16MD5Digest("", "d41d8cd98f00b204e9800998ecf8427e");
1113+
assertBase16MD5Digest("The quick brown fox jumps over the lazy dog", "9e107d9d372bb6826bd81d3542a419d6");
1114+
assertBase16MD5Digest("The quick brown fox jumps over the lazy dog.", "e4d909c290d0fb1ca068ffaddf22cbd0");
1115+
}
1116+
1117+
private static void assertBase16MD5Digest(String input, String expectedDigestString) {
1118+
assertEquals(expectedDigestString, getBase16MD5Digest(new BytesArray(input)));
1119+
}
1120+
11081121
@Override
11091122
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
11101123
// some attempts make meaningful progress and do not count towards the max retry limit

muted-tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,15 @@ tests:
386386
- class: org.elasticsearch.snapshots.SharedClusterSnapshotRestoreIT
387387
method: testDeletionOfFailingToRecoverIndexShouldStopRestore
388388
issue: https://github.com/elastic/elasticsearch/issues/126204
389+
- class: org.elasticsearch.index.engine.ThreadPoolMergeSchedulerTests
390+
method: testSchedulerCloseWaitsForRunningMerge
391+
issue: https://github.com/elastic/elasticsearch/issues/125236
392+
- class: org.elasticsearch.entitlement.runtime.policy.PolicyUtilsTests
393+
method: testFormatFilesEntitlement
394+
issue: https://github.com/elastic/elasticsearch/issues/126176
395+
- class: org.elasticsearch.xpack.security.SecurityRolesMultiProjectIT
396+
method: testUpdatingFileBasedRoleAffectsAllProjects
397+
issue: https://github.com/elastic/elasticsearch/issues/126223
389398

390399
# Examples:
391400
#

x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -299,17 +299,13 @@ public void testUnsupportedStream() throws Exception {
299299

300300
try {
301301
var events = streamInferOnMockService(modelId, TaskType.SPARSE_EMBEDDING, List.of(randomUUID()), null);
302-
assertThat(events.size(), equalTo(2));
302+
assertThat(events.size(), equalTo(1));
303303
events.forEach(event -> {
304-
switch (event.name()) {
305-
case EVENT -> assertThat(event.value(), equalToIgnoringCase("error"));
306-
case DATA -> assertThat(
307-
event.value(),
308-
containsString(
309-
"Streaming is not allowed for service [streaming_completion_test_service] and task [sparse_embedding]"
310-
)
311-
);
312-
}
304+
assertThat(event.type(), equalToIgnoringCase("error"));
305+
assertThat(
306+
event.data(),
307+
containsString("Streaming is not allowed for service [streaming_completion_test_service] and task [sparse_embedding]")
308+
);
313309
});
314310
} finally {
315311
deleteModel(modelId);
@@ -331,12 +327,10 @@ public void testSupportedStream() throws Exception {
331327
input.stream().map(s -> s.toUpperCase(Locale.ROOT)).map(str -> "{\"completion\":[{\"delta\":\"" + str + "\"}]}"),
332328
Stream.of("[DONE]")
333329
).iterator();
334-
assertThat(events.size(), equalTo((input.size() + 1) * 2));
330+
assertThat(events.size(), equalTo(input.size() + 1));
335331
events.forEach(event -> {
336-
switch (event.name()) {
337-
case EVENT -> assertThat(event.value(), equalToIgnoringCase("message"));
338-
case DATA -> assertThat(event.value(), equalTo(expectedResponses.next()));
339-
}
332+
assertThat(event.type(), equalToIgnoringCase("message"));
333+
assertThat(event.data(), equalTo(expectedResponses.next()));
340334
});
341335
} finally {
342336
deleteModel(modelId);
@@ -359,12 +353,10 @@ public void testUnifiedCompletionInference() throws Exception {
359353
VALIDATE_ELASTIC_PRODUCT_HEADER_CONSUMER
360354
);
361355
var expectedResponses = expectedResultsIterator(input);
362-
assertThat(events.size(), equalTo((input.size() + 1) * 2));
356+
assertThat(events.size(), equalTo(input.size() + 1));
363357
events.forEach(event -> {
364-
switch (event.name()) {
365-
case EVENT -> assertThat(event.value(), equalToIgnoringCase("message"));
366-
case DATA -> assertThat(event.value(), equalTo(expectedResponses.next()));
367-
}
358+
assertThat(event.type(), equalToIgnoringCase("message"));
359+
assertThat(event.data(), equalTo(expectedResponses.next()));
368360
});
369361
} finally {
370362
deleteModel(modelId);

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListenerTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.elasticsearch.xpack.core.inference.action.InferenceAction;
5151
import org.elasticsearch.xpack.core.inference.results.XContentFormattedException;
5252
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
53-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
5453
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventParser;
5554

5655
import java.io.IOException;
@@ -353,9 +352,8 @@ private static class RandomStringCollector {
353352
private void collect(String str) throws IOException {
354353
sseParser.parse(str.getBytes(StandardCharsets.UTF_8))
355354
.stream()
356-
.filter(event -> event.name() == ServerSentEventField.DATA)
357-
.filter(ServerSentEvent::hasValue)
358-
.map(ServerSentEvent::value)
355+
.filter(ServerSentEvent::hasData)
356+
.map(ServerSentEvent::data)
359357
.forEach(stringsVerified::offer);
360358
}
361359
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/DelegatingProcessor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.xcontent.XContentParserConfiguration;
1313
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
14-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
1514

1615
import java.io.IOException;
1716
import java.util.ArrayDeque;
@@ -40,7 +39,7 @@ public static <ParsedChunk> Deque<ParsedChunk> parseEvent(
4039
) throws Exception {
4140
var results = new ArrayDeque<ParsedChunk>(item.size());
4241
for (ServerSentEvent event : item) {
43-
if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
42+
if (event.hasData()) {
4443
try {
4544
var delta = parseFunction.apply(parserConfig, event);
4645
delta.forEachRemaining(results::offer);

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/response/streaming/ServerSentEvent.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,26 @@
99

1010
/**
1111
* Server-Sent Event message: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
12-
* Messages always contain a {@link ServerSentEventField} and a non-null payload value.
13-
* When the stream is parsed and there is no value associated with a {@link ServerSentEventField}, an empty-string is set as the value.
1412
*/
15-
public record ServerSentEvent(ServerSentEventField name, String value) {
13+
public record ServerSentEvent(String type, String data) {
1614

1715
private static final String EMPTY = "";
16+
private static final String MESSAGE = "message";
1817

19-
public ServerSentEvent(ServerSentEventField name) {
20-
this(name, EMPTY);
18+
public static ServerSentEvent empty() {
19+
return new ServerSentEvent(EMPTY, EMPTY);
2120
}
2221

23-
// treat null value as an empty string, don't break parsing
24-
public ServerSentEvent(ServerSentEventField name, String value) {
25-
this.name = name;
26-
this.value = value != null ? value : EMPTY;
22+
public ServerSentEvent(String data) {
23+
this(MESSAGE, data);
2724
}
2825

29-
public boolean hasValue() {
30-
return value.isBlank() == false;
26+
public ServerSentEvent {
27+
data = data != null ? data : EMPTY;
28+
type = type != null && type.isBlank() == false ? type : MESSAGE;
29+
}
30+
31+
public boolean hasData() {
32+
return data.isBlank() == false;
3133
}
3234
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/response/streaming/ServerSentEventField.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)