From 6c40c65d9c33d15e08e4ec77b12734c0001705ce Mon Sep 17 00:00:00 2001 From: John Wagster Date: Wed, 13 Aug 2025 09:41:42 -0500 Subject: [PATCH 1/6] prefetch poc --- qa/vector/build.gradle | 2 +- .../vectors/DefaultIVFVectorsReader.java | 47 +++++++++++++++++-- .../vectors/DefaultIVFVectorsWriter.java | 36 ++++++++------ .../index/codec/vectors/IVFVectorsReader.java | 42 ++++++++++++++++- .../index/codec/vectors/IVFVectorsWriter.java | 16 ++++--- .../codec/vectors/cluster/KMeansLocal.java | 1 + 6 files changed, 116 insertions(+), 28 deletions(-) diff --git a/qa/vector/build.gradle b/qa/vector/build.gradle index 41064d2bb3451..16d9354078b09 100644 --- a/qa/vector/build.gradle +++ b/qa/vector/build.gradle @@ -44,7 +44,7 @@ tasks.register("checkVec", JavaExec) { systemProperty "es.logger.out", "console" systemProperty "es.logger.level", "INFO" // Change to DEBUG if needed systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../../libs/native/libraries/build/platform/").toString()) - jvmArgs '-Xms4g', '-Xmx4g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError' + jvmArgs '-Xms300m', '-Xmx300m', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError' if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) { jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED' } diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index d80fc216e556c..1f98cb477987a 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -109,6 +109,10 @@ private static CentroidIterator getCentroidIteratorNoParent( ); long offset = centroids.getFilePointer(); return new CentroidIterator() { + + // FIXME: clean this up + long postingListLength = -1; + @Override public boolean hasNext() { return neighborQueue.size() > 0; @@ -117,8 +121,18 @@ public boolean hasNext() { @Override public long nextPostingListOffset() throws IOException { int centroidOrdinal = neighborQueue.pop(); - centroids.seek(offset + (long) Long.BYTES * centroidOrdinal); - return centroids.readLong(); + centroids.seek(offset + (long) Long.BYTES * 2 * centroidOrdinal); + long postingListOffset = centroids.readLong(); + postingListLength = centroids.readLong(); + return postingListOffset; + } + + @Override + public long curPostingListLength() throws IOException { + if (postingListLength == -1) { + throw new IllegalStateException("nextPostingListOffset must be called before curPostingListLength"); + } + return postingListLength; } }; } @@ -179,6 +193,9 @@ private static CentroidIterator getCentroidIteratorWithParents( } final long childrenFileOffsets = childrenOffset + centroidQuantizeSize * numCentroids; return new CentroidIterator() { + // FIXME: clean this up + long postingListLength = -1; + @Override public boolean hasNext() { return neighborQueue.size() > 0; @@ -188,8 +205,18 @@ public boolean hasNext() { public long nextPostingListOffset() throws IOException { int centroidOrdinal = neighborQueue.pop(); updateQueue(); // add one children if available so the queue remains fully populated - centroids.seek(childrenFileOffsets + (long) Long.BYTES * centroidOrdinal); - return centroids.readLong(); + centroids.seek(childrenFileOffsets + (long) Long.BYTES * 2 * centroidOrdinal); + long postingListOffset = centroids.readLong(); + postingListLength = centroids.readLong(); + return postingListOffset; + } + + @Override + public long curPostingListLength() throws IOException { + if (postingListLength == -1) { + throw new IllegalStateException("nextPostingListOffset must be called before curPostingListLength"); + } + return postingListLength; } private void updateQueue() throws IOException { @@ -360,6 +387,17 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor { this.docIdsScratch = new int[maxPostingListSize]; } + @Override + public void prefetch(long offset) throws IOException { + // long postingLength = vectors / entry.postingListLength(); + + // long vIntFakeMax = 5; // FIXME: need to know exact number here + // long centroidLength = (long) fieldInfo.getVectorDimension() * Float.BYTES + + // Integer.BYTES + vIntFakeMax + (long) vectors * Integer.BYTES + + indexInput.prefetch(offset, slicePos + vectors * quantizedByteLength); + } + @Override public int resetPostingsScorer(long offset) throws IOException { quantized = false; @@ -452,6 +490,7 @@ public int visit(KnnCollector knnCollector) throws IOException { int scoredDocs = 0; int limit = vectors - BULK_SIZE + 1; int i = 0; + for (; i < limit; i += BULK_SIZE) { final int docsToBulkScore = acceptDocs == null ? BULK_SIZE : docToBulkScore(docIdsScratch, i, acceptDocs); if (docsToBulkScore == 0) { diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsWriter.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsWriter.java index 5e696b74530a8..b0de42854dc6a 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsWriter.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsWriter.java @@ -18,7 +18,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.IntroSorter; -import org.apache.lucene.util.LongValues; import org.apache.lucene.util.VectorUtil; import org.apache.lucene.util.hnsw.IntToIntFunction; import org.apache.lucene.util.packed.PackedInts; @@ -60,7 +59,7 @@ public DefaultIVFVectorsWriter( } @Override - LongValues buildAndWritePostingsLists( + CentroidOffsetAndLength buildAndWritePostingsLists( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, FloatVectorValues floatVectorValues, @@ -102,6 +101,7 @@ LongValues buildAndWritePostingsLists( postingsOutput.writeVInt(maxPostingListSize); // write the posting lists final PackedLongValues.Builder offsets = PackedLongValues.monotonicBuilder(PackedInts.COMPACT); + final PackedLongValues.Builder lengths = PackedLongValues.monotonicBuilder(PackedInts.COMPACT); DiskBBQBulkWriter bulkWriter = new DiskBBQBulkWriter.OneBitDiskBBQBulkWriter(ES91OSQVectorsScorer.BULK_SIZE, postingsOutput); OnHeapQuantizedVectors onHeapQuantizedVectors = new OnHeapQuantizedVectors( floatVectorValues, @@ -116,7 +116,8 @@ LongValues buildAndWritePostingsLists( for (int c = 0; c < centroidSupplier.size(); c++) { float[] centroid = centroidSupplier.centroid(c); int[] cluster = assignmentsByCluster[c]; - offsets.add(postingsOutput.alignFilePointer(Float.BYTES) - fileOffset); + long offset = postingsOutput.alignFilePointer(Float.BYTES) - fileOffset; + offsets.add(offset); buffer.asFloatBuffer().put(centroid); // write raw centroid for quantizing the query vectors postingsOutput.writeBytes(buffer.array(), buffer.array().length); @@ -142,17 +143,18 @@ LongValues buildAndWritePostingsLists( idsWriter.writeDocIds(i -> docDeltas[i], size, postingsOutput); // write vectors bulkWriter.writeVectors(onHeapQuantizedVectors); + lengths.add(postingsOutput.getFilePointer() - fileOffset - offset); } if (logger.isDebugEnabled()) { printClusterQualityStatistics(assignmentsByCluster); } - return offsets.build(); + return new CentroidOffsetAndLength(offsets.build(), lengths.build()); } @Override - LongValues buildAndWritePostingsLists( + CentroidOffsetAndLength buildAndWritePostingsLists( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, FloatVectorValues floatVectorValues, @@ -243,6 +245,7 @@ LongValues buildAndWritePostingsLists( // now we can read the quantized vectors from the temporary file try (IndexInput quantizedVectorsInput = mergeState.segmentInfo.dir.openInput(quantizedVectorsTempName, IOContext.DEFAULT)) { final PackedLongValues.Builder offsets = PackedLongValues.monotonicBuilder(PackedInts.COMPACT); + final PackedLongValues.Builder lengths = PackedLongValues.monotonicBuilder(PackedInts.COMPACT); OffHeapQuantizedVectors offHeapQuantizedVectors = new OffHeapQuantizedVectors( quantizedVectorsInput, fieldInfo.getVectorDimension() @@ -260,7 +263,8 @@ LongValues buildAndWritePostingsLists( float[] centroid = centroidSupplier.centroid(c); int[] cluster = assignmentsByCluster[c]; boolean[] isOverspill = isOverspillByCluster[c]; - offsets.add(postingsOutput.alignFilePointer(Float.BYTES) - fileOffset); + long offset = postingsOutput.alignFilePointer(Float.BYTES) - fileOffset; + offsets.add(offset); // write raw centroid for quantizing the query vectors buffer.asFloatBuffer().put(centroid); postingsOutput.writeBytes(buffer.array(), buffer.array().length); @@ -286,12 +290,14 @@ LongValues buildAndWritePostingsLists( idsWriter.writeDocIds(i -> docDeltas[i], size, postingsOutput); // write vectors bulkWriter.writeVectors(offHeapQuantizedVectors); + lengths.add(postingsOutput.getFilePointer() - fileOffset - offset); + // lengths.add(1); } if (logger.isDebugEnabled()) { printClusterQualityStatistics(assignmentsByCluster); } - return offsets.build(); + return new CentroidOffsetAndLength(offsets.build(), lengths.build()); } } @@ -335,16 +341,16 @@ void writeCentroids( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, float[] globalCentroid, - LongValues offsets, + CentroidOffsetAndLength centroidOffsetAndLength, IndexOutput centroidOutput ) throws IOException { // TODO do we want to store these distances as well for future use? // TODO: sort centroids by global centroid (was doing so previously here) // TODO: sorting tanks recall possibly because centroids ordinals no longer are aligned if (centroidSupplier.size() > centroidsPerParentCluster * centroidsPerParentCluster) { - writeCentroidsWithParents(fieldInfo, centroidSupplier, globalCentroid, offsets, centroidOutput); + writeCentroidsWithParents(fieldInfo, centroidSupplier, globalCentroid, centroidOffsetAndLength, centroidOutput); } else { - writeCentroidsWithoutParents(fieldInfo, centroidSupplier, globalCentroid, offsets, centroidOutput); + writeCentroidsWithoutParents(fieldInfo, centroidSupplier, globalCentroid, centroidOffsetAndLength, centroidOutput); } } @@ -352,7 +358,7 @@ private void writeCentroidsWithParents( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, float[] globalCentroid, - LongValues offsets, + CentroidOffsetAndLength centroidOffsetAndLength, IndexOutput centroidOutput ) throws IOException { DiskBBQBulkWriter.SevenBitDiskBBQBulkWriter bulkWriter = new DiskBBQBulkWriter.SevenBitDiskBBQBulkWriter( @@ -392,7 +398,8 @@ private void writeCentroidsWithParents( for (int i = 0; i < centroidGroups.centroids().length; i++) { final int[] centroidAssignments = centroidGroups.vectors()[i]; for (int assignment : centroidAssignments) { - centroidOutput.writeLong(offsets.get(assignment)); + centroidOutput.writeLong(centroidOffsetAndLength.offsets().get(assignment)); + centroidOutput.writeLong(centroidOffsetAndLength.lengths().get(assignment)); } } } @@ -401,7 +408,7 @@ private void writeCentroidsWithoutParents( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, float[] globalCentroid, - LongValues offsets, + CentroidOffsetAndLength centroidOffsetAndLength, IndexOutput centroidOutput ) throws IOException { centroidOutput.writeVInt(0); @@ -419,7 +426,8 @@ private void writeCentroidsWithoutParents( bulkWriter.writeVectors(quantizedCentroids); // write the centroid offsets at the end of the file for (int i = 0; i < centroidSupplier.size(); i++) { - centroidOutput.writeLong(offsets.get(i)); + centroidOutput.writeLong(centroidOffsetAndLength.offsets().get(i)); + centroidOutput.writeLong(centroidOffsetAndLength.lengths().get(i)); } } diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java index 0043f78590ac1..d93391ce656af 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java @@ -239,7 +239,8 @@ public final void search(String field, float[] target, KnnCollector knnCollector nProbe = Math.max(Math.min(nProbe, entry.numCentroids), 1); } CentroidIterator centroidIterator = getCentroidIterator(fieldInfo, entry.numCentroids, entry.centroidSlice(ivfCentroids), target); - PostingVisitor scorer = getPostingVisitor(fieldInfo, entry.postingListSlice(ivfClusters), target, acceptDocs); + IndexInput postListSlice = entry.postingListSlice(ivfClusters); + PostingVisitor scorer = getPostingVisitor(fieldInfo, postListSlice, target, acceptDocs); int centroidsVisited = 0; long expectedDocs = 0; long actualDocs = 0; @@ -247,13 +248,45 @@ public final void search(String field, float[] target, KnnCollector knnCollector // Note, numCollected is doing the bare minimum here. // TODO do we need to handle nested doc counts similarly to how we handle // filtering? E.g. keep exploring until we hit an expected number of parent documents vs. child vectors? + long nextOffset = -1; + long nextLength = -1; while (centroidIterator.hasNext() && (centroidsVisited < nProbe || knnCollector.minCompetitiveSimilarity() == Float.NEGATIVE_INFINITY)) { ++centroidsVisited; // todo do we actually need to know the score??? - long offset = centroidIterator.nextPostingListOffset(); + + // FIXME: opportunistically prefetch here?? (could do nProbe of these ... how does this help with low memory??? + // FIXME: prefetch the next one not the current one + // FIXME: deal with last one correctly + long offset; + if (nextOffset == -1) { + offset = centroidIterator.nextPostingListOffset(); + if (centroidIterator.hasNext()) { + nextOffset = centroidIterator.nextPostingListOffset(); + nextLength = centroidIterator.curPostingListLength(); + // scorer.prefetch(offset, length); + // scorer.prefetch(nextOffset); + // FIXME: clean this up + if (nextLength == -2) { + nextLength = postListSlice.length() - nextOffset; + } + postListSlice.prefetch(nextOffset, nextLength); + } + } else { + offset = nextOffset; + + nextOffset = centroidIterator.nextPostingListOffset(); + nextLength = centroidIterator.curPostingListLength(); + // FIXME: clean this up + if (nextLength == -2) { + nextLength = postListSlice.length() - nextOffset; + } + postListSlice.prefetch(nextOffset, nextLength); + // scorer.prefetch(nextOffset); + } // todo do we need direct access to the raw centroid???, this is used for quantizing, maybe hydrating and quantizing // is enough? + expectedDocs += scorer.resetPostingsScorer(offset); actualDocs += scorer.visit(knnCollector); } @@ -262,6 +295,7 @@ public final void search(String field, float[] target, KnnCollector knnCollector int filteredVectors = (int) Math.ceil(numVectors * percentFiltered); float expectedScored = Math.min(2 * filteredVectors * unfilteredRatioVisited, expectedDocs / 2f); while (centroidIterator.hasNext() && (actualDocs < expectedScored || actualDocs < knnCollector.k())) { + // FIXME: add prefetching logic here as well long offset = centroidIterator.nextPostingListOffset(); scorer.resetPostingsScorer(offset); actualDocs += scorer.visit(knnCollector); @@ -314,11 +348,15 @@ interface CentroidIterator { boolean hasNext(); long nextPostingListOffset() throws IOException; + + long curPostingListLength() throws IOException; } interface PostingVisitor { // TODO maybe we can not specifically pass the centroid... + void prefetch(long offset) throws IOException; + /** returns the number of documents in the posting list */ int resetPostingsScorer(long offset) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsWriter.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsWriter.java index 308ee391b5f4a..63253e1cd8db7 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsWriter.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsWriter.java @@ -123,15 +123,17 @@ public final KnnFieldVectorsWriter addField(FieldInfo fieldInfo) throws IOExc abstract CentroidAssignments calculateCentroids(FieldInfo fieldInfo, FloatVectorValues floatVectorValues, float[] globalCentroid) throws IOException; + record CentroidOffsetAndLength(LongValues offsets, LongValues lengths) {} + abstract void writeCentroids( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, float[] globalCentroid, - LongValues centroidOffset, + CentroidOffsetAndLength centroidOffsetAndLength, IndexOutput centroidOutput ) throws IOException; - abstract LongValues buildAndWritePostingsLists( + abstract CentroidOffsetAndLength buildAndWritePostingsLists( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, FloatVectorValues floatVectorValues, @@ -141,7 +143,7 @@ abstract LongValues buildAndWritePostingsLists( int[] overspillAssignments ) throws IOException; - abstract LongValues buildAndWritePostingsLists( + abstract CentroidOffsetAndLength buildAndWritePostingsLists( FieldInfo fieldInfo, CentroidSupplier centroidSupplier, FloatVectorValues floatVectorValues, @@ -172,7 +174,7 @@ public final void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { final CentroidSupplier centroidSupplier = new OnHeapCentroidSupplier(centroidAssignments.centroids()); // write posting lists final long postingListOffset = ivfClusters.alignFilePointer(Float.BYTES); - final LongValues offsets = buildAndWritePostingsLists( + final CentroidOffsetAndLength centroidOffsetAndLength = buildAndWritePostingsLists( fieldWriter.fieldInfo, centroidSupplier, floatVectorValues, @@ -184,7 +186,7 @@ public final void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { final long postingListLength = ivfClusters.getFilePointer() - postingListOffset; // write centroids final long centroidOffset = ivfCentroids.alignFilePointer(Float.BYTES); - writeCentroids(fieldWriter.fieldInfo, centroidSupplier, globalCentroid, offsets, ivfCentroids); + writeCentroids(fieldWriter.fieldInfo, centroidSupplier, globalCentroid, centroidOffsetAndLength, ivfCentroids); final long centroidLength = ivfCentroids.getFilePointer() - centroidOffset; // write meta file writeMeta( @@ -354,7 +356,7 @@ private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws ); // write posting lists postingListOffset = ivfClusters.alignFilePointer(Float.BYTES); - final LongValues offsets = buildAndWritePostingsLists( + final CentroidOffsetAndLength centroidOffsetAndLength = buildAndWritePostingsLists( fieldInfo, centroidSupplier, floatVectorValues, @@ -367,7 +369,7 @@ private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws postingListLength = ivfClusters.getFilePointer() - postingListOffset; // write centroids centroidOffset = ivfCentroids.alignFilePointer(Float.BYTES); - writeCentroids(fieldInfo, centroidSupplier, calculatedGlobalCentroid, offsets, ivfCentroids); + writeCentroids(fieldInfo, centroidSupplier, calculatedGlobalCentroid, centroidOffsetAndLength, ivfCentroids); centroidLength = ivfCentroids.getFilePointer() - centroidOffset; // write meta writeMeta( diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java index 9e83accef1268..4382704c03f68 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java @@ -285,6 +285,7 @@ private void assignSpilled( int currAssignment = assignments[i]; float[] currentCentroid = centroids[currAssignment]; // TODO: cache these? + // FIXME: if close to current don't do SOAR assignments float vectorCentroidDist = VectorUtil.squareDistance(vector, currentCentroid); if (vectorCentroidDist <= SOAR_MIN_DISTANCE) { spilledAssignments[i] = -1; // no SOAR assignment From 3a958e2cbeb6aeafbd89949b0ef35563ba0d2678 Mon Sep 17 00:00:00 2001 From: John Wagster Date: Sun, 17 Aug 2025 23:58:36 -0500 Subject: [PATCH 2/6] cleanup --- qa/vector/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/vector/build.gradle b/qa/vector/build.gradle index afe7e4b29b628..41064d2bb3451 100644 --- a/qa/vector/build.gradle +++ b/qa/vector/build.gradle @@ -45,9 +45,9 @@ tasks.register("checkVec", JavaExec) { systemProperty "es.logger.level", "INFO" // Change to DEBUG if needed systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../../libs/native/libraries/build/platform/").toString()) jvmArgs '-Xms4g', '-Xmx4g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError' -// if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) { + if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) { jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED' -// } + } if (System.getenv("DO_PROFILING") != null) { jvmArgs '-XX:StartFlightRecording=dumponexit=true,maxsize=250M,filename=knn.jfr,settings=profile.jfc' } From 0145341d233870bc1c455950a3c557ae4ff09c46 Mon Sep 17 00:00:00 2001 From: John Wagster Date: Mon, 18 Aug 2025 00:01:28 -0500 Subject: [PATCH 3/6] cleanup --- .../index/codec/vectors/DefaultIVFVectorsReader.java | 1 - .../elasticsearch/index/codec/vectors/cluster/KMeansLocal.java | 1 - 2 files changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index 8b864c78a510c..3b1fd716cbcdc 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -148,7 +148,6 @@ private static CentroidIterator getCentroidIteratorNoParent( ); long offset = centroids.getFilePointer(); return new CentroidIterator() { - @Override public boolean hasNext() { return neighborQueue.size() > 0; diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java index 4382704c03f68..9e83accef1268 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/cluster/KMeansLocal.java @@ -285,7 +285,6 @@ private void assignSpilled( int currAssignment = assignments[i]; float[] currentCentroid = centroids[currAssignment]; // TODO: cache these? - // FIXME: if close to current don't do SOAR assignments float vectorCentroidDist = VectorUtil.squareDistance(vector, currentCentroid); if (vectorCentroidDist <= SOAR_MIN_DISTANCE) { spilledAssignments[i] = -1; // no SOAR assignment From 2737f395f647aeff9c6ba24034b5ed70b91fa01b Mon Sep 17 00:00:00 2001 From: John Wagster Date: Mon, 18 Aug 2025 09:14:42 -0500 Subject: [PATCH 4/6] iter --- .../vectors/DefaultIVFVectorsReader.java | 34 ++++++++----------- .../index/codec/vectors/IVFVectorsReader.java | 12 +++---- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index 3b1fd716cbcdc..3e96fd031d52e 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -47,10 +47,10 @@ public DefaultIVFVectorsReader(SegmentReadState state, FlatVectorsReader rawVect super(state, rawVectorsReader); } - @Override CentroidIterator getPostingListPrefetchIterator(CentroidIterator centroidIterator, IndexInput postingListSlice) throws IOException { return new CentroidIterator() { - CentroidOffsetAndLength nextOffsetAndLength = null; + CentroidOffsetAndLength nextOffsetAndLength = + centroidIterator.hasNext() ? centroidIterator.nextPostingListOffsetAndLength() : null; private void prefetch(CentroidOffsetAndLength offsetAndLength) throws IOException { postingListSlice.prefetch(offsetAndLength.offset(), offsetAndLength.length()); @@ -64,22 +64,12 @@ public boolean hasNext() { @Override public CentroidOffsetAndLength nextPostingListOffsetAndLength() throws IOException { - CentroidOffsetAndLength offsetAndLength; - // first time we need to fetch two if possible - if (nextOffsetAndLength == null) { - offsetAndLength = centroidIterator.nextPostingListOffsetAndLength(); - if (centroidIterator.hasNext()) { - nextOffsetAndLength = centroidIterator.nextPostingListOffsetAndLength(); - prefetch(nextOffsetAndLength); - } + CentroidOffsetAndLength offsetAndLength = nextOffsetAndLength; + if (centroidIterator.hasNext()) { + nextOffsetAndLength = centroidIterator.nextPostingListOffsetAndLength(); + prefetch(nextOffsetAndLength); } else { - offsetAndLength = nextOffsetAndLength; - if (centroidIterator.hasNext()) { - nextOffsetAndLength = centroidIterator.nextPostingListOffsetAndLength(); - prefetch(nextOffsetAndLength); - } else { - nextOffsetAndLength = null; // indicate we reached the end - } + nextOffsetAndLength = null; // indicate we reached the end } return offsetAndLength; } @@ -87,7 +77,8 @@ public CentroidOffsetAndLength nextPostingListOffsetAndLength() throws IOExcepti } @Override - CentroidIterator getCentroidIterator(FieldInfo fieldInfo, int numCentroids, IndexInput centroids, float[] targetQuery) + CentroidIterator getCentroidIterator( + FieldInfo fieldInfo, int numCentroids, IndexInput centroids, float[] targetQuery, IndexInput postingListSlice) throws IOException { final FieldEntry fieldEntry = fields.get(fieldInfo.number); final float globalCentroidDp = fieldEntry.globalCentroidDp(); @@ -110,8 +101,9 @@ CentroidIterator getCentroidIterator(FieldInfo fieldInfo, int numCentroids, Inde final ES92Int7VectorsScorer scorer = ESVectorUtil.getES92Int7VectorsScorer(centroids, fieldInfo.getVectorDimension()); centroids.seek(0L); int numParents = centroids.readVInt(); + CentroidIterator centroidIterator; if (numParents > 0) { - return getCentroidIteratorWithParents( + centroidIterator = getCentroidIteratorWithParents( fieldInfo, centroids, numParents, @@ -121,8 +113,10 @@ CentroidIterator getCentroidIterator(FieldInfo fieldInfo, int numCentroids, Inde queryParams, globalCentroidDp ); + } else { + centroidIterator = getCentroidIteratorNoParent(fieldInfo, centroids, numCentroids, scorer, quantized, queryParams, globalCentroidDp); } - return getCentroidIteratorNoParent(fieldInfo, centroids, numCentroids, scorer, quantized, queryParams, globalCentroidDp); + return getPostingListPrefetchIterator(centroidIterator, postingListSlice); } private static CentroidIterator getCentroidIteratorNoParent( diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java index 4448ba268ef70..9595dd1886eb7 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java @@ -85,10 +85,8 @@ protected IVFVectorsReader(SegmentReadState state, FlatVectorsReader rawVectorsR } } - abstract CentroidIterator getPostingListPrefetchIterator(CentroidIterator centroidIterator, IndexInput postingListSlice) - throws IOException; - - abstract CentroidIterator getCentroidIterator(FieldInfo fieldInfo, int numCentroids, IndexInput centroids, float[] target) + abstract CentroidIterator getCentroidIterator( + FieldInfo fieldInfo, int numCentroids, IndexInput centroids, float[] target, IndexInput postingListSlice) throws IOException; private static IndexInput openDataInput( @@ -245,10 +243,8 @@ public final void search(String field, float[] target, KnnCollector knnCollector // we account for soar vectors here. We can potentially visit a vector twice so we multiply by 2 here. long maxVectorVisited = (long) (2.0 * visitRatio * numVectors); IndexInput postListSlice = entry.postingListSlice(ivfClusters); - CentroidIterator centroidPrefetchingIterator = getPostingListPrefetchIterator( - getCentroidIterator(fieldInfo, entry.numCentroids, entry.centroidSlice(ivfCentroids), target), - postListSlice - ); + CentroidIterator centroidPrefetchingIterator = getCentroidIterator( + fieldInfo, entry.numCentroids, entry.centroidSlice(ivfCentroids), target, postListSlice); PostingVisitor scorer = getPostingVisitor(fieldInfo, postListSlice, target, acceptDocs); long expectedDocs = 0; long actualDocs = 0; From e3791dcb04649379e88c80f09f9a59358e561f99 Mon Sep 17 00:00:00 2001 From: John Wagster Date: Mon, 18 Aug 2025 09:16:33 -0500 Subject: [PATCH 5/6] spotless --- .../vectors/DefaultIVFVectorsReader.java | 23 +++++++++++++++---- .../index/codec/vectors/IVFVectorsReader.java | 15 +++++++++--- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index 3e96fd031d52e..fe4817a43e1e1 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -49,8 +49,9 @@ public DefaultIVFVectorsReader(SegmentReadState state, FlatVectorsReader rawVect CentroidIterator getPostingListPrefetchIterator(CentroidIterator centroidIterator, IndexInput postingListSlice) throws IOException { return new CentroidIterator() { - CentroidOffsetAndLength nextOffsetAndLength = - centroidIterator.hasNext() ? centroidIterator.nextPostingListOffsetAndLength() : null; + CentroidOffsetAndLength nextOffsetAndLength = centroidIterator.hasNext() + ? centroidIterator.nextPostingListOffsetAndLength() + : null; private void prefetch(CentroidOffsetAndLength offsetAndLength) throws IOException { postingListSlice.prefetch(offsetAndLength.offset(), offsetAndLength.length()); @@ -78,8 +79,12 @@ public CentroidOffsetAndLength nextPostingListOffsetAndLength() throws IOExcepti @Override CentroidIterator getCentroidIterator( - FieldInfo fieldInfo, int numCentroids, IndexInput centroids, float[] targetQuery, IndexInput postingListSlice) - throws IOException { + FieldInfo fieldInfo, + int numCentroids, + IndexInput centroids, + float[] targetQuery, + IndexInput postingListSlice + ) throws IOException { final FieldEntry fieldEntry = fields.get(fieldInfo.number); final float globalCentroidDp = fieldEntry.globalCentroidDp(); final OptimizedScalarQuantizer scalarQuantizer = new OptimizedScalarQuantizer(fieldInfo.getVectorSimilarityFunction()); @@ -114,7 +119,15 @@ CentroidIterator getCentroidIterator( globalCentroidDp ); } else { - centroidIterator = getCentroidIteratorNoParent(fieldInfo, centroids, numCentroids, scorer, quantized, queryParams, globalCentroidDp); + centroidIterator = getCentroidIteratorNoParent( + fieldInfo, + centroids, + numCentroids, + scorer, + quantized, + queryParams, + globalCentroidDp + ); } return getPostingListPrefetchIterator(centroidIterator, postingListSlice); } diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java index 9595dd1886eb7..3d1e9b4dce99f 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java @@ -86,8 +86,12 @@ protected IVFVectorsReader(SegmentReadState state, FlatVectorsReader rawVectorsR } abstract CentroidIterator getCentroidIterator( - FieldInfo fieldInfo, int numCentroids, IndexInput centroids, float[] target, IndexInput postingListSlice) - throws IOException; + FieldInfo fieldInfo, + int numCentroids, + IndexInput centroids, + float[] target, + IndexInput postingListSlice + ) throws IOException; private static IndexInput openDataInput( SegmentReadState state, @@ -244,7 +248,12 @@ public final void search(String field, float[] target, KnnCollector knnCollector long maxVectorVisited = (long) (2.0 * visitRatio * numVectors); IndexInput postListSlice = entry.postingListSlice(ivfClusters); CentroidIterator centroidPrefetchingIterator = getCentroidIterator( - fieldInfo, entry.numCentroids, entry.centroidSlice(ivfCentroids), target, postListSlice); + fieldInfo, + entry.numCentroids, + entry.centroidSlice(ivfCentroids), + target, + postListSlice + ); PostingVisitor scorer = getPostingVisitor(fieldInfo, postListSlice, target, acceptDocs); long expectedDocs = 0; long actualDocs = 0; From b92bfdd332e59610a0d2b81576dda25f7e3d4bff Mon Sep 17 00:00:00 2001 From: John Wagster Date: Mon, 18 Aug 2025 22:29:07 -0500 Subject: [PATCH 6/6] iter --- .../index/codec/vectors/DefaultIVFVectorsReader.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java index fe4817a43e1e1..e191ce96ea2ed 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java @@ -53,14 +53,20 @@ CentroidIterator getPostingListPrefetchIterator(CentroidIterator centroidIterato ? centroidIterator.nextPostingListOffsetAndLength() : null; - private void prefetch(CentroidOffsetAndLength offsetAndLength) throws IOException { + { + // prefetch the first one + if (nextOffsetAndLength != null) { + prefetch(nextOffsetAndLength); + } + } + + void prefetch(CentroidOffsetAndLength offsetAndLength) throws IOException { postingListSlice.prefetch(offsetAndLength.offset(), offsetAndLength.length()); } @Override public boolean hasNext() { - // none left or we have a one already fetched to return - return centroidIterator.hasNext() || nextOffsetAndLength != null; + return nextOffsetAndLength != null; } @Override