From 1aeb1129fee2e2567e315ee0f7f22ca8b0ec4f95 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Thu, 19 Jun 2025 09:20:49 +0200 Subject: [PATCH 1/3] Clone IndexInput when creating MemorySegmentPostingsVisitor --- .../vectors/DefaultIVFVectorsReader.java | 2 +- .../codec/vectors/IVFVectorsFormatTests.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index 2a2bef3dfcf19..5c86f602a654e 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -122,7 +122,7 @@ NeighborQueue scorePostingLists(FieldInfo fieldInfo, KnnCollector knnCollector, PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring) throws IOException { FieldEntry entry = fields.get(fieldInfo.number); - return new MemorySegmentPostingsVisitor(target, indexInput, entry, fieldInfo, needsScoring); + return new MemorySegmentPostingsVisitor(target, indexInput.clone(), entry, fieldInfo, needsScoring); } // TODO can we do this in off-heap blocks? diff --git a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java index 177a3d00c3dc4..79483f47f0e3c 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; import static java.lang.String.format; import static org.elasticsearch.index.codec.vectors.IVFVectorsFormat.MAX_VECTORS_PER_CLUSTER; @@ -128,4 +129,49 @@ public void testSimpleOffHeapSize() throws IOException { } } } + + public void testWithThreads() throws Exception { + final int numThreads = random().nextInt(2, 5); + final int numSearches = atLeast(100); + final int numDocs = atLeast(1000); + final int dimensions = random().nextInt(12, 500); + try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) { + for (int docCount = 0; docCount < numDocs; docCount++) { + final Document doc = new Document(); + doc.add(new KnnFloatVectorField("f", randomVector(dimensions), VectorSimilarityFunction.EUCLIDEAN)); + w.addDocument(doc); + } + w.forceMerge(1); + try (IndexReader reader = DirectoryReader.open(w)) { + final AtomicBoolean failed = new AtomicBoolean(); + Thread[] threads = new Thread[numThreads]; + for (int threadID = 0; threadID < numThreads; threadID++) { + threads[threadID] = + new Thread(() -> { + try { + long totSearch = 0; + for (; totSearch < numSearches && failed.get() == false; totSearch++) { + float[] vector = randomVector(dimensions); + LeafReader leafReader = getOnlyLeafReader(reader); + leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE); + } + assertTrue(totSearch > 0); + } catch (Exception exc) { + failed.set(true); + throw new RuntimeException(exc); + } + }); + threads[threadID].setDaemon(true); + } + + for (Thread t : threads) { + t.start(); + } + + for (Thread t : threads) { + t.join(); + } + } + } + } } From ac7a54de42cc8ddf0f8c3f9a1162efd5fb4b25ad Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Thu, 19 Jun 2025 09:25:36 +0200 Subject: [PATCH 2/3] iter --- .../elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java index 79483f47f0e3c..2603a51c6b5b3 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java @@ -130,6 +130,7 @@ public void testSimpleOffHeapSize() throws IOException { } } + // this is a modified version of lucene's TestSearchWithThreads test case public void testWithThreads() throws Exception { final int numThreads = random().nextInt(2, 5); final int numSearches = atLeast(100); From 19e1bfae0304f4c15649ca171ba4745fb4cc3b36 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 19 Jun 2025 07:32:33 +0000 Subject: [PATCH 3/3] [CI] Auto commit changes from spotless --- .../codec/vectors/IVFVectorsFormatTests.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java index 2603a51c6b5b3..8499aa9a17320 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java @@ -147,21 +147,20 @@ public void testWithThreads() throws Exception { final AtomicBoolean failed = new AtomicBoolean(); Thread[] threads = new Thread[numThreads]; for (int threadID = 0; threadID < numThreads; threadID++) { - threads[threadID] = - new Thread(() -> { - try { - long totSearch = 0; - for (; totSearch < numSearches && failed.get() == false; totSearch++) { - float[] vector = randomVector(dimensions); - LeafReader leafReader = getOnlyLeafReader(reader); - leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE); - } - assertTrue(totSearch > 0); - } catch (Exception exc) { - failed.set(true); - throw new RuntimeException(exc); + threads[threadID] = new Thread(() -> { + try { + long totSearch = 0; + for (; totSearch < numSearches && failed.get() == false; totSearch++) { + float[] vector = randomVector(dimensions); + LeafReader leafReader = getOnlyLeafReader(reader); + leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE); } - }); + assertTrue(totSearch > 0); + } catch (Exception exc) { + failed.set(true); + throw new RuntimeException(exc); + } + }); threads[threadID].setDaemon(true); }