
Commit fab9dcc

Merge branch 'main' into add_cache_read_miss_metrics
2 parents b856169 + 7ec5b41 commit fab9dcc

10 files changed: +561 additions, -446 deletions

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 77 additions & 84 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java

Lines changed: 6 additions & 3 deletions
@@ -297,7 +297,8 @@ private static void score(
     PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring)
         throws IOException {
         FieldEntry entry = fields.get(fieldInfo.number);
-        return new MemorySegmentPostingsVisitor(target, indexInput.clone(), entry, fieldInfo, needsScoring);
+        final int maxPostingListSize = indexInput.readVInt();
+        return new MemorySegmentPostingsVisitor(target, indexInput, entry, fieldInfo, maxPostingListSize, needsScoring);
     }
 
     @Override
@@ -318,8 +319,8 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor {
         final float[] correctionsUpper = new float[BULK_SIZE];
         final int[] correctionsSum = new int[BULK_SIZE];
         final float[] correctionsAdd = new float[BULK_SIZE];
+        final int[] docIdsScratch;
 
-        int[] docIdsScratch = new int[0];
         int vectors;
         boolean quantized = false;
         float centroidDp;
@@ -340,6 +341,7 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor {
             IndexInput indexInput,
             FieldEntry entry,
             FieldInfo fieldInfo,
+            int maxPostingListSize,
             IntPredicate needsScoring
         ) throws IOException {
             this.target = target;
@@ -356,6 +358,7 @@ private static class MemorySegmentPostingsVisitor implements PostingVisitor {
             quantizedVectorByteSize = (discretizedDimensions / 8);
             quantizer = new OptimizedScalarQuantizer(fieldInfo.getVectorSimilarityFunction(), DEFAULT_LAMBDA, 1);
             osqVectorsScorer = ESVectorUtil.getES91OSQVectorsScorer(indexInput, fieldInfo.getVectorDimension());
+            this.docIdsScratch = new int[maxPostingListSize];
         }
 
         @Override
@@ -366,7 +369,7 @@ public int resetPostingsScorer(long offset) throws IOException {
             centroidDp = Float.intBitsToFloat(indexInput.readInt());
             vectors = indexInput.readVInt();
             // read the doc ids
-            docIdsScratch = vectors > docIdsScratch.length ? new int[vectors] : docIdsScratch;
+            assert vectors <= docIdsScratch.length;
             docIdsWriter.readInts(indexInput, vectors, docIdsScratch);
             slicePos = indexInput.getFilePointer();
             return vectors;
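
Note: the net effect of this reader-side change is that the largest posting list size is read once, when the postings file is opened, so the doc-id scratch buffer is sized a single time instead of being grown inside resetPostingsScorer. A minimal sketch of the pattern, using plain java.io streams rather than Lucene's IndexInput, with hypothetical names:

    import java.io.DataInput;
    import java.io.IOException;

    final class PostingsReaderSketch {
        private final int[] docIdsScratch; // sized once at open time, never reallocated

        PostingsReaderSketch(DataInput in) throws IOException {
            int maxPostingListSize = in.readInt(); // upper bound recorded by the writer
            this.docIdsScratch = new int[maxPostingListSize];
        }

        int readPostingList(DataInput in) throws IOException {
            int count = in.readInt();
            assert count <= docIdsScratch.length; // guaranteed by the writer-side maximum
            for (int i = 0; i < count; i++) {
                docIdsScratch[i] = in.readInt(); // hot loop reuses the same array
            }
            return count;
        }
    }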

server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsWriter.java

Lines changed: 18 additions & 5 deletions
@@ -64,6 +64,7 @@ LongValues buildAndWritePostingsLists(
         CentroidSupplier centroidSupplier,
         FloatVectorValues floatVectorValues,
         IndexOutput postingsOutput,
+        long fileOffset,
         int[] assignments,
         int[] overspillAssignments
     ) throws IOException {
@@ -76,9 +77,12 @@ LongValues buildAndWritePostingsLists(
             }
         }
 
+        int maxPostingListSize = 0;
         int[][] assignmentsByCluster = new int[centroidSupplier.size()][];
         for (int c = 0; c < centroidSupplier.size(); c++) {
-            assignmentsByCluster[c] = new int[centroidVectorCount[c]];
+            int size = centroidVectorCount[c];
+            maxPostingListSize = Math.max(maxPostingListSize, size);
+            assignmentsByCluster[c] = new int[size];
         }
         Arrays.fill(centroidVectorCount, 0);
 
@@ -93,6 +97,8 @@ LongValues buildAndWritePostingsLists(
                 }
             }
         }
+        // write the max posting list size
+        postingsOutput.writeVInt(maxPostingListSize);
         // write the posting lists
         final PackedLongValues.Builder offsets = PackedLongValues.monotonicBuilder(PackedInts.COMPACT);
         DocIdsWriter docIdsWriter = new DocIdsWriter();
@@ -106,7 +112,7 @@ LongValues buildAndWritePostingsLists(
         for (int c = 0; c < centroidSupplier.size(); c++) {
             float[] centroid = centroidSupplier.centroid(c);
             int[] cluster = assignmentsByCluster[c];
-            offsets.add(postingsOutput.alignFilePointer(Float.BYTES));
+            offsets.add(postingsOutput.alignFilePointer(Float.BYTES) - fileOffset);
             buffer.asFloatBuffer().put(centroid);
             // write raw centroid for quantizing the query vectors
             postingsOutput.writeBytes(buffer.array(), buffer.array().length);
@@ -137,6 +143,7 @@ LongValues buildAndWritePostingsLists(
         CentroidSupplier centroidSupplier,
         FloatVectorValues floatVectorValues,
         IndexOutput postingsOutput,
+        long fileOffset,
         MergeState mergeState,
         int[] assignments,
         int[] overspillAssignments
@@ -196,11 +203,14 @@ LongValues buildAndWritePostingsLists(
             }
         }
 
+        int maxPostingListSize = 0;
         int[][] assignmentsByCluster = new int[centroidSupplier.size()][];
         boolean[][] isOverspillByCluster = new boolean[centroidSupplier.size()][];
         for (int c = 0; c < centroidSupplier.size(); c++) {
-            assignmentsByCluster[c] = new int[centroidVectorCount[c]];
-            isOverspillByCluster[c] = new boolean[centroidVectorCount[c]];
+            int size = centroidVectorCount[c];
+            maxPostingListSize = Math.max(maxPostingListSize, size);
+            assignmentsByCluster[c] = new int[size];
+            isOverspillByCluster[c] = new boolean[size];
         }
         Arrays.fill(centroidVectorCount, 0);
 
@@ -226,11 +236,14 @@ LongValues buildAndWritePostingsLists(
         DocIdsWriter docIdsWriter = new DocIdsWriter();
         DiskBBQBulkWriter bulkWriter = new DiskBBQBulkWriter.OneBitDiskBBQBulkWriter(ES91OSQVectorsScorer.BULK_SIZE, postingsOutput);
         final ByteBuffer buffer = ByteBuffer.allocate(fieldInfo.getVectorDimension() * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN);
+        // write the max posting list size
+        postingsOutput.writeVInt(maxPostingListSize);
+        // write the posting lists
         for (int c = 0; c < centroidSupplier.size(); c++) {
             float[] centroid = centroidSupplier.centroid(c);
             int[] cluster = assignmentsByCluster[c];
             boolean[] isOverspill = isOverspillByCluster[c];
-            offsets.add(postingsOutput.alignFilePointer(Float.BYTES));
+            offsets.add(postingsOutput.alignFilePointer(Float.BYTES) - fileOffset);
             // write raw centroid for quantizing the query vectors
             buffer.asFloatBuffer().put(centroid);
             postingsOutput.writeBytes(buffer.array(), buffer.array().length);
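
Note: the writer-side counterpart introduces two things: the maxPostingListSize VInt header the reader now consumes, and offsets stored relative to the new fileOffset parameter so the posting-list region can later be opened as a slice. A sketch of the relative-offset bookkeeping, assuming Lucene's IndexOutput API (writeRegion and the simplified posting-list layout are hypothetical):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.lucene.store.IndexOutput;

    final class RelativeOffsetsSketch {
        // Returns the offset of each posting list, relative to the start of the region.
        static List<Long> writeRegion(IndexOutput out, List<int[]> postingLists) throws IOException {
            long regionStart = out.alignFilePointer(Float.BYTES); // the fileOffset in the real code
            List<Long> relativeOffsets = new ArrayList<>();
            for (int[] docIds : postingLists) {
                relativeOffsets.add(out.getFilePointer() - regionStart); // relative, not absolute
                out.writeVInt(docIds.length);
                for (int docId : docIds) {
                    out.writeVInt(docId);
                }
            }
            return relativeOffsets; // stored alongside regionStart/length in the metadata file
        }
    }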

server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsReader.java

Lines changed: 13 additions & 1 deletion
@@ -153,8 +153,12 @@ private FieldEntry readField(IndexInput input, FieldInfo info) throws IOException
         final long centroidOffset = input.readLong();
         final long centroidLength = input.readLong();
         final float[] globalCentroid = new float[info.getVectorDimension()];
+        long postingListOffset = -1;
+        long postingListLength = -1;
         float globalCentroidDp = 0;
         if (centroidLength > 0) {
+            postingListOffset = input.readLong();
+            postingListLength = input.readLong();
             input.readFloats(globalCentroid, 0, globalCentroid.length);
             globalCentroidDp = Float.intBitsToFloat(input.readInt());
         }
@@ -164,6 +168,8 @@ private FieldEntry readField(IndexInput input, FieldInfo info) throws IOException
             numCentroids,
             centroidOffset,
             centroidLength,
+            postingListOffset,
+            postingListLength,
             globalCentroid,
             globalCentroidDp
         );
@@ -245,7 +251,7 @@ public final void search(String field, float[] target, KnnCollector knnCollector
             nProbe = Math.max(Math.min(nProbe, entry.numCentroids), 1);
         }
         CentroidIterator centroidIterator = getCentroidIterator(fieldInfo, entry.numCentroids, entry.centroidSlice(ivfCentroids), target);
-        PostingVisitor scorer = getPostingVisitor(fieldInfo, ivfClusters, target, needsScoring);
+        PostingVisitor scorer = getPostingVisitor(fieldInfo, entry.postingListSlice(ivfClusters), target, needsScoring);
         int centroidsVisited = 0;
         long expectedDocs = 0;
         long actualDocs = 0;
@@ -298,12 +304,18 @@ protected record FieldEntry(
         int numCentroids,
         long centroidOffset,
         long centroidLength,
+        long postingListOffset,
+        long postingListLength,
         float[] globalCentroid,
         float globalCentroidDp
     ) {
         IndexInput centroidSlice(IndexInput centroidFile) throws IOException {
             return centroidFile.slice("centroids", centroidOffset, centroidLength);
         }
+
+        IndexInput postingListSlice(IndexInput postingListFile) throws IOException {
+            return postingListFile.slice("postingLists", postingListOffset, postingListLength);
+        }
     }
 
     abstract PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput postingsLists, float[] target, IntPredicate needsScoring)
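
Note: with the offset/length pair persisted in the field metadata, the search path no longer hands the posting visitor the whole clusters file; it hands it a slice whose position 0 is the start of this field's posting-list region, which is why the writer stores slice-relative offsets. A small sketch of how such a slice behaves, assuming Lucene's IndexInput API:

    import java.io.IOException;
    import org.apache.lucene.store.IndexInput;

    final class PostingListSliceSketch {
        static IndexInput open(IndexInput clustersFile, long postingListOffset, long postingListLength)
            throws IOException {
            // The slice is a view of [postingListOffset, postingListOffset + postingListLength);
            // seek(0) on the slice lands at postingListOffset in the underlying file, so the
            // relative offsets recorded at write time can be passed to seek() unchanged.
            return clustersFile.slice("postingLists", postingListOffset, postingListLength);
        }
    }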

server/src/main/java/org/elasticsearch/index/codec/vectors/IVFVectorsWriter.java

Lines changed: 40 additions & 5 deletions
@@ -136,6 +136,7 @@ abstract LongValues buildAndWritePostingsLists(
         CentroidSupplier centroidSupplier,
         FloatVectorValues floatVectorValues,
         IndexOutput postingsOutput,
+        long fileOffset,
         int[] assignments,
         int[] overspillAssignments
     ) throws IOException;
@@ -145,6 +146,7 @@ abstract LongValues buildAndWritePostingsLists(
         CentroidSupplier centroidSupplier,
         FloatVectorValues floatVectorValues,
         IndexOutput postingsOutput,
+        long fileOffset,
         MergeState mergeState,
         int[] assignments,
         int[] overspillAssignments
@@ -169,20 +171,31 @@ public final void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
         // wrap centroids with a supplier
         final CentroidSupplier centroidSupplier = new OnHeapCentroidSupplier(centroidAssignments.centroids());
         // write posting lists
+        final long postingListOffset = ivfClusters.alignFilePointer(Float.BYTES);
         final LongValues offsets = buildAndWritePostingsLists(
             fieldWriter.fieldInfo,
             centroidSupplier,
             floatVectorValues,
             ivfClusters,
+            postingListOffset,
             centroidAssignments.assignments(),
             centroidAssignments.overspillAssignments()
         );
+        final long postingListLength = ivfClusters.getFilePointer() - postingListOffset;
         // write centroids
         final long centroidOffset = ivfCentroids.alignFilePointer(Float.BYTES);
         writeCentroids(fieldWriter.fieldInfo, centroidSupplier, globalCentroid, offsets, ivfCentroids);
         final long centroidLength = ivfCentroids.getFilePointer() - centroidOffset;
         // write meta file
-        writeMeta(fieldWriter.fieldInfo, centroidSupplier.size(), centroidOffset, centroidLength, globalCentroid);
+        writeMeta(
+            fieldWriter.fieldInfo,
+            centroidSupplier.size(),
+            centroidOffset,
+            centroidLength,
+            postingListOffset,
+            postingListLength,
+            globalCentroid
+        );
     }
 }
 
@@ -288,6 +301,8 @@ private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
 
         final long centroidOffset;
         final long centroidLength;
+        final long postingListOffset;
+        final long postingListLength;
         final int numCentroids;
         final int[] assignments;
         final int[] overspillAssignments;
@@ -322,7 +337,7 @@ private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
         try {
             if (numCentroids == 0) {
                 centroidOffset = ivfCentroids.getFilePointer();
-                writeMeta(fieldInfo, 0, centroidOffset, 0, null);
+                writeMeta(fieldInfo, 0, centroidOffset, 0, 0, 0, null);
                 CodecUtil.writeFooter(centroidTemp);
                 IOUtils.close(centroidTemp);
                 return;
@@ -338,21 +353,32 @@ private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
                 calculatedGlobalCentroid
             );
             // write posting lists
+            postingListOffset = ivfClusters.alignFilePointer(Float.BYTES);
             final LongValues offsets = buildAndWritePostingsLists(
                 fieldInfo,
                 centroidSupplier,
                 floatVectorValues,
                 ivfClusters,
+                postingListOffset,
                 mergeState,
                 assignments,
                 overspillAssignments
             );
+            postingListLength = ivfClusters.getFilePointer() - postingListOffset;
             // write centroids
             centroidOffset = ivfCentroids.alignFilePointer(Float.BYTES);
             writeCentroids(fieldInfo, centroidSupplier, calculatedGlobalCentroid, offsets, ivfCentroids);
             centroidLength = ivfCentroids.getFilePointer() - centroidOffset;
             // write meta
-            writeMeta(fieldInfo, centroidSupplier.size(), centroidOffset, centroidLength, calculatedGlobalCentroid);
+            writeMeta(
+                fieldInfo,
+                centroidSupplier.size(),
+                centroidOffset,
+                centroidLength,
+                postingListOffset,
+                postingListLength,
+                calculatedGlobalCentroid
+            );
         }
     } finally {
         org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, centroidTempName);
@@ -435,15 +461,24 @@ private static int writeFloatVectorValues(
         return numVectors;
     }
 
-    private void writeMeta(FieldInfo field, int numCentroids, long centroidOffset, long centroidLength, float[] globalCentroid)
-        throws IOException {
+    private void writeMeta(
+        FieldInfo field,
+        int numCentroids,
+        long centroidOffset,
+        long centroidLength,
+        long postingListOffset,
+        long postingListLength,
+        float[] globalCentroid
+    ) throws IOException {
         ivfMeta.writeInt(field.number);
         ivfMeta.writeInt(field.getVectorEncoding().ordinal());
         ivfMeta.writeInt(distFuncToOrd(field.getVectorSimilarityFunction()));
         ivfMeta.writeInt(numCentroids);
         ivfMeta.writeLong(centroidOffset);
         ivfMeta.writeLong(centroidLength);
         if (centroidLength > 0) {
+            ivfMeta.writeLong(postingListOffset);
+            ivfMeta.writeLong(postingListLength);
             final ByteBuffer buffer = ByteBuffer.allocate(globalCentroid.length * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN);
             buffer.asFloatBuffer().put(globalCentroid);
             ivfMeta.writeBytes(buffer.array(), buffer.array().length);
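
Note: the flush and merge paths now bracket the posting-list write with the same offset/length bookkeeping the centroid region already used. The recurring shape, as a compact sketch against Lucene's IndexOutput (writeRegion and Body are hypothetical helpers):

    import java.io.IOException;
    import org.apache.lucene.store.IndexOutput;

    final class RegionMetaSketch {
        interface Body {
            void write(IndexOutput out) throws IOException;
        }

        // Writes one aligned region and returns {offset, length} for the metadata file;
        // a reader later reconstructs the region with IndexInput.slice(offset, length).
        static long[] writeRegion(IndexOutput out, Body body) throws IOException {
            long offset = out.alignFilePointer(Float.BYTES); // pad, then note the start
            body.write(out);
            long length = out.getFilePointer() - offset;
            return new long[] { offset, length };
        }
    }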

x-pack/plugin/esql/src/main/antlr/parser/Expression.g4

Lines changed: 2 additions & 0 deletions
@@ -54,6 +54,8 @@ functionExpression
 
 functionName
     : identifierOrParameter
+    | FIRST
+    | LAST
     ;
 
 mapExpression
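
Note: this grammar change lets the existing FIRST and LAST tokens, which are otherwise reserved keywords, also be accepted in function-name position, so a call written as FIRST(...) or LAST(...) parses as an ordinary functionExpression instead of being rejected by the parser. The functions themselves are not part of this diff, only the grammar hook.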

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 12 additions & 1 deletion
@@ -34,6 +34,17 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 
+/**
+ * Perform filters as early as possible in the logical plan by pushing them past certain plan nodes (like {@link Eval},
+ * {@link RegexExtract}, {@link Enrich}, {@link Project}, {@link OrderBy} and left {@link Join}s) where possible.
+ * Ideally, the filter ends up all the way down at the data source and can be turned into a Lucene query.
+ * When pushing down past nodes, only conditions that do not depend on fields created by those nodes are pushed down; if the condition
+ * consists of {@code AND}s, we split out the parts that do not depend on the previous node.
+ * For joins, it splits the filter condition into parts that can be applied to the left or right side of the join and only pushes down
+ * the left hand side filters to the left child.
+ *
+ * Also combines adjacent filters using a logical {@code AND}.
+ */
 public final class PushDownAndCombineFilters extends OptimizerRules.OptimizerRule<Filter> {
     @Override
     protected LogicalPlan rule(Filter filter) {
@@ -73,7 +84,7 @@ protected LogicalPlan rule(Filter filter) {
         var attributes = AttributeSet.of(Expressions.asAttributes(re.extractedFields()));
         plan = maybePushDownPastUnary(filter, re, attributes::contains, NO_OP);
     } else if (child instanceof InferencePlan<?> inferencePlan) {
-        // Push down filters that do not rely on attributes created by Cpmpletion
+        // Push down filters that do not rely on attributes created by Completion
         var attributes = AttributeSet.of(inferencePlan.generatedAttributes());
         plan = maybePushDownPastUnary(filter, inferencePlan, attributes::contains, NO_OP);
     } else if (child instanceof Enrich enrich) {
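
Note: the new Javadoc describes the conjunction-splitting rule that drives the pushdown: a condition built from ANDs is broken into conjuncts, and only those that do not reference attributes created by the child node move below it. A minimal, self-contained sketch of that idea (not the ES|QL implementation; Expr, And, and Ref are hypothetical stand-ins for the real expression classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    sealed interface Expr permits And, Ref {}
    record And(Expr left, Expr right) implements Expr {}
    record Ref(String attribute) implements Expr {} // a predicate over a single attribute

    final class ConjunctSplitSketch {
        // Flatten nested ANDs into individual conjuncts.
        static List<Expr> conjuncts(Expr e) {
            List<Expr> out = new ArrayList<>();
            collect(e, out);
            return out;
        }

        private static void collect(Expr e, List<Expr> out) {
            if (e instanceof And a) {
                collect(a.left(), out);
                collect(a.right(), out);
            } else {
                out.add(e);
            }
        }

        // Conjuncts not mentioning attributes created by the child can move below it;
        // the remainder stays in a filter above the child.
        static List<Expr> pushable(Expr condition, Set<String> createdByChild) {
            List<Expr> pushable = new ArrayList<>();
            for (Expr c : conjuncts(condition)) {
                if (c instanceof Ref r && createdByChild.contains(r.attribute()) == false) {
                    pushable.add(c);
                }
            }
            return pushable;
        }
    }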

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlBaseParser.interp

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.
