Skip to content

Commit 2db174b

Browse files
authored
Merge branch 'main' into t-digest-sum-of-empty-change
2 parents 79c639b + eb20ffa commit 2db174b

File tree

9 files changed

+65
-21
lines changed

9 files changed

+65
-21
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,10 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
783783
}
784784
}
785785

786-
private class CompareAndExchangeOperation {
786+
/**
787+
* An implementation of {@link BlobContainer#compareAndExchangeRegister} based on strongly-consistent multipart upload APIs.
788+
*/
789+
private class MultipartUploadCompareAndExchangeOperation {
787790

788791
private final OperationPurpose purpose;
789792
private final S3Client client;
@@ -792,7 +795,13 @@ private class CompareAndExchangeOperation {
792795
private final String blobKey;
793796
private final ThreadPool threadPool;
794797

795-
CompareAndExchangeOperation(OperationPurpose purpose, S3Client client, String bucket, String key, ThreadPool threadPool) {
798+
MultipartUploadCompareAndExchangeOperation(
799+
OperationPurpose purpose,
800+
S3Client client,
801+
String bucket,
802+
String key,
803+
ThreadPool threadPool
804+
) {
796805
this.purpose = purpose;
797806
this.client = client;
798807
this.bucket = bucket;
@@ -802,6 +811,23 @@ private class CompareAndExchangeOperation {
802811
}
803812

804813
void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
814+
innerRun(expected, updated, listener.delegateResponse((delegate, e) -> {
815+
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", rawKey), e);
816+
if (e instanceof AwsServiceException awsServiceException
817+
&& (awsServiceException.statusCode() == 404
818+
|| awsServiceException.statusCode() == 200
819+
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
820+
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
821+
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
822+
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
823+
delegate.onResponse(OptionalBytesReference.MISSING);
824+
} else {
825+
delegate.onFailure(e);
826+
}
827+
}));
828+
}
829+
830+
void innerRun(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
805831
BlobContainerUtils.ensureValidRegisterContent(updated);
806832

807833
if (hasPreexistingUploads()) {
@@ -1094,25 +1120,15 @@ public void compareAndExchangeRegister(
10941120
ActionListener<OptionalBytesReference> listener
10951121
) {
10961122
final var clientReference = blobStore.clientReference();
1097-
ActionListener.run(ActionListener.releaseAfter(listener.delegateResponse((delegate, e) -> {
1098-
logger.trace(() -> Strings.format("[%s]: compareAndExchangeRegister failed", key), e);
1099-
if (e instanceof AwsServiceException awsServiceException
1100-
&& (awsServiceException.statusCode() == 404
1101-
|| awsServiceException.statusCode() == 200
1102-
&& "NoSuchUpload".equals(awsServiceException.awsErrorDetails().errorCode()))) {
1103-
// An uncaught 404 means that our multipart upload was aborted by a concurrent operation before we could complete it.
1104-
// Also (rarely) S3 can start processing the request during a concurrent abort and this can result in a 200 OK with an
1105-
// <Error><Code>NoSuchUpload</Code>... in the response. Either way, this means that our write encountered contention:
1106-
delegate.onResponse(OptionalBytesReference.MISSING);
1107-
} else {
1108-
delegate.onFailure(e);
1109-
}
1110-
}), clientReference),
1111-
l -> new CompareAndExchangeOperation(purpose, clientReference.client(), blobStore.bucket(), key, blobStore.getThreadPool()).run(
1112-
expected,
1113-
updated,
1114-
l
1115-
)
1123+
ActionListener.run(
1124+
ActionListener.releaseBefore(clientReference, listener),
1125+
l -> new MultipartUploadCompareAndExchangeOperation(
1126+
purpose,
1127+
clientReference.client(),
1128+
blobStore.bucket(),
1129+
key,
1130+
blobStore.getThreadPool()
1131+
).run(expected, updated, l)
11161132
);
11171133
}
11181134

server/src/main/java/org/elasticsearch/index/codec/vectors/diskbbq/next/ESNextDiskBBQVectorsReader.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,20 @@ private static CentroidIterator getCentroidIteratorWithParents(
324324
final NeighborQueue currentParentQueue = new NeighborQueue(maxChildrenSize, true);
325325
final int bufferSize = (int) Math.min(Math.max(centroidRatio * numCentroids, 1), numCentroids);
326326
final int numCentroidsFiltered = acceptCentroids == null ? numCentroids : acceptCentroids.cardinality();
327+
if (numCentroidsFiltered == 0) {
328+
// TODO maybe this makes CentroidIterator polymorphic?
329+
return new CentroidIterator() {
330+
@Override
331+
public boolean hasNext() {
332+
return false;
333+
}
334+
335+
@Override
336+
public CentroidOffsetAndLength nextPostingListOffsetAndLength() {
337+
return null;
338+
}
339+
};
340+
}
327341
final float[] scores = new float[ES92Int7VectorsScorer.BULK_SIZE];
328342
final NeighborQueue neighborQueue;
329343
if (acceptCentroids != null && numCentroidsFiltered <= bufferSize) {

server/src/test/java/org/elasticsearch/index/codec/vectors/diskbbq/next/ESNextDiskBBQVectorsFormatTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.lucene.index.VectorEncoding;
3030
import org.apache.lucene.index.VectorSimilarityFunction;
3131
import org.apache.lucene.search.AcceptDocs;
32+
import org.apache.lucene.search.DocIdSetIterator;
3233
import org.apache.lucene.search.KnnCollector;
3334
import org.apache.lucene.search.TopDocs;
3435
import org.apache.lucene.search.TopKnnCollector;
@@ -382,6 +383,13 @@ private void doRestrictiveFilter(boolean dense) throws IOException {
382383
uniqueDocIds.add(topDocs.scoreDocs[i].doc);
383384
}
384385
assertEquals(matchingDocs, uniqueDocIds.size());
386+
// match no docs
387+
leafReader.searchNearestVectors(
388+
"f",
389+
vector,
390+
new TopKnnCollector(2, Integer.MAX_VALUE),
391+
AcceptDocs.fromIteratorSupplier(DocIdSetIterator::empty, leafReader.getLiveDocs(), leafReader.maxDoc())
392+
);
385393
}
386394
}
387395
}

x-pack/plugin/snapshot-repo-test-kit/qa/azure/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/AzureRepositoryAnalysisRestIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ private static Predicate<String> decideAuthHeaderPredicate() {
7474
private static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
7575
.module("repository-azure")
7676
.module("snapshot-repo-test-kit")
77+
.setting("thread_pool.snapshot.max", "10")
7778
.keystore("azure.client.repository_test_kit.account", AZURE_TEST_ACCOUNT)
7879
.keystore("azure.client.repository_test_kit.key", () -> AZURE_TEST_KEY, s -> Strings.hasText(AZURE_TEST_KEY))
7980
.keystore("azure.client.repository_test_kit.sas_token", () -> AZURE_TEST_SASTOKEN, s -> Strings.hasText(AZURE_TEST_SASTOKEN))

x-pack/plugin/snapshot-repo-test-kit/qa/gcs/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/GCSRepositoryAnalysisRestIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class GCSRepositoryAnalysisRestIT extends AbstractRepositoryAnalysisRestT
3131
private static ElasticsearchCluster cluster = ElasticsearchCluster.local()
3232
.module("repository-gcs")
3333
.module("snapshot-repo-test-kit")
34+
.setting("thread_pool.snapshot.max", "10")
3435
.setting("gcs.client.repository_test_kit.endpoint", () -> fixture.getAddress(), s -> USE_FIXTURE)
3536
.setting("gcs.client.repository_test_kit.token_uri", () -> fixture.getAddress() + "/o/oauth2/token", s -> USE_FIXTURE)
3637
.apply(c -> {

x-pack/plugin/snapshot-repo-test-kit/qa/hdfs/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/HdfsRepositoryAnalysisRestIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class HdfsRepositoryAnalysisRestIT extends AbstractHdfsRepositoryAnalysis
2424
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
2525
.distribution(DistributionType.DEFAULT)
2626
.plugin("repository-hdfs")
27+
.setting("thread_pool.snapshot.max", "10")
2728
.setting("xpack.license.self_generated.type", "trial")
2829
.setting("xpack.security.enabled", "false")
2930
.build();

x-pack/plugin/snapshot-repo-test-kit/qa/hdfs/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/SecureHdfsRepositoryAnalysisRestIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class SecureHdfsRepositoryAnalysisRestIT extends AbstractHdfsRepositoryAn
3131
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
3232
.distribution(DistributionType.DEFAULT)
3333
.plugin("repository-hdfs")
34+
.setting("thread_pool.snapshot.max", "10")
3435
.setting("xpack.license.self_generated.type", "trial")
3536
.setting("xpack.security.enabled", "false")
3637
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())

x-pack/plugin/snapshot-repo-test-kit/qa/minio/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/MinioRepositoryAnalysisRestIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class MinioRepositoryAnalysisRestIT extends AbstractRepositoryAnalysisRes
3131
.distribution(DistributionType.DEFAULT)
3232
.keystore("s3.client.repository_test_kit.access_key", "s3_test_access_key")
3333
.keystore("s3.client.repository_test_kit.secret_key", "s3_test_secret_key")
34+
.setting("thread_pool.snapshot.max", "10")
3435
.setting("s3.client.repository_test_kit.endpoint", minioFixture::getAddress)
3536
.setting("xpack.security.enabled", "false")
3637
.setting("xpack.ml.enabled", "false")

x-pack/plugin/snapshot-repo-test-kit/qa/s3/src/javaRestTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/S3RepositoryAnalysisRestIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ private static void ensurePurposeParameterPresent(S3HttpHandler.S3Request reques
8282
.setting("s3.client.repo_test_kit.region", regionSupplier, (n) -> USE_FIXTURE)
8383
.setting("s3.client.repo-test_kit.add_purpose_custom_query_parameter", () -> randomFrom("true", "false"), n -> randomBoolean())
8484
.setting("xpack.security.enabled", "false")
85+
.setting("thread_pool.snapshot.max", "10")
8586
.build();
8687

8788
@ClassRule

0 commit comments

Comments
 (0)