Commit b81985e

Block encode doc ids (#134170)
To protect against accidentally very large clusters, we shouldn't eagerly load ALL the docIds for a posting list at once. This change block-encodes them into `BLOCK_SIZE` chunks (16). Running benchmarks, I found no measurable impact on index time. There MAY be a small increase at query time, but the numbers aren't clear; if there is an impact, it seems very small. As an aside, I am wondering if our block size is too conservative and if we should bump it to 32 for vectors and docs, since our typical running hardware supports 256- and 512-bit AVX...
1 parent c05c61d commit b81985e
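The core idea: a posting list's doc ids are delta-encoded and then handled in blocks of `BLOCK_SIZE` (16) entries, so the reader only ever materializes one block of ids at a time. A rough standalone sketch of the encode-side grouping follows; the class and method names are illustrative, not the actual `IdsWriter` API.

import java.util.Arrays;

final class DocIdBlockSketch {
    static final int BLOCK_SIZE = 16;

    // Turn sorted, absolute doc ids into deltas, then split them into blocks of at most
    // BLOCK_SIZE entries; each block can then be written with a compact per-list encoding.
    static int[][] toDeltaBlocks(int[] docIds) {
        int[] deltas = new int[docIds.length];
        for (int i = 0; i < docIds.length; i++) {
            deltas[i] = i == 0 ? docIds[i] : docIds[i] - docIds[i - 1];
        }
        int numBlocks = (docIds.length + BLOCK_SIZE - 1) / BLOCK_SIZE;
        int[][] blocks = new int[numBlocks][];
        for (int b = 0; b < numBlocks; b++) {
            int from = b * BLOCK_SIZE;
            blocks[b] = Arrays.copyOfRange(deltas, from, Math.min(from + BLOCK_SIZE, deltas.length));
        }
        return blocks;
    }
}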

File tree: 3 files changed (+67, -42 lines)

server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java

Lines changed: 34 additions & 24 deletions
@@ -376,7 +376,9 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor {
         final float[] correctionsUpper = new float[BULK_SIZE];
         final int[] correctionsSum = new int[BULK_SIZE];
         final float[] correctionsAdd = new float[BULK_SIZE];
-        final int[] docIdsScratch;
+        final int[] docIdsScratch = new int[BULK_SIZE];
+        byte docEncoding;
+        int docBase = 0;

         int vectors;
         boolean quantized = false;

@@ -415,7 +417,6 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor {
             quantizedVectorByteSize = (discretizedDimensions / 8);
             quantizer = new OptimizedScalarQuantizer(fieldInfo.getVectorSimilarityFunction(), DEFAULT_LAMBDA, 1);
             osqVectorsScorer = ESVectorUtil.getES91OSQVectorsScorer(indexInput, fieldInfo.getVectorDimension());
-            this.docIdsScratch = new int[maxPostingListSize];
         }

         @Override

@@ -425,24 +426,17 @@ public int resetPostingsScorer(long offset) throws IOException {
             indexInput.readFloats(centroid, 0, centroid.length);
             centroidDp = Float.intBitsToFloat(indexInput.readInt());
             vectors = indexInput.readVInt();
-            // read the doc ids
-            assert vectors <= docIdsScratch.length;
-            idsWriter.readInts(indexInput, vectors, docIdsScratch);
-            // reconstitute from the deltas
-            int sum = 0;
-            for (int i = 0; i < vectors; i++) {
-                sum += docIdsScratch[i];
-                docIdsScratch[i] = sum;
-            }
+            docEncoding = indexInput.readByte();
+            docBase = 0;
             slicePos = indexInput.getFilePointer();
             return vectors;
         }

-        private float scoreIndividually(int offset) throws IOException {
+        private float scoreIndividually() throws IOException {
             float maxScore = Float.NEGATIVE_INFINITY;
             // score individually, first the quantized byte chunk
             for (int j = 0; j < BULK_SIZE; j++) {
-                int doc = docIdsScratch[j + offset];
+                int doc = docIdsScratch[j];
                 if (doc != -1) {
                     float qcDist = osqVectorsScorer.quantizeScore(quantizedQueryScratch);
                     scores[j] = qcDist;

@@ -459,7 +453,7 @@ private float scoreIndividually(int offset) throws IOException {
             indexInput.readFloats(correctionsAdd, 0, BULK_SIZE);
             // Now apply corrections
             for (int j = 0; j < BULK_SIZE; j++) {
-                int doc = docIdsScratch[offset + j];
+                int doc = docIdsScratch[j];
                 if (doc != -1) {
                     scores[j] = osqVectorsScorer.score(
                         queryCorrections.lowerInterval(),

@@ -482,45 +476,56 @@ private float scoreIndividually(int offset) throws IOException {
             return maxScore;
         }

-        private static int docToBulkScore(int[] docIds, int offset, Bits acceptDocs) {
+        private static int docToBulkScore(int[] docIds, Bits acceptDocs) {
             assert acceptDocs != null : "acceptDocs must not be null";
             int docToScore = ES91OSQVectorsScorer.BULK_SIZE;
             for (int i = 0; i < ES91OSQVectorsScorer.BULK_SIZE; i++) {
-                final int idx = offset + i;
-                if (acceptDocs.get(docIds[idx]) == false) {
-                    docIds[idx] = -1;
+                if (acceptDocs.get(docIds[i]) == false) {
+                    docIds[i] = -1;
                     docToScore--;
                 }
             }
             return docToScore;
         }

-        private static void collectBulk(int[] docIds, int offset, KnnCollector knnCollector, float[] scores) {
+        private void collectBulk(KnnCollector knnCollector, float[] scores) {
             for (int i = 0; i < ES91OSQVectorsScorer.BULK_SIZE; i++) {
-                final int doc = docIds[offset + i];
+                final int doc = docIdsScratch[i];
                 if (doc != -1) {
                     knnCollector.collect(doc, scores[i]);
                 }
             }
         }

+        private void readDocIds(int count) throws IOException {
+            idsWriter.readInts(indexInput, count, docEncoding, docIdsScratch);
+            // reconstitute from the deltas
+            for (int j = 0; j < count; j++) {
+                docBase += docIdsScratch[j];
+                docIdsScratch[j] = docBase;
+            }
+        }
+
         @Override
         public int visit(KnnCollector knnCollector) throws IOException {
             indexInput.seek(slicePos);
             // block processing
             int scoredDocs = 0;
             int limit = vectors - BULK_SIZE + 1;
             int i = 0;
+            // read Docs
             for (; i < limit; i += BULK_SIZE) {
-                final int docsToBulkScore = acceptDocs == null ? BULK_SIZE : docToBulkScore(docIdsScratch, i, acceptDocs);
+                // read the doc ids
+                readDocIds(BULK_SIZE);
+                final int docsToBulkScore = acceptDocs == null ? BULK_SIZE : docToBulkScore(docIdsScratch, acceptDocs);
                 if (docsToBulkScore == 0) {
                     indexInput.skipBytes(quantizedByteLength * BULK_SIZE);
                     continue;
                 }
                 quantizeQueryIfNecessary();
                 final float maxScore;
                 if (docsToBulkScore < BULK_SIZE / 2) {
-                    maxScore = scoreIndividually(i);
+                    maxScore = scoreIndividually();
                 } else {
                     maxScore = osqVectorsScorer.scoreBulk(
                         quantizedQueryScratch,

@@ -534,13 +539,18 @@ public int visit(KnnCollector knnCollector) throws IOException {
                     );
                 }
                 if (knnCollector.minCompetitiveSimilarity() < maxScore) {
-                    collectBulk(docIdsScratch, i, knnCollector, scores);
+                    collectBulk(knnCollector, scores);
                 }
                 scoredDocs += docsToBulkScore;
             }
             // process tail
+            // read the doc ids
+            if (i < vectors) {
+                readDocIds(vectors - i);
+            }
+            int count = 0;
             for (; i < vectors; i++) {
-                int doc = docIdsScratch[i];
+                int doc = docIdsScratch[count++];
                 if (acceptDocs == null || acceptDocs.get(doc)) {
                     quantizeQueryIfNecessary();
                     float qcDist = osqVectorsScorer.quantizeScore(quantizedQueryScratch);
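On the read side, the decode path now works one block at a time: `readDocIds` pulls up to `BULK_SIZE` deltas into `docIdsScratch`, and a running `docBase` turns them back into absolute doc ids. A minimal standalone sketch of that prefix-sum step (the class name is illustrative, not part of the change):

final class BlockDecodeSketch {
    private int docBase = 0;

    // deltas for one block are read into scratch; prefix-summing against docBase
    // reconstitutes absolute doc ids, mirroring readDocIds above.
    void decodeBlock(int[] scratch, int count) {
        for (int j = 0; j < count; j++) {
            docBase += scratch[j];
            scratch[j] = docBase;
        }
    }
}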

server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsWriter.java

Lines changed: 20 additions & 15 deletions
@@ -138,12 +138,12 @@ CentroidOffsetAndLength buildAndWritePostingsLists(
                 docDeltas[j] = j == 0 ? docIds[clusterOrds[j]] : docIds[clusterOrds[j]] - docIds[clusterOrds[j - 1]];
             }
             onHeapQuantizedVectors.reset(centroid, size, ord -> cluster[clusterOrds[ord]]);
-            // TODO we might want to consider putting the docIds in a separate file
-            // to aid with only having to fetch vectors from slower storage when they are required
-            // keeping them in the same file indicates we pull the entire file into cache
-            idsWriter.writeDocIds(i -> docDeltas[i], size, postingsOutput);
-            // write vectors
-            bulkWriter.writeVectors(onHeapQuantizedVectors);
+            byte encoding = idsWriter.calculateBlockEncoding(i -> docDeltas[i], size, ES91OSQVectorsScorer.BULK_SIZE);
+            postingsOutput.writeByte(encoding);
+            bulkWriter.writeVectors(onHeapQuantizedVectors, i -> {
+                // for vector i we write `bulk` size docs or the remaining docs
+                idsWriter.writeDocIds(d -> docDeltas[i + d], Math.min(ES91OSQVectorsScorer.BULK_SIZE, size - i), encoding, postingsOutput);
+            });
             lengths.add(postingsOutput.getFilePointer() - fileOffset - offset);
         }

@@ -287,15 +287,20 @@ CentroidOffsetAndLength buildAndWritePostingsLists(
             for (int j = 0; j < size; j++) {
                 docDeltas[j] = j == 0 ? docIds[clusterOrds[j]] : docIds[clusterOrds[j]] - docIds[clusterOrds[j - 1]];
             }
+            byte encoding = idsWriter.calculateBlockEncoding(i -> docDeltas[i], size, ES91OSQVectorsScorer.BULK_SIZE);
+            postingsOutput.writeByte(encoding);
             offHeapQuantizedVectors.reset(size, ord -> isOverspill[clusterOrds[ord]], ord -> cluster[clusterOrds[ord]]);
-            // TODO we might want to consider putting the docIds in a separate file
-            // to aid with only having to fetch vectors from slower storage when they are required
-            // keeping them in the same file indicates we pull the entire file into cache
-            idsWriter.writeDocIds(i -> docDeltas[i], size, postingsOutput);
             // write vectors
-            bulkWriter.writeVectors(offHeapQuantizedVectors);
+            bulkWriter.writeVectors(offHeapQuantizedVectors, i -> {
+                // for vector i we write `bulk` size docs or the remaining docs
+                idsWriter.writeDocIds(
+                    d -> docDeltas[d + i],
+                    Math.min(ES91OSQVectorsScorer.BULK_SIZE, size - i),
+                    encoding,
+                    postingsOutput
+                );
+            });
             lengths.add(postingsOutput.getFilePointer() - fileOffset - offset);
-            // lengths.add(1);
         }

         if (logger.isDebugEnabled()) {

@@ -381,7 +386,7 @@ private void writeCentroidsWithParents(
             osq,
             globalCentroid
         );
-        bulkWriter.writeVectors(parentQuantizeCentroid);
+        bulkWriter.writeVectors(parentQuantizeCentroid, null);
         int offset = 0;
         for (int i = 0; i < centroidGroups.centroids().length; i++) {
             centroidOutput.writeInt(offset);

@@ -398,7 +403,7 @@ private void writeCentroidsWithParents(
         for (int i = 0; i < centroidGroups.centroids().length; i++) {
             final int[] centroidAssignments = centroidGroups.vectors()[i];
             childrenQuantizeCentroid.reset(idx -> centroidAssignments[idx], centroidAssignments.length);
-            bulkWriter.writeVectors(childrenQuantizeCentroid);
+            bulkWriter.writeVectors(childrenQuantizeCentroid, null);
         }
         // write the centroid offsets at the end of the file
         for (int i = 0; i < centroidGroups.centroids().length; i++) {

@@ -429,7 +434,7 @@ private void writeCentroidsWithoutParents(
             osq,
             globalCentroid
         );
-        bulkWriter.writeVectors(quantizedCentroids);
+        bulkWriter.writeVectors(quantizedCentroids, null);
         // write the centroid offsets at the end of the file
         for (int i = 0; i < centroidSupplier.size(); i++) {
             centroidOutput.writeLong(centroidOffsetAndLength.offsets().get(i));
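Net effect on the postings layout: each posting list now starts with a single encoding byte (chosen once over all deltas via `calculateBlockEncoding`), and the doc-id deltas for each bulk of up to `BULK_SIZE` vectors are written immediately before that bulk's quantized vectors. A small self-contained sketch of that interleaving, using plain `java.io` streams and fixed-width ints as a stand-in for the real `IdsWriter` encodings:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class PostingLayoutSketch {
    static final int BULK = 16;

    // Layout per posting list: [encoding byte] then, per bulk of up to BULK vectors,
    // [doc-id delta block][quantized vectors for the same docs].
    static byte[] writePostingList(int[] docDeltas, byte[][] quantizedVectors) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(0); // encoding byte, chosen once for the whole list (stand-in value)
        for (int i = 0; i < docDeltas.length; i += BULK) {
            int count = Math.min(BULK, docDeltas.length - i);
            for (int d = 0; d < count; d++) {
                out.writeInt(docDeltas[i + d]); // doc-id deltas for this bulk
            }
            for (int v = 0; v < count; v++) {
                out.write(quantizedVectors[i + v]); // then the matching vector payloads
            }
        }
        out.flush();
        return bytes.toByteArray();
    }
}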

server/src/main/java/org/elasticsearch/index/codec/vectors/DiskBBQBulkWriter.java

Lines changed: 13 additions & 3 deletions
@@ -9,6 +9,7 @@

 package org.elasticsearch.index.codec.vectors;

+import org.apache.lucene.search.CheckedIntConsumer;
 import org.apache.lucene.store.IndexOutput;

 import java.io.IOException;

@@ -27,7 +28,8 @@ protected DiskBBQBulkWriter(int bulkSize, IndexOutput out) {
         this.out = out;
     }

-    abstract void writeVectors(DefaultIVFVectorsWriter.QuantizedVectorValues qvv) throws IOException;
+    abstract void writeVectors(DefaultIVFVectorsWriter.QuantizedVectorValues qvv, CheckedIntConsumer<IOException> docsWriter)
+        throws IOException;

     static class OneBitDiskBBQBulkWriter extends DiskBBQBulkWriter {
         private final OptimizedScalarQuantizer.QuantizationResult[] corrections;

@@ -38,17 +40,24 @@ static class OneBitDiskBBQBulkWriter extends DiskBBQBulkWriter {
         }

         @Override
-        void writeVectors(DefaultIVFVectorsWriter.QuantizedVectorValues qvv) throws IOException {
+        void writeVectors(DefaultIVFVectorsWriter.QuantizedVectorValues qvv, CheckedIntConsumer<IOException> docsWriter)
+            throws IOException {
             int limit = qvv.count() - bulkSize + 1;
             int i = 0;
             for (; i < limit; i += bulkSize) {
+                if (docsWriter != null) {
+                    docsWriter.accept(i);
+                }
                 for (int j = 0; j < bulkSize; j++) {
                     byte[] qv = qvv.next();
                     corrections[j] = qvv.getCorrections();
                     out.writeBytes(qv, qv.length);
                 }
                 writeCorrections(corrections);
             }
+            if (i < qvv.count() && docsWriter != null) {
+                docsWriter.accept(i);
+            }
             // write tail
             for (; i < qvv.count(); ++i) {
                 byte[] qv = qvv.next();

@@ -94,7 +103,8 @@ static class SevenBitDiskBBQBulkWriter extends DiskBBQBulkWriter {
         }

         @Override
-        void writeVectors(DefaultIVFVectorsWriter.QuantizedVectorValues qvv) throws IOException {
+        void writeVectors(DefaultIVFVectorsWriter.QuantizedVectorValues qvv, CheckedIntConsumer<IOException> docsWriter)
+            throws IOException {
             int limit = qvv.count() - bulkSize + 1;
             int i = 0;
             for (; i < limit; i += bulkSize) {
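The `docsWriter` callback is what lets the writer interleave doc ids with vectors without `DiskBBQBulkWriter` knowing anything about ids: it is invoked with the ordinal of the first vector of each bulk, and once more for the tail. A minimal sketch of that calling pattern (the class name and the elided vector writing are illustrative):

import java.io.IOException;

import org.apache.lucene.search.CheckedIntConsumer;

final class BulkCallbackSketch {
    static void writeBulks(int count, int bulkSize, CheckedIntConsumer<IOException> docsWriter) throws IOException {
        int limit = count - bulkSize + 1;
        int i = 0;
        for (; i < limit; i += bulkSize) {
            if (docsWriter != null) {
                docsWriter.accept(i); // doc ids for vectors [i, i + bulkSize)
            }
            // ... write bulkSize vectors and their corrections here ...
        }
        if (i < count && docsWriter != null) {
            docsWriter.accept(i); // doc ids for the tail [i, count)
        }
        // ... write the tail vectors here ...
    }
}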
