Commit 1ddb4dd

Merge remote-tracking branch 'origin/main' into promql-datetime-literal-fix
2 parents ffa50fe + 5e91ec0 commit 1ddb4dd

File tree

42 files changed: +1253 -582 lines changed


docs/changelog/134709.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 134709
+summary: "OTLP: store units in mappings"
+area: Mapping
+type: enhancement
+issues: []

docs/changelog/138663.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 138663
+summary: Support weaker consistency model for S3 MPUs
+area: Snapshot/Restore
+type: bug
+issues: []

docs/release-notes/known-issues.md

Lines changed: 1 addition & 1 deletion
@@ -89,4 +89,4 @@ This issue will be fixed in a future patch release (see [PR #126990](https://git
 * switching the order of the grouping keys (eg. `STATS ... BY keyword2, keyword1`, if the `keyword2` has a lower cardinality)
 * reducing the grouping key cardinality, by filtering out values before STATS
 
-* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using an affected version of {{es}} and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue currently affects all supported versions of {{es}}. The plan to address it is described in [#137197](https://github.com/elastic/elasticsearch/issues/137197).
+* Repository analyses of snapshot repositories based on AWS S3 include some checks that the APIs which relate to multipart uploads have linearizable (strongly-consistent) semantics, based on guarantees offered by representatives from AWS on this subject. Further investigation has determined that these guarantees do not hold under all conditions as previously claimed. If you are analyzing a snapshot repository based on AWS S3 using a version of {{es}} prior to 9.3.0 and you encounter a failure related to linearizable register operations, you may work around the issue and suppress these checks by setting the query parameter `?register_operation_count=1` and running the analysis using a one-node cluster. This issue is fixed in {{es}} version 9.3.0 by [#138663](https://github.com/elastic/elasticsearch/pull/138663).
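For reference, the workaround described in this entry amounts to running the repository analysis API with the `register_operation_count=1` query parameter against a one-node cluster. A minimal sketch using the Elasticsearch low-level Java REST client follows; the repository name `my-s3-repo` and the `localhost:9200` address are placeholders rather than part of this change:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class RepositoryAnalysisWorkaround {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Analyze the S3-backed repository while suppressing the linearizable-register checks.
            Request request = new Request("POST", "/_snapshot/my-s3-repo/_analyze");
            request.addParameter("register_operation_count", "1");
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}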

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 57 additions & 9 deletions
@@ -20,6 +20,7 @@
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;

@@ -56,6 +57,7 @@
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
 import org.elasticsearch.common.blobstore.support.BlobMetadata;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.unit.ByteSizeUnit;

@@ -832,11 +834,14 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
 logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
 if ((e instanceof AwsServiceException awsServiceException)
 && (awsServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()
+|| awsServiceException.statusCode() == RestStatus.CONFLICT.getStatus()
+|| awsServiceException.statusCode() == RestStatus.PRECONDITION_FAILED.getStatus()
 || awsServiceException.statusCode() == RestStatus.OK.getStatus()
 && "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
 // An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
 // Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
-// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
+// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention.
+// Also if something else changed the blob out from under us then the If-Match check results in a 409 or a 412.
 delegate.onResponse(OptionalBytesReference.MISSING);
 } else {
 delegate.onFailure(e);

@@ -891,20 +896,20 @@ void innerRun(BytesReference expected, BytesReference updated, ActionListener<Op
 // cannot have observed a stale value, whereas if our operation ultimately fails then it doesn't matter what this read
 // observes.

-.<OptionalBytesReference>andThen(l -> getRegister(purpose, rawKey, l))
+.<RegisterAndEtag>andThen(l -> getRegisterAndEtag(purpose, rawKey, l))

 // Step 5: Perform the compare-and-swap by completing our upload iff the witnessed value matches the expected value.

-.andThenApply(currentValue -> {
-if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) {
+.andThenApply(currentValueAndEtag -> {
+if (currentValueAndEtag.registerContents().equals(expected)) {
 logger.trace("[{}] completing upload [{}]", blobKey, uploadId);
-completeMultipartUpload(uploadId, partETag);
+completeMultipartUpload(uploadId, partETag, currentValueAndEtag.eTag());
 } else {
 // Best-effort attempt to clean up after ourselves.
 logger.trace("[{}] aborting upload [{}]", blobKey, uploadId);
 safeAbortMultipartUpload(uploadId);
 }
-return currentValue;
+return OptionalBytesReference.of(currentValueAndEtag.registerContents());
 })

 // Step 6: Complete the listener.

@@ -1111,7 +1116,7 @@ private void abortMultipartUploadIfExists(String uploadId) {
 }
 }

-private void completeMultipartUpload(String uploadId, String partETag) {
+private void completeMultipartUpload(String uploadId, String partETag, String existingEtag) {
 final var completeMultipartUploadRequestBuilder = CompleteMultipartUploadRequest.builder()
 .bucket(bucket)
 .key(blobKey)

@@ -1123,6 +1128,14 @@ private void completeMultipartUpload(String uploadId, String partETag) {
 Operation.PUT_MULTIPART_OBJECT,
 purpose
 );
+if (blobStore.supportsConditionalWrites(purpose)) {
+if (existingEtag == null) {
+completeMultipartUploadRequestBuilder.ifNoneMatch("*");
+} else {
+completeMultipartUploadRequestBuilder.ifMatch(existingEtag);
+}
+}
+
 final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
 client.completeMultipartUpload(completeMultipartUploadRequest);
 }

@@ -1146,8 +1159,23 @@ public void compareAndExchangeRegister(
 ).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
 }

+/**
+ * @param registerContents Contents of the register blob; {@link BytesArray#EMPTY} if the blob is absent.
+ * @param eTag Etag of the register blob; {@code null} if and only if the blob is absent.
+ */
+private record RegisterAndEtag(BytesReference registerContents, String eTag) {
+/**
+ * Sentinel value to indicate that the register blob is absent.
+ */
+static RegisterAndEtag ABSENT = new RegisterAndEtag(BytesArray.EMPTY, null);
+}
+
 @Override
 public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
+getRegisterAndEtag(purpose, key, listener.map(registerAndEtag -> OptionalBytesReference.of(registerAndEtag.registerContents())));
+}
+
+void getRegisterAndEtag(OperationPurpose purpose, String key, ActionListener<RegisterAndEtag> listener) {
 ActionListener.completeWith(listener, () -> {
 final var backoffPolicy = purpose == OperationPurpose.REPOSITORY_ANALYSIS
 ? BackoffPolicy.noBackoff()

@@ -1163,11 +1191,14 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
 var clientReference = blobStore.clientReference();
 var s3Object = clientReference.client().getObject(getObjectRequest);
 ) {
-return OptionalBytesReference.of(getRegisterUsingConsistentRead(s3Object, keyPath, key));
+return new RegisterAndEtag(
+getRegisterUsingConsistentRead(s3Object, keyPath, key),
+getRequiredEtag(purpose, s3Object.response())
+);
 } catch (Exception attemptException) {
 logger.trace(() -> Strings.format("[%s]: getRegister failed", key), attemptException);
 if (attemptException instanceof SdkServiceException sdkException && sdkException.statusCode() == 404) {
-return OptionalBytesReference.EMPTY;
+return RegisterAndEtag.ABSENT;
 } else if (finalException == null) {
 finalException = attemptException;
 } else if (finalException != attemptException) {

@@ -1191,6 +1222,23 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
 });
 }

+/**
+ * @return the {@code ETag} header from a {@link GetObjectResponse}, failing with an exception if it is omitted (unless not required
+ * for the given {@link OperationPurpose}).
+ */
+private String getRequiredEtag(OperationPurpose purpose, GetObjectResponse getObjectResponse) {
+final var etag = getObjectResponse.eTag();
+if (Strings.hasText(etag)) {
+return etag;
+} else if (blobStore.supportsConditionalWrites(purpose)) {
+throw new UnsupportedOperationException("GetObject response contained no ETag header, cannot perform conditional write");
+} else {
+// blob stores which do not support conditional writes may also not return ETag headers, but we won't use it anyway so return
+// a non-null dummy value
+return "es-missing-but-ignored-etag";
+}
+}
+
 ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCountingRunnable refs) {
 try (var clientReference = blobStore.clientReference()) {
 final var bucket = blobStore.bucket();
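To make the new conditional-completion path above concrete: the multipart upload is completed only if the register object still carries the ETag observed when its contents were read, or only if no object exists yet (via If-None-Match: *). The following is a minimal standalone sketch against the AWS SDK v2 S3 client; the bucket and key names and the single-part upload are illustrative placeholders, and the real method also threads through request configuration omitted here.

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;

class ConditionalCompleteSketch {
    // Complete a single-part multipart upload, guarded by the ETag witnessed when the register was read.
    static void completeConditionally(S3Client s3, String uploadId, String partETag, String existingEtag) {
        CompleteMultipartUploadRequest.Builder builder = CompleteMultipartUploadRequest.builder()
            .bucket("my-bucket")            // placeholder bucket
            .key("my-register-blob")        // placeholder key
            .uploadId(uploadId)
            .multipartUpload(
                CompletedMultipartUpload.builder()
                    .parts(CompletedPart.builder().partNumber(1).eTag(partETag).build())
                    .build()
            );
        if (existingEtag == null) {
            builder.ifNoneMatch("*");       // succeed only if no object exists yet at this key
        } else {
            builder.ifMatch(existingEtag);  // succeed only if the object still has the witnessed ETag
        }
        // A concurrent modification makes this fail with a 412 (or 409), which the caller treats as contention.
        s3.completeMultipartUpload(builder.build());
    }
}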

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java

Lines changed: 72 additions & 1 deletion
@@ -93,6 +93,7 @@
 import java.util.regex.Pattern;

 import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
+import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes;
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
 import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;

@@ -248,7 +249,7 @@ protected BlobContainer createBlobContainer(
 S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
 S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
 S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
-S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY),
+S3Repository.UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES.getDefault(Settings.EMPTY) == Boolean.FALSE,
 repositoryMetadata,
 BigArrays.NON_RECYCLING_INSTANCE,
 new DeterministicTaskQueue().getThreadPool(),

@@ -1381,6 +1382,76 @@ public void handle(HttpExchange exchange) throws IOException {
 );
 }

+public void testCompareAndExchangeWithConcurrentPutObject() throws Exception {
+final var blobContainerPath = BlobPath.EMPTY.add(getTestName());
+final var statefulBlobContainer = createBlobContainer(1, null, null, null, null, null, blobContainerPath);
+
+final var objectContentsRequestedLatch = new CountDownLatch(1);
+
+@SuppressForbidden(reason = "use a http server")
+class AwaitsListMultipartUploads extends S3HttpHandler {
+AwaitsListMultipartUploads() {
+super("bucket");
+}
+
+@Override
+public void handle(HttpExchange exchange) throws IOException {
+if (parseRequest(exchange).isGetObjectRequest()) {
+// delay the overwrite until the CAS is checking the object contents, forcing a race
+objectContentsRequestedLatch.countDown();
+}
+super.handle(exchange);
+}
+}
+
+httpServer.createContext("/", new AwaitsListMultipartUploads());
+
+final var blobName = randomIdentifier();
+final var initialValue = randomBytesReference(8);
+final var overwriteValue = randomValueOtherThan(initialValue, () -> randomBytesReference(8));
+final var casTargetValue = randomValueOtherThanMany(
+v -> v.equals(initialValue) || v.equals(overwriteValue),
+() -> randomBytesReference(8)
+);
+
+statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, initialValue, randomBoolean());
+
+runInParallel(
+() -> safeAwait(
+l -> statefulBlobContainer.compareAndExchangeRegister(
+randomPurpose(),
+blobName,
+initialValue,
+casTargetValue,
+l.map(result -> {
+// Really anything can happen here: success (sees initialValue) or failure (sees overwriteValue), or contention
+if (result.isPresent()) {
+assertThat(
+result.toString(),
+result.bytesReference(),
+anyOf(equalBytes(initialValue), equalBytes(overwriteValue))
+);
+}
+return null;
+})
+)
+),
+() -> {
+try {
+safeAwait(objectContentsRequestedLatch);
+statefulBlobContainer.writeBlobAtomic(randomPurpose(), blobName, overwriteValue, false);
+} catch (IOException e) {
+throw new AssertionError("writeBlobAtomic failed", e);
+}
+}
+);
+
+// If the CAS happens before the overwrite then we'll see the overwritten value, whereas if the CAS happens second then it will
+// fail because the value was overwritten leaving the overwritten value in place. If, however, the CAS were not atomic with respect
+// to other non-CAS-based writes, then we would see casTargetValue here:
+assertThat(Streams.readFully(statefulBlobContainer.readBlob(randomPurpose(), blobName)), equalBytes(overwriteValue));
+}
+
 @Override
 protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
 // some attempts make meaningful progress and do not count towards the max retry limit
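The new test above forces the race by having the stubbed S3 handler signal a latch when the CAS reads the object contents, and only then letting the competing plain overwrite proceed. As a rough standalone illustration of that coordination pattern, using plain java.util.concurrent instead of the Elasticsearch test helpers runInParallel and safeAwait, one might write:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RaceSketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch casReadStarted = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        pool.submit(() -> {
            // "CAS" task: signal once it has started reading the current value, then finish its
            // conditional write (the actual compare-and-exchange is elided in this sketch).
            casReadStarted.countDown();
            // ... perform the conditional completion here ...
        });

        pool.submit(() -> {
            try {
                // Overwriting task: wait until the CAS has started its read, then overwrite,
                // guaranteeing that the two operations overlap.
                casReadStarted.await();
                // ... perform the plain overwrite here ...
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
    }
}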

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

Lines changed: 3 additions & 1 deletion
@@ -203,7 +203,7 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
 AtomicBoolean nextPage = new AtomicBoolean(false);

 ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
-for (int i = 0; i < 4; ++i) {
+for (int i = 0; i < 5; ++i) {
 ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
 add512BRequests(requests, index);
 IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

@@ -230,6 +230,8 @@ public void testIncrementalBulkHighWatermarkBackOff() throws Exception {
 // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
 add512BRequests(requestsThrottle, index);
 add512BRequests(requestsThrottle, index);
+// Ensure we'll be above SPLIT_BULK_HIGH_WATERMARK
+assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() + 1024, greaterThan(4096L));

 CountDownLatch finishLatch = new CountDownLatch(1);
 blockWriteCoordinationPool(threadPool, finishLatch);

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/NodeIndexingMetricsIT.java

Lines changed: 3 additions & 1 deletion
@@ -796,7 +796,7 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
 AtomicBoolean nextPage = new AtomicBoolean(false);

 ArrayList<IncrementalBulkService.Handler> handlers = new ArrayList<>();
-for (int i = 0; i < 4; ++i) {
+for (int i = 0; i < 5; ++i) {
 ArrayList<DocWriteRequest<?>> requests = new ArrayList<>();
 add512BRequests(requests, index);
 IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

@@ -838,6 +838,8 @@ public void testIncrementalBulkHighWatermarkSplitMetrics() throws Exception {
 // Test that a request larger than SPLIT_BULK_HIGH_WATERMARK_SIZE (1KB) is throttled
 add512BRequests(requestsThrottle, index);
 add512BRequests(requestsThrottle, index);
+// Ensure we'll be above SPLIT_BULK_HIGH_WATERMARK
+assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes() + 1024, greaterThan(4096L));

 CountDownLatch finishLatch = new CountDownLatch(1);
 blockWriteCoordinationPool(threadPool, finishLatch);
