Skip to content

Commit a1b1842

Browse files
Merge branch 'main' into svilen/126603
2 parents ebbc0b6 + 5964ad7 commit a1b1842

File tree

25 files changed

+470
-288
lines changed

25 files changed

+470
-288
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.util.NumericUtils;
2626
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
2727
import org.elasticsearch.common.lucene.Lucene;
28+
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.util.BigArrays;
2930
import org.elasticsearch.compute.data.BlockFactory;
3031
import org.elasticsearch.compute.data.BytesRefBlock;
@@ -50,6 +51,7 @@
5051
import org.elasticsearch.index.mapper.MappedFieldType;
5152
import org.elasticsearch.index.mapper.NumberFieldMapper;
5253
import org.elasticsearch.search.lookup.SearchLookup;
54+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5355
import org.openjdk.jmh.annotations.Benchmark;
5456
import org.openjdk.jmh.annotations.BenchmarkMode;
5557
import org.openjdk.jmh.annotations.Fork;
@@ -335,7 +337,7 @@ public void benchmark() {
335337
fields(name),
336338
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
337339
throw new UnsupportedOperationException("can't load _source here");
338-
})),
340+
}, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
339341
0
340342
);
341343
long sum = 0;

docs/changelog/127348.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127348
2+
summary: Speed loading stored fields
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/changelog/127527.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127527
2+
summary: "No, line noise isn't a valid ip"
3+
area: ES|QL
4+
type: bug
5+
issues: []

docs/reference/elasticsearch/index-settings/index-modules.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ $$$index-codec$$$ `index.codec`
4949

5050
$$$index-mode-setting$$$ `index.mode`
5151
: The `index.mode` setting is used to control settings applied in specific domains like ingestion of time series data or logs. Different mutually exclusive modes exist, which are used to apply settings or default values controlling indexing of documents, sorting and other parameters whose value affects indexing or query performance.
52-
52+
5353
**Example**
5454

5555
```console
@@ -248,3 +248,8 @@ $$$index-final-pipeline$$$
248248

249249
$$$index-hidden$$$ `index.hidden`
250250
: Indicates whether the index should be hidden by default. Hidden indices are not returned by default when using a wildcard expression. This behavior is controlled per request through the use of the `expand_wildcards` parameter. Possible values are `true` and `false` (default).
251+
252+
$$$index-esql-stored-fields-sequential-proportion$$$
253+
254+
`index.esql.stored_fields_sequential_proportion`
255+
: Tuning parameter for deciding when {{esql}} will load [Stored fields](/reference/elasticsearch/rest-apis/retrieve-selected-fields.md#stored-fields) using a strategy tuned for loading a dense sequence of documents. Accepts values between 0.0 and 1.0 and defaults to 0.2. Indices whose documents are smaller than 10kb may see faster loading of `text` fields when this setting is lowered.

muted-tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,15 @@ tests:
444444
- class: org.elasticsearch.action.admin.cluster.state.TransportClusterStateActionDisruptionIT
445445
method: testLocalRequestWaitsForMetadata
446446
issue: https://github.com/elastic/elasticsearch/issues/127466
447+
- class: org.elasticsearch.xpack.esql.type.EsqlDataTypeConverterTests
448+
method: testSuggestedCast
449+
issue: https://github.com/elastic/elasticsearch/issues/127535
450+
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
451+
method: test
452+
issue: https://github.com/elastic/elasticsearch/issues/127536
453+
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
454+
method: test {union_types.MultiIndexSortIpStringEval ASYNC}
455+
issue: https://github.com/elastic/elasticsearch/issues/127537
447456

448457
# Examples:
449458
#

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

Lines changed: 30 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.elasticsearch.core.ReleasableIterator;
3131
import org.elasticsearch.core.Releasables;
3232

33+
import java.util.Objects;
34+
3335
/**
3436
* An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
3537
* Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -39,7 +41,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
3941
private final int tsHashChannel;
4042
private final int timestampIntervalChannel;
4143

42-
private int lastTsidPosition = 0;
44+
private final BytesRef lastTsid = new BytesRef();
4345
private final BytesRefArrayWithSize tsidArray;
4446

4547
private long lastTimestamp;
@@ -62,77 +64,44 @@ public void close() {
6264
Releasables.close(tsidArray, timestampArray, perTsidCountArray);
6365
}
6466

65-
private OrdinalBytesRefVector getTsidVector(Page page) {
66-
BytesRefBlock block = page.getBlock(tsHashChannel);
67-
var ordinalBlock = block.asOrdinals();
68-
if (ordinalBlock == null) {
69-
throw new IllegalStateException("expected ordinal block for tsid");
70-
}
71-
var ordinalVector = ordinalBlock.asVector();
72-
if (ordinalVector == null) {
73-
throw new IllegalStateException("expected ordinal vector for tsid");
74-
}
75-
return ordinalVector;
76-
}
77-
78-
private LongVector getTimestampVector(Page page) {
79-
final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
80-
LongVector timestampsVector = timestampsBlock.asVector();
81-
if (timestampsVector == null) {
82-
throw new IllegalStateException("expected long vector for timestamp");
83-
}
84-
return timestampsVector;
85-
}
86-
8767
@Override
8868
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
89-
final BytesRefVector tsidDict;
90-
final IntVector tsidOrdinals;
91-
{
92-
final var tsidVector = getTsidVector(page);
93-
tsidDict = tsidVector.getDictionaryVector();
94-
tsidOrdinals = tsidVector.getOrdinalsVector();
95-
}
96-
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
69+
final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
70+
final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
71+
final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
72+
final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
73+
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
9774
final BytesRef spare = new BytesRef();
98-
final BytesRef lastTsid = new BytesRef();
99-
final LongVector timestampVector = getTimestampVector(page);
100-
int lastOrd = -1;
101-
for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
102-
final int newOrd = tsidOrdinals.getInt(i);
103-
boolean newGroup = false;
104-
if (lastOrd != newOrd) {
105-
final var newTsid = tsidDict.getBytesRef(newOrd, spare);
106-
if (positionCount() == 0) {
107-
newGroup = true;
108-
} else if (lastOrd == -1) {
109-
tsidArray.get(lastTsidPosition, lastTsid);
110-
newGroup = lastTsid.equals(newTsid) == false;
111-
} else {
112-
newGroup = true;
113-
}
114-
if (newGroup) {
115-
endTsidGroup();
116-
lastTsidPosition = tsidArray.count;
117-
tsidArray.append(newTsid);
118-
}
119-
lastOrd = newOrd;
120-
}
75+
// TODO: optimize incoming ordinal block
76+
for (int i = 0; i < tsidVector.getPositionCount(); i++) {
77+
final BytesRef tsid = tsidVector.getBytesRef(i, spare);
12178
final long timestamp = timestampVector.getLong(i);
122-
if (newGroup || timestamp != lastTimestamp) {
123-
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
124-
timestampArray.append(timestamp);
125-
lastTimestamp = timestamp;
126-
currentTimestampCount++;
127-
}
128-
ordsBuilder.appendInt(timestampArray.count - 1);
79+
ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
12980
}
13081
try (var ords = ordsBuilder.build()) {
13182
addInput.add(0, ords);
13283
}
13384
}
13485
}
13586

87+
private int addOnePosition(BytesRef tsid, long timestamp) {
88+
boolean newGroup = false;
89+
if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
90+
assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
91+
endTsidGroup();
92+
tsidArray.append(tsid);
93+
tsidArray.get(tsidArray.count - 1, lastTsid);
94+
newGroup = true;
95+
}
96+
if (newGroup || timestamp != lastTimestamp) {
97+
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
98+
timestampArray.append(timestamp);
99+
lastTimestamp = timestamp;
100+
currentTimestampCount++;
101+
}
102+
return positionCount() - 1;
103+
}
104+
136105
private void endTsidGroup() {
137106
if (currentTimestampCount > 0) {
138107
perTsidCountArray.append(currentTimestampCount);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
7575
}
7676

7777
@Override
78-
public OrdinalBytesRefVector asVector() {
78+
public BytesRefVector asVector() {
7979
IntVector vector = ordinals.asVector();
8080
if (vector != null) {
8181
return new OrdinalBytesRefVector(vector, bytes);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public String describe() {
107107
*/
108108
public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
109109

110-
public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader) {}
110+
public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
111111

112112
private final FieldWork[] fields;
113113
private final List<ShardContext> shardContexts;
@@ -247,8 +247,9 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
247247
}
248248

249249
SourceLoader sourceLoader = null;
250+
ShardContext shardContext = shardContexts.get(shard);
250251
if (storedFieldsSpec.requiresSource()) {
251-
sourceLoader = shardContexts.get(shard).newSourceLoader.get();
252+
sourceLoader = shardContext.newSourceLoader.get();
252253
storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
253254
}
254255

@@ -261,7 +262,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
261262
);
262263
}
263264
StoredFieldLoader storedFieldLoader;
264-
if (useSequentialStoredFieldsReader(docs)) {
265+
if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
265266
storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
266267
trackStoredFields(storedFieldsSpec, true);
267268
} else {
@@ -438,9 +439,13 @@ public void close() {
438439
* Is it more efficient to use a sequential stored field reader
439440
* when reading stored fields for the documents contained in {@code docIds}?
440441
*/
441-
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) {
442+
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
442443
int count = docs.count();
443-
return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1;
444+
if (count < SEQUENTIAL_BOUNDARY) {
445+
return false;
446+
}
447+
int range = docs.get(count - 1) - docs.get(0);
448+
return range * storedFieldsSequentialProportion <= count;
444449
}
445450

446451
private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
1515
import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
1616
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
17-
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
1817
import org.elasticsearch.compute.data.Block;
1918
import org.elasticsearch.compute.data.ElementType;
2019
import org.elasticsearch.compute.data.LongBlock;
@@ -31,7 +30,6 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
3130

3231
public record Factory(
3332
Rounding.Prepared timeBucket,
34-
boolean sortedInput,
3533
List<BlockHash.GroupSpec> groups,
3634
AggregatorMode aggregatorMode,
3735
List<GroupingAggregator.Factory> aggregators,
@@ -40,18 +38,17 @@ public record Factory(
4038
@Override
4139
public Operator get(DriverContext driverContext) {
4240
// TODO: use TimeSeriesBlockHash when possible
43-
return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
44-
if (sortedInput && groups.size() == 2) {
45-
return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
46-
} else {
47-
return BlockHash.build(
48-
groups,
49-
driverContext.blockFactory(),
50-
maxPageSize,
51-
true // we can enable optimizations as the inputs are vectors
52-
);
53-
}
54-
}, driverContext);
41+
return new TimeSeriesAggregationOperator(
42+
timeBucket,
43+
aggregators,
44+
() -> BlockHash.build(
45+
groups,
46+
driverContext.blockFactory(),
47+
maxPageSize,
48+
true // we can enable optimizations as the inputs are vectors
49+
),
50+
driverContext
51+
);
5552
}
5653

5754
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public String toString() {
198198
operators.add(
199199
new OrdinalsGroupingOperator(
200200
shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(mockBlContext()),
201-
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
201+
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
202202
ElementType.BYTES_REF,
203203
0,
204204
gField,

0 commit comments

Comments
 (0)