
Commit d62100e

Browse files
GalLalouche authored and Kubik42 committed
ESQL: Late materialization after TopN (Node level) (elastic#132757)
This PR adds late(r) materialization for TopN queries, so that materialization happens in the "node_reduce" phase instead of during the "data" phase. For example, if the limit is 20 and each data node spawns 10 workers, the additional columns (i.e., ones not needed for the TopN itself) are read for only 20 rows instead of 200. To support this, the reducer node maintains a global list of all shard contexts used by its individual data workers (although some of those might be closed if they are no longer needed, thanks to elastic#129454). There is some additional bookkeeping involved, since previously every data node held a local list of shard contexts and used its local indices to access it. To avoid changing too much (this local-index logic is spread throughout much of the code!), a new global index is introduced, which replaces the local index after all the rows are merged together in the reduce phase's TopN.
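To make the index bookkeeping concrete, here is a hypothetical sketch (not code from this PR; the class and method names are made up): a worker-local shard index maps to the node-global one by adding the offset of that worker's slice in the reducer's global shard-context list.

    // Hypothetical sketch of the local-to-global shard index mapping described above.
    // Each data worker owns a contiguous slice of the node-global shard context list;
    // its local index i corresponds to global index sliceOffset + i, and it is the
    // global index that survives the merge in the reduce phase's TopN.
    final class ShardIndexRemapper {
        private final int sliceOffset; // start of this worker's slice in the global list

        ShardIndexRemapper(int sliceOffset) {
            this.sliceOffset = sliceOffset;
        }

        int toGlobal(int localShardIndex) {
            return sliceOffset + localShardIndex;
        }
    }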
1 parent 9508f70 commit d62100e

111 files changed: +1966 −618 lines

benchmarks/src/main/java/org/elasticsearch/benchmark/_nightly/esql/ValuesSourceReaderBenchmark.java

Lines changed: 7 additions & 8 deletions
@@ -41,8 +41,9 @@
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.AlwaysReferencedIndexedByShardId;
+import org.elasticsearch.compute.lucene.IndexedByShardIdFromSingleton;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
-import org.elasticsearch.compute.lucene.ShardRefCounted;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.topn.TopNOperator;
@@ -368,7 +369,7 @@ public void benchmark() {
     blockFactory,
     ByteSizeValue.ofMb(1).getBytes(),
     fields(name),
-    List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
+    new IndexedByShardIdFromSingleton<>(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
         throw new UnsupportedOperationException("can't load _source here");
     }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
     0
@@ -538,7 +539,7 @@ private void setupPages() {
 pages.add(
     new Page(
         new DocVector(
-            ShardRefCounted.ALWAYS_REFERENCED,
+            AlwaysReferencedIndexedByShardId.INSTANCE,
             blockFactory.newConstantIntBlockWith(0, end - begin).asVector(),
             blockFactory.newConstantIntBlockWith(ctx.ord, end - begin).asVector(),
             docs.build(),
@@ -575,8 +576,7 @@ record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {}
 pages.add(
     new Page(
         new DocVector(
-
-            ShardRefCounted.ALWAYS_REFERENCED,
+            AlwaysReferencedIndexedByShardId.INSTANCE,
             blockFactory.newConstantIntVector(0, size),
             leafs.build(),
             docs.build(),
@@ -594,7 +594,7 @@ record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {}
 pages.add(
     new Page(
         new DocVector(
-            ShardRefCounted.ALWAYS_REFERENCED,
+            AlwaysReferencedIndexedByShardId.INSTANCE,
             blockFactory.newConstantIntBlockWith(0, size).asVector(),
             leafs.build().asBlock().asVector(),
             docs.build(),
@@ -621,8 +621,7 @@ record ItrAndOrd(PrimitiveIterator.OfInt itr, int ord) {}
 pages.add(
     new Page(
         new DocVector(
-
-            ShardRefCounted.ALWAYS_REFERENCED,
+            AlwaysReferencedIndexedByShardId.INSTANCE,
             blockFactory.newConstantIntVector(0, 1),
             blockFactory.newConstantIntVector(next.ord, 1),
             blockFactory.newConstantIntVector(next.itr.nextInt(), 1),

docs/changelog/132757.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 132757
+summary: Late materialization after TopN (Node level)
+area: ES|QL
+type: feature
+issues: []

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+9194000

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-available_processors_in_os_stats,9193000
+esql_reduce_late_materialization,9194000

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequestBuilder.java

Lines changed: 1 addition & 0 deletions
@@ -41,4 +41,5 @@ public final ActionType<Response> action() {
 
     public abstract EsqlQueryRequestBuilder<Request, Response> allowPartialResults(boolean allowPartialResults);
 
+    public abstract EsqlQueryRequestBuilder<Request, Response> profile(boolean profile);
 }
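A minimal (hypothetical) usage sketch of the new abstract hook, chained with the existing allowPartialResults method shown above; how the concrete builder instance is obtained is not part of this excerpt:

    import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder;

    final class ProfileExample {
        // profile(boolean) is the abstract method added by this commit;
        // allowPartialResults(boolean) already existed (see the hunk above).
        static void enableProfiling(EsqlQueryRequestBuilder<?, ?> builder) {
            builder.allowPartialResults(true).profile(true);
        }
    }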

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/CollectionUtils.java

Lines changed: 22 additions & 0 deletions
@@ -68,4 +68,26 @@ public static <T> List<T> combine(Collection<? extends T> left, T... entries) {
         }
         return list;
     }
+
+    /**
+     * Creates a copy of the given collection with the given element prepended.
+     *
+     * @param collection collection to copy
+     * @param element element to prepend
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> List<T> prependToCopy(T element, Collection<T> collection) {
+        T[] result = (T[]) new Object[collection.size() + 1];
+        result[0] = element;
+        if (collection instanceof ArrayList<T> arrayList && arrayList.size() <= 1_000_000) {
+            // Creating an array out of a relatively small ArrayList and copying it is faster than iterating.
+            System.arraycopy(arrayList.toArray(), 0, result, 1, result.length - 1);
+        } else {
+            var i = 1;
+            for (T t : collection) {
+                result[i++] = t;
+            }
+        }
+        return List.of(result);
+    }
 }
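For reference, a small usage sketch of the new helper (the field names are made up for illustration):

    import java.util.List;

    import org.elasticsearch.xpack.esql.core.util.CollectionUtils;

    final class PrependToCopyExample {
        public static void main(String[] args) {
            // Returns an immutable copy with the given element at index 0.
            List<String> columns = CollectionUtils.prependToCopy("_doc_id", List.of("host", "message"));
            System.out.println(columns); // [_doc_id, host, message]
        }
    }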

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java

Lines changed: 19 additions & 4 deletions
@@ -9,7 +9,8 @@
 
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.compute.lucene.ShardRefCounted;
+import org.elasticsearch.compute.lucene.AlwaysReferencedIndexedByShardId;
+import org.elasticsearch.compute.lucene.IndexedByShardId;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
@@ -92,6 +93,14 @@ public void closeInternal() {
         Releasables.closeExpectNoException(vector);
     }
 
+    @Override
+    public String toString() {
+        final StringBuffer sb = new StringBuffer("DocBlock[");
+        sb.append("vector=").append(vector);
+        sb.append(']');
+        return sb.toString();
+    }
+
     /**
      * A builder the for {@link DocBlock}.
      */
@@ -103,9 +112,9 @@ public static class Builder implements Block.Builder {
         private final IntVector.Builder shards;
         private final IntVector.Builder segments;
         private final IntVector.Builder docs;
-        private ShardRefCounted shardRefCounters = ShardRefCounted.ALWAYS_REFERENCED;
+        private IndexedByShardId<? extends RefCounted> shardRefCounters = null;
 
-        public Builder setShardRefCounted(ShardRefCounted shardRefCounters) {
+        public Builder shardRefCounters(IndexedByShardId<? extends RefCounted> shardRefCounters) {
             this.shardRefCounters = shardRefCounters;
             return this;
         }
@@ -196,7 +205,13 @@ public DocBlock build() {
             shards = this.shards.build();
             segments = this.segments.build();
             docs = this.docs.build();
-            result = new DocVector(shardRefCounters, shards, segments, docs, null);
+            result = new DocVector(
+                shardRefCounters == null ? AlwaysReferencedIndexedByShardId.INSTANCE : shardRefCounters,
+                shards,
+                segments,
+                docs,
+                null
+            );
             return result.asBlock();
         } finally {
             if (result == null) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java

Lines changed: 50 additions & 13 deletions
@@ -10,7 +10,7 @@
 import org.apache.lucene.util.IntroSorter;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.compute.lucene.ShardRefCounted;
+import org.elasticsearch.compute.lucene.IndexedByShardId;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
@@ -31,6 +31,11 @@ public final class DocVector extends AbstractVector implements Vector {
      */
     public static final int SHARD_SEGMENT_DOC_MAP_PER_ROW_OVERHEAD = Integer.BYTES * 2;
 
+    /**
+     * The shard IDs for each position. Note that these shard IDs are shared between all doc vectors running in the same node, but a
+     * given doc vector might only reference a subset of the shard IDs (which subset is also the one exposed by {@link #refCounteds}).
+     * These shard IDs are sliced up by DataNodeComputeHandler, and depend on the MAX_CONCURRENT_SHARDS_PER_NODE setting.
+     */
     private final IntVector shards;
     private final IntVector segments;
     private final IntVector docs;
@@ -51,21 +56,41 @@ public final class DocVector extends AbstractVector implements Vector {
      */
     private int[] shardSegmentDocMapBackwards;
 
-    private final ShardRefCounted shardRefCounters;
+    private final IndexedByShardId<? extends RefCounted> refCounteds;
 
-    public ShardRefCounted shardRefCounted() {
-        return shardRefCounters;
+    public RefCounted shardRefCounted(int position) {
+        return refCounteds.get(shards.getInt(position));
     }
 
     public DocVector(
-        ShardRefCounted shardRefCounters,
+        IndexedByShardId<? extends RefCounted> refCounteds,
         IntVector shards,
        IntVector segments,
        IntVector docs,
        Boolean singleSegmentNonDecreasing
+    ) {
+        this(refCounteds, shards, segments, docs, singleSegmentNonDecreasing, true);
+    }
+
+    public static DocVector withoutIncrementingShardRefCounts(
+        IndexedByShardId<? extends RefCounted> refCounteds,
+        IntVector shards,
+        IntVector segments,
+        IntVector docs
+    ) {
+        return new DocVector(refCounteds, shards, segments, docs, null, false);
+    }
+
+    private DocVector(
+        IndexedByShardId<? extends RefCounted> refCounteds,
+        IntVector shards,
+        IntVector segments,
+        IntVector docs,
+        Boolean singleSegmentNonDecreasing,
+        boolean incrementShardRefCounts
     ) {
         super(shards.getPositionCount(), shards.blockFactory());
-        this.shardRefCounters = shardRefCounters;
+        this.refCounteds = refCounteds;
         this.shards = shards;
         this.segments = segments;
         this.docs = docs;
@@ -82,18 +107,20 @@ public DocVector(
         }
         blockFactory().adjustBreaker(BASE_RAM_BYTES_USED);
 
-        forEachShardRefCounter(RefCounted::mustIncRef);
+        if (incrementShardRefCounts) {
+            forEachShardRefCounter(RefCounted::mustIncRef);
+        }
     }
 
     public DocVector(
-        ShardRefCounted shardRefCounters,
+        IndexedByShardId<? extends RefCounted> refCounteds,
         IntVector shards,
         IntVector segments,
         IntVector docs,
         int[] docMapForwards,
         int[] docMapBackwards
     ) {
-        this(shardRefCounters, shards, segments, docs, null);
+        this(refCounteds, shards, segments, docs, null);
         this.shardSegmentDocMapForwards = docMapForwards;
         this.shardSegmentDocMapBackwards = docMapBackwards;
     }
@@ -269,7 +296,7 @@ public DocVector filter(int... positions) {
         filteredShards = shards.filter(positions);
         filteredSegments = segments.filter(positions);
         filteredDocs = docs.filter(positions);
-        result = new DocVector(shardRefCounters, filteredShards, filteredSegments, filteredDocs, null);
+        result = new DocVector(refCounteds, filteredShards, filteredSegments, filteredDocs, null);
         return result;
     } finally {
         if (result == null) {
@@ -288,7 +315,7 @@ public DocVector deepCopy(BlockFactory blockFactory) {
         filteredShards = shards.deepCopy(blockFactory);
         filteredSegments = segments.deepCopy(blockFactory);
         filteredDocs = docs.deepCopy(blockFactory);
-        result = new DocVector(shardRefCounters, filteredShards, filteredSegments, filteredDocs, null);
+        result = new DocVector(refCounteds, filteredShards, filteredSegments, filteredDocs, null);
         return result;
     } finally {
         if (result == null) {
@@ -331,6 +358,16 @@ public boolean equals(Object obj) {
         return shards.equals(other.shards) && segments.equals(other.segments) && docs.equals(other.docs);
     }
 
+    @Override
+    public String toString() {
+        final StringBuffer sb = new StringBuffer("DocVector[");
+        sb.append("shards=").append(shards);
+        sb.append(", segments=").append(segments);
+        sb.append(", docs=").append(docs);
+        sb.append(']');
+        return sb.toString();
+    }
+
     private static long ramBytesOrZero(int[] array) {
         return array == null ? 0 : RamUsageEstimator.shallowSizeOf(array);
     }
@@ -372,13 +409,13 @@ public void closeInternal() {
 
     private void forEachShardRefCounter(Consumer<RefCounted> consumer) {
         switch (shards) {
-            case ConstantIntVector constantIntVector -> consumer.accept(shardRefCounters.get(constantIntVector.getInt(0)));
+            case ConstantIntVector constantIntVector -> consumer.accept(refCounteds.get(constantIntVector.getInt(0)));
             case ConstantNullVector ignored -> {
                 // Noop
             }
             default -> {
                 for (int i = 0; i < shards.getPositionCount(); i++) {
-                    consumer.accept(shardRefCounters.get(shards.getInt(i)));
+                    consumer.accept(refCounteds.get(shards.getInt(i)));
                }
            }
        }
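DocVector now resolves a row's RefCounted via refCounteds.get(shards.getInt(position)), i.e. the shard column doubles as an index into a node-global, shard-ID-addressed lookup. The IndexedByShardId interface itself is not included in this excerpt; the sketch below is a hypothetical list-backed version of such a lookup (method shapes inferred from the implementations later in this commit), shown only to make the indexing idea concrete:

    import java.util.Collection;
    import java.util.List;
    import java.util.function.Function;

    // Hypothetical list-backed "indexed by shard ID" lookup: the shard ID is simply an
    // index into a node-global list, which is what lets DocVector map a position's shard
    // column to its RefCounted (or any other per-shard value) in constant time.
    final class ListIndexedByShardId<T> {
        private final List<T> entries;

        ListIndexedByShardId(List<T> entries) {
            this.entries = List.copyOf(entries);
        }

        T get(int shardId) {
            return entries.get(shardId);
        }

        Collection<T> collection() {
            return entries;
        }

        <S> ListIndexedByShardId<S> map(Function<T, S> mapper) {
            return new ListIndexedByShardId<>(entries.stream().map(mapper).toList());
        }
    }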

AlwaysReferencedIndexedByShardId.java (new file in package org.elasticsearch.compute.lucene)

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene;
+
+import org.elasticsearch.core.RefCounted;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * An implementation which always returns {@link RefCounted#ALWAYS_REFERENCED} for any shard ID. Used by tests, but defined here so it
+ * could also be used by the benchmarks.
+ */
+public class AlwaysReferencedIndexedByShardId implements IndexedByShardId<RefCounted> {
+    public static final AlwaysReferencedIndexedByShardId INSTANCE = new AlwaysReferencedIndexedByShardId();
+
+    private AlwaysReferencedIndexedByShardId() {}
+
+    @Override
+    public RefCounted get(int shardId) {
+        return RefCounted.ALWAYS_REFERENCED;
+    }
+
+    @Override
+    public Collection<? extends RefCounted> collection() {
+        return List.of(RefCounted.ALWAYS_REFERENCED);
+    }
+
+    @Override
+    public <S> IndexedByShardId<S> map(Function<RefCounted, S> mapper) {
+        throw new UnsupportedOperationException();
+    }
+}

EmptyIndexedByShardId.java (new file in package org.elasticsearch.compute.lucene)

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene;
+
+import java.util.Collection;
+import java.util.function.Function;
+
+public class EmptyIndexedByShardId {
+    @SuppressWarnings("unchecked")
+    public static <T> IndexedByShardId<T> instance() {
+        return (IndexedByShardId<T>) EMPTY;
+    }
+
+    private static IndexedByShardId<?> EMPTY = new IndexedByShardId<>() {
+        @Override
+        public Object get(int shardId) {
+            throw new IndexOutOfBoundsException("no shards");
+        }
+
+        @Override
+        public Collection<?> collection() {
+            return java.util.List.of();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public <S> IndexedByShardId<S> map(Function<Object, S> mapper) {
+            return (IndexedByShardId<S>) this;
+        }
+    };
+}
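A small (hypothetical) usage sketch of the empty instance; the point of the unchecked cast in instance() is that one immutable empty lookup can be shared across element types, since it never actually produces an element:

    import org.elasticsearch.compute.lucene.EmptyIndexedByShardId;
    import org.elasticsearch.compute.lucene.IndexedByShardId;

    final class EmptyIndexedByShardIdExample {
        public static void main(String[] args) {
            // The same shared instance can be typed to any element type.
            IndexedByShardId<String> noShards = EmptyIndexedByShardId.instance();
            System.out.println(noShards.collection().isEmpty()); // true
            // noShards.get(0) would throw IndexOutOfBoundsException("no shards").
        }
    }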
