@@ -321,11 +321,10 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor {
final float[] correctionsAdd = new float[BULK_SIZE];
final int[] docIdsScratch;

int vectors;
int totalVectors;
boolean quantized = false;
float centroidDp;
final float[] centroid;
long slicePos;
OptimizedScalarQuantizer.QuantizationResult queryCorrections;
DocIdsWriter docIdsWriter = new DocIdsWriter();

@@ -367,12 +366,9 @@ public int resetPostingsScorer(long offset) throws IOException {
indexInput.seek(offset);
indexInput.readFloats(centroid, 0, centroid.length);
centroidDp = Float.intBitsToFloat(indexInput.readInt());
vectors = indexInput.readVInt();
// read the doc ids
assert vectors <= docIdsScratch.length;
docIdsWriter.readInts(indexInput, vectors, docIdsScratch);
slicePos = indexInput.getFilePointer();
return vectors;
totalVectors = indexInput.readVInt();

return totalVectors;
}

float scoreIndividually(int offset) throws IOException {
@@ -381,13 +377,13 @@ float scoreIndividually(int offset) throws IOException {
for (int j = 0; j < BULK_SIZE; j++) {
int doc = docIdsScratch[j + offset];
if (doc != -1) {
indexInput.seek(slicePos + (offset * quantizedByteLength) + (j * quantizedVectorByteSize));
float qcDist = osqVectorsScorer.quantizeScore(quantizedQueryScratch);
scores[j] = qcDist;
} else {
indexInput.skipBytes(quantizedVectorByteSize);
}
}
// read in all corrections
indexInput.seek(slicePos + (offset * quantizedByteLength) + (BULK_SIZE * quantizedVectorByteSize));
indexInput.readFloats(correctionsLower, 0, BULK_SIZE);
indexInput.readFloats(correctionsUpper, 0, BULK_SIZE);
for (int j = 0; j < BULK_SIZE; j++) {
@@ -444,18 +440,36 @@ private static int collect(int[] docIds, int offset, KnnCollector knnCollector,

@Override
public int visit(KnnCollector knnCollector) throws IOException {
byte postingListType = indexInput.readByte();
Member:

What if we just always put the block of docs at the start of every block?

So every block is
`[blk0, blk1, blk2, ... tail] [[encoded doc ids, vectors],...[tail encoded doc ids, vectors]]`

We know the block size (16), we know the previous base block (if we want to delta encode eventually).

If we ever split soar and regular docs, we can delta encode with the "doc base" (just like regular postings list).

Are we concerned about speed or just size increase?
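(Editor's sketch, for illustration only: one reading of the layout above, where the posting list is a sequence of fixed-size blocks plus a tail, and each block starts with its own encoded doc ids followed by its vectors. `DocIdBlockEncoder` and `VectorBlockWriter` are hypothetical stand-ins for `DocIdsWriter` and `DiskBBQBulkWriter`; the block size of 16 comes from the comment above.)

```java
import java.io.IOException;
import org.apache.lucene.store.IndexOutput;

// Illustration of the proposed shape: every block starts with its encoded doc ids,
// immediately followed by the quantized vectors for those docs.
final class BlockedPostingListSketch {

    static final int BLOCK_SIZE = 16; // one bulk of vectors per block

    // hypothetical stand-ins for DocIdsWriter / DiskBBQBulkWriter
    interface DocIdBlockEncoder {
        void write(IndexOutput out, int[] docIds, int from, int len) throws IOException;
    }

    interface VectorBlockWriter {
        void write(IndexOutput out, int from, int len) throws IOException;
    }

    static void writePostingList(IndexOutput out, int[] docIds, DocIdBlockEncoder docs, VectorBlockWriter vectors)
        throws IOException {
        for (int start = 0; start < docIds.length; start += BLOCK_SIZE) {
            int len = Math.min(BLOCK_SIZE, docIds.length - start); // last iteration is the tail
            docs.write(out, docIds, start, len);  // [encoded doc ids for this block]
            vectors.write(out, start, len);       // [vectors for this block]
        }
    }
}
```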

Contributor Author:

This is fair, I can have a go at it.

Contributor Author:

Introduced this approach in 71e30a1. Much simpler.

Member:

Much simpler indeed. My only concern is index size & performance. I would expect them to be mostly comparable, but you never know.

Contributor Author:

Given the way docIdsWriter works, I would expect better compression of the docIds with the penalty of one byte per 16 vectors, so all in all it should be the same size or even smaller (I am checking).
I don't expect, nor do I see, any performance implications.
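(Editor's note, purely for scale: at one flag byte per group of 16 doc ids, a posting file covering one million vectors would carry about 1,000,000 / 16 = 62,500 extra bytes, roughly 61 KB, which is small next to the quantized vector data.)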

Contributor Author (@iverase, Aug 7, 2025):

Checking the posting list size for 1M vectors with 1024 dims:

main: 302,780,965 bytes
PR: 301,815,585 bytes

Member:

even smaller?!?!? awesome!

Contributor Author (@iverase, Aug 7, 2025):

Flamegraphs show a performance penalty (because of the extra byte).

main: [flamegraph image]

PR: [flamegraph image]

Member:

Ah, it breaks alignment, which is frustrating.

I am testing with sorted doc-ids right now (now that we aren't skipping duplicate vectors).

GroupVarInt also has a "single byte read" to determine the output flag. Having a single byte read for every group of 16 integers does seem weird.

I wonder if we can do something more clever by delta-encoding all the vectors (we read all the blocks in order anyway, so we can keep the running sum) and picking the appropriate encoding that works for all the blocks. Then we can write that encoding byte at the front of the entire list and have a uniform encoding for every block.

This might be slightly less disk efficient, but it will likely align better.
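(Editor's sketch, for illustration only, reading "delta-encoding all the vectors" as delta-encoding the doc ids across blocks: pick one width that fits every delta in the list, write that choice once up front, then encode every block uniformly. The class name and the byte-aligned packer below are made up, and sorted doc ids are assumed.)

```java
import java.io.IOException;
import org.apache.lucene.store.DataOutput;

// Not the PR's implementation: a uniform, delta-encoded doc id layout where a single
// encoding byte at the front of the list applies to every block.
final class UniformDeltaDocIdsSketch {

    static void writeList(DataOutput out, int[] sortedDocIds) throws IOException {
        // first pass: the widest delta anywhere in the list decides the encoding
        int maxDelta = 0;
        for (int i = 0; i < sortedDocIds.length; i++) {
            int prev = i == 0 ? 0 : sortedDocIds[i - 1];
            maxDelta = Math.max(maxDelta, sortedDocIds[i] - prev);
        }
        int bitsPerDelta = Math.max(1, 32 - Integer.numberOfLeadingZeros(maxDelta));
        out.writeByte((byte) bitsPerDelta); // one encoding byte for the entire list

        // second pass: every delta (and therefore every block) uses the same width,
        // so block boundaries stay at predictable offsets
        for (int i = 0; i < sortedDocIds.length; i++) {
            int prev = i == 0 ? 0 : sortedDocIds[i - 1];
            writeFixedWidth(out, sortedDocIds[i] - prev, bitsPerDelta);
        }
    }

    // stand-in for a real bit packer; byte-aligned to keep the sketch simple
    private static void writeFixedWidth(DataOutput out, int value, int bits) throws IOException {
        int bytes = (bits + 7) / 8;
        for (int b = 0; b < bytes; b++) {
            out.writeByte((byte) (value >>> (8 * b)));
        }
    }
}
```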

if (postingListType == DefaultIVFVectorsWriter.SINGLE_BLOCK_POSTING_LIST) {
return singleBlockVisit(knnCollector, totalVectors);
} else {
assert postingListType == DefaultIVFVectorsWriter.MULTI_BLOCK_POSTING_LIST;
final int numBlocks = indexInput.readVInt();
int scoredDocs = 0;
for (int i = 0; i < numBlocks; i++) {
final int numVectors = indexInput.readVInt();
scoredDocs += singleBlockVisit(knnCollector, numVectors);
}
return scoredDocs;
}
}

private int singleBlockVisit(KnnCollector knnCollector, int numVectors) throws IOException {
assert numVectors <= docIdsScratch.length : "numVectors: " + numVectors + ", docIdsScratch.length: " + docIdsScratch.length;
docIdsWriter.readInts(indexInput, numVectors, docIdsScratch);
// block processing
int scoredDocs = 0;
int limit = vectors - BULK_SIZE + 1;
int limit = numVectors - BULK_SIZE + 1;
int i = 0;
for (; i < limit; i += BULK_SIZE) {
int docsToScore = BULK_SIZE - filterDocs(docIdsScratch, i, needsScoring);
if (docsToScore == 0) {
indexInput.skipBytes(BULK_SIZE * quantizedByteLength);
continue;
}
quantizeQueryIfNecessary();
indexInput.seek(slicePos + i * quantizedByteLength);
float maxScore = Float.NEGATIVE_INFINITY;
float maxScore;
if (docsToScore < BULK_SIZE / 2) {
maxScore = scoreIndividually(i);
} else {
@@ -475,11 +489,10 @@ public int visit(KnnCollector knnCollector) throws IOException {
}
}
// process tail
for (; i < vectors; i++) {
for (; i < numVectors; i++) {
int doc = docIdsScratch[i];
if (needsScoring.test(doc)) {
quantizeQueryIfNecessary();
indexInput.seek(slicePos + i * quantizedByteLength);
float qcDist = osqVectorsScorer.quantizeScore(quantizedQueryScratch);
indexInput.readFloats(correctiveValues, 0, 3);
final int quantizedComponentSum = Short.toUnsignedInt(indexInput.readShort());
@@ -498,6 +511,8 @@ public int visit(KnnCollector knnCollector) throws IOException {
);
scoredDocs++;
knnCollector.collect(doc, score);
} else {
indexInput.skipBytes(quantizedByteLength);
}
}
if (scoredDocs > 0) {
@@ -35,6 +35,7 @@
import java.nio.ByteOrder;
import java.util.AbstractList;
import java.util.Arrays;
import java.util.function.IntPredicate;

/**
* Default implementation of {@link IVFVectorsWriter}. It uses {@link HierarchicalKMeans} algorithm to
@@ -43,6 +44,10 @@
*/
public class DefaultIVFVectorsWriter extends IVFVectorsWriter {
private static final Logger logger = LogManager.getLogger(DefaultIVFVectorsWriter.class);
// posting lists bigger than this will be split into two or more blocks
private static final int MAX_POSTING_LIST_BLOCK_SIZE = 16 * 100;
public static final byte SINGLE_BLOCK_POSTING_LIST = 0;
public static final byte MULTI_BLOCK_POSTING_LIST = 1;

private final int vectorPerCluster;
private final int centroidsPerParentCluster;
@@ -98,7 +103,7 @@ LongValues buildAndWritePostingsLists(
}
}
// write the max posting list size
postingsOutput.writeVInt(maxPostingListSize);
postingsOutput.writeVInt(Math.min(MAX_POSTING_LIST_BLOCK_SIZE, maxPostingListSize));
// write the posting lists
final PackedLongValues.Builder offsets = PackedLongValues.monotonicBuilder(PackedInts.COMPACT);
DocIdsWriter docIdsWriter = new DocIdsWriter();
@@ -121,13 +126,31 @@ LongValues buildAndWritePostingsLists(
int size = cluster.length;
// write docIds
postingsOutput.writeVInt(size);
onHeapQuantizedVectors.reset(centroid, size, ord -> cluster[ord]);
// TODO we might want to consider putting the docIds in a separate file
// to aid with only having to fetch vectors from slower storage when they are required
// keeping them in the same file indicates we pull the entire file into cache
docIdsWriter.writeDocIds(j -> floatVectorValues.ordToDoc(cluster[j]), size, postingsOutput);
// write vectors
bulkWriter.writeVectors(onHeapQuantizedVectors);
if (size > MAX_POSTING_LIST_BLOCK_SIZE) {
postingsOutput.writeByte(MULTI_BLOCK_POSTING_LIST);
writeOnHeapMultiBlockPostingList(
postingsOutput,
floatVectorValues,
onHeapQuantizedVectors,
centroid,
cluster,
size,
docIdsWriter,
bulkWriter
);
} else {
postingsOutput.writeByte(SINGLE_BLOCK_POSTING_LIST);
writeOnHeapSingleBlockPostingList(
postingsOutput,
floatVectorValues,
onHeapQuantizedVectors,
centroid,
k -> cluster[k],
size,
docIdsWriter,
bulkWriter
);
}
}

if (logger.isDebugEnabled()) {
@@ -137,6 +160,69 @@ LongValues buildAndWritePostingsLists(
return offsets.build();
}

private void writeOnHeapMultiBlockPostingList(
IndexOutput postingsOutput,
FloatVectorValues floatVectorValues,
OnHeapQuantizedVectors onHeapQuantizedVectors,
float[] centroid,
int[] cluster,
int size,
DocIdsWriter docIdsWriter,
DiskBBQBulkWriter bulkWriter
) throws IOException {
int numBlocks = (int) Math.ceil((double) size / MAX_POSTING_LIST_BLOCK_SIZE);
postingsOutput.writeVInt(numBlocks);
for (int i = 0; i < numBlocks - 1; i++) {
int offset = MAX_POSTING_LIST_BLOCK_SIZE * i;
postingsOutput.writeVInt(MAX_POSTING_LIST_BLOCK_SIZE);
writeOnHeapSingleBlockPostingList(
postingsOutput,
floatVectorValues,
onHeapQuantizedVectors,
centroid,
k -> cluster[offset + k],
MAX_POSTING_LIST_BLOCK_SIZE,
docIdsWriter,
bulkWriter
);
}
int lastBlock = size - (numBlocks - 1) * MAX_POSTING_LIST_BLOCK_SIZE;
assert lastBlock >= 0;
if (lastBlock > 0) {
postingsOutput.writeVInt(lastBlock);
writeOnHeapSingleBlockPostingList(
postingsOutput,
floatVectorValues,
onHeapQuantizedVectors,
centroid,
k -> cluster[(numBlocks - 1) * MAX_POSTING_LIST_BLOCK_SIZE + k],
lastBlock,
docIdsWriter,
bulkWriter
);
}
}

private void writeOnHeapSingleBlockPostingList(
IndexOutput postingsOutput,
FloatVectorValues floatVectorValues,
OnHeapQuantizedVectors onHeapQuantizedVectors,
float[] centroid,
IntToIntFunction cluster,
int size,
DocIdsWriter docIdsWriter,
DiskBBQBulkWriter bulkWriter
) throws IOException {

onHeapQuantizedVectors.reset(centroid, size, cluster);
// TODO we might want to consider putting the docIds in a separate file
// to aid with only having to fetch vectors from slower storage when they are required
// keeping them in the same file indicates we pull the entire file into cache
docIdsWriter.writeDocIds(j -> floatVectorValues.ordToDoc(cluster.apply(j)), size, postingsOutput);
// write vectors
bulkWriter.writeVectors(onHeapQuantizedVectors);
}

@Override
LongValues buildAndWritePostingsLists(
FieldInfo fieldInfo,
@@ -237,7 +323,7 @@ LongValues buildAndWritePostingsLists(
DiskBBQBulkWriter bulkWriter = new DiskBBQBulkWriter.OneBitDiskBBQBulkWriter(ES91OSQVectorsScorer.BULK_SIZE, postingsOutput);
final ByteBuffer buffer = ByteBuffer.allocate(fieldInfo.getVectorDimension() * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN);
// write the max posting list size
postingsOutput.writeVInt(maxPostingListSize);
postingsOutput.writeVInt(Math.min(MAX_POSTING_LIST_BLOCK_SIZE, maxPostingListSize));
// write the posting lists
for (int c = 0; c < centroidSupplier.size(); c++) {
float[] centroid = centroidSupplier.centroid(c);
@@ -252,13 +338,31 @@ LongValues buildAndWritePostingsLists(
// write docIds
int size = cluster.length;
postingsOutput.writeVInt(size);
offHeapQuantizedVectors.reset(size, ord -> isOverspill[ord], ord -> cluster[ord]);
// TODO we might want to consider putting the docIds in a separate file
// to aid with only having to fetch vectors from slower storage when they are required
// keeping them in the same file indicates we pull the entire file into cache
docIdsWriter.writeDocIds(j -> floatVectorValues.ordToDoc(cluster[j]), size, postingsOutput);
// write vectors
bulkWriter.writeVectors(offHeapQuantizedVectors);
if (size > MAX_POSTING_LIST_BLOCK_SIZE) {
postingsOutput.writeByte(MULTI_BLOCK_POSTING_LIST);
writeOffHeapMultiBlockPostingList(
postingsOutput,
floatVectorValues,
offHeapQuantizedVectors,
cluster,
size,
isOverspill,
docIdsWriter,
bulkWriter
);
} else {
postingsOutput.writeByte(SINGLE_BLOCK_POSTING_LIST);
writeOffHeapBlockPostingList(
postingsOutput,
floatVectorValues,
offHeapQuantizedVectors,
k -> cluster[k],
size,
b -> isOverspill[b],
docIdsWriter,
bulkWriter
);
}
}

if (logger.isDebugEnabled()) {
@@ -268,6 +372,69 @@ LongValues buildAndWritePostingsLists(
}
}

private void writeOffHeapMultiBlockPostingList(
IndexOutput postingsOutput,
FloatVectorValues floatVectorValues,
OffHeapQuantizedVectors offHeapQuantizedVectors,
int[] cluster,
int size,
boolean[] isOverspill,
DocIdsWriter docIdsWriter,
DiskBBQBulkWriter bulkWriter
) throws IOException {
int numBlocks = (int) Math.ceil((double) size / MAX_POSTING_LIST_BLOCK_SIZE);
postingsOutput.writeVInt(numBlocks);
for (int i = 0; i < numBlocks - 1; i++) {
int offset = MAX_POSTING_LIST_BLOCK_SIZE * i;
postingsOutput.writeVInt(MAX_POSTING_LIST_BLOCK_SIZE);
writeOffHeapBlockPostingList(
postingsOutput,
floatVectorValues,
offHeapQuantizedVectors,
k -> cluster[offset + k],
MAX_POSTING_LIST_BLOCK_SIZE,
b -> isOverspill[offset + b],
docIdsWriter,
bulkWriter
);
}
int lastBlock = size - (numBlocks - 1) * MAX_POSTING_LIST_BLOCK_SIZE;
assert lastBlock >= 0;
if (lastBlock > 0) {
postingsOutput.writeVInt(lastBlock);
writeOffHeapBlockPostingList(
postingsOutput,
floatVectorValues,
offHeapQuantizedVectors,
k -> cluster[(numBlocks - 1) * MAX_POSTING_LIST_BLOCK_SIZE + k],
lastBlock,
b -> isOverspill[(numBlocks - 1) * MAX_POSTING_LIST_BLOCK_SIZE + b],
docIdsWriter,
bulkWriter
);
}
}

private void writeOffHeapBlockPostingList(
IndexOutput postingsOutput,
FloatVectorValues floatVectorValues,
OffHeapQuantizedVectors offHeapQuantizedVectors,
IntToIntFunction cluster,
int size,
IntPredicate isOverspill,
DocIdsWriter docIdsWriter,
DiskBBQBulkWriter bulkWriter
) throws IOException {

offHeapQuantizedVectors.reset(size, isOverspill::test, cluster);
// TODO we might want to consider putting the docIds in a separate file
// to aid with only having to fetch vectors from slower storage when they are required
// keeping them in the same file indicates we pull the entire file into cache
docIdsWriter.writeDocIds(j -> floatVectorValues.ordToDoc(cluster.apply(j)), size, postingsOutput);
// write vectors
bulkWriter.writeVectors(offHeapQuantizedVectors);
}

private static void printClusterQualityStatistics(int[][] clusters) {
float min = Float.MAX_VALUE;
float max = Float.MIN_VALUE;