Skip to content

Commit a521e13

Browse files
authored
Return slice and future timestamp in time-series source (#134767)
With this change, the time-series source operator will return two additional constant blocks: one for the slice index and one for the future max timestamp. 1. For the slice index, the rate can keep the last slice index and flush the rate buffer when the slice index changes. This allows partitioning the time-series source by time periods. 2. For the future max timestamp, we are currently returning a dummy Long.MAX_VALUE, but in a follow-up, we will return the max timestamp from segments. This will enable rate aggregation to reduce the outstanding buffer and avoid buffering unnecessary data points. This change is a preparation for future work. Relates #134324
1 parent d235a74 commit a521e13

File tree

19 files changed

+385
-81
lines changed

19 files changed

+385
-81
lines changed

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateGroupingAggregatorFunction.java.st

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre
126126
assert false : "expected timestamp vector in time-series aggregation";
127127
throw new IllegalStateException("expected timestamp vector in time-series aggregation");
128128
}
129+
IntVector sliceIndices = ((IntBlock) page.getBlock(channels.get(2))).asVector();
130+
assert sliceIndices != null : "expected slice indices vector in time-series aggregation";
131+
LongVector futureMaxTimestamps = ((LongBlock) page.getBlock(channels.get(3))).asVector();
132+
assert futureMaxTimestamps != null : "expected future max timestamps vector in time-series aggregation";
133+
129134
return new AddInput() {
130135
@Override
131136
public void add(int positionOffset, IntArrayBlock groupIds) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public abstract class LuceneOperator extends SourceOperator {
6565
final Set<Query> processedQueries = new HashSet<>();
6666
final Set<String> processedShards = new HashSet<>();
6767

68-
private LuceneSlice currentSlice;
68+
protected LuceneSlice currentSlice;
6969
private int sliceIndex;
7070

7171
private LuceneScorer currentScorer;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ public class LuceneSourceOperator extends LuceneOperator {
6060
private final int minPageSize;
6161

6262
public static class Factory extends LuceneOperator.Factory {
63-
private final List<? extends RefCounted> contexts;
64-
private final int maxPageSize;
65-
private final Limiter limiter;
63+
protected final List<? extends RefCounted> contexts;
64+
protected final int maxPageSize;
65+
protected final Limiter limiter;
6666

6767
public Factory(
6868
List<? extends ShardContext> contexts,
@@ -327,7 +327,8 @@ public Page getCheckedOutput() throws IOException {
327327
IntVector shard = null;
328328
IntVector leaf = null;
329329
IntVector docs = null;
330-
Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
330+
int metadataBlocks = numMetadataBlocks();
331+
Block[] blocks = new Block[1 + metadataBlocks + scorer.tags().size()];
331332
currentPagePos -= discardedDocs;
332333
try {
333334
int shardId = scorer.shardContext().index();
@@ -340,10 +341,8 @@ public Page getCheckedOutput() throws IOException {
340341
shard = null;
341342
leaf = null;
342343
docs = null;
343-
if (scoreBuilder != null) {
344-
blocks[b++] = buildScoresVector(currentPagePos).asBlock();
345-
scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
346-
}
344+
buildMetadataBlocks(blocks, b, currentPagePos);
345+
b += metadataBlocks;
347346
for (Object e : scorer.tags()) {
348347
blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
349348
}
@@ -393,6 +392,17 @@ private DoubleVector buildScoresVector(int upToPositions) {
393392
}
394393
}
395394

395+
protected int numMetadataBlocks() {
396+
return scoreBuilder != null ? 1 : 0;
397+
}
398+
399+
protected void buildMetadataBlocks(Block[] blocks, int offset, int currentPagePos) {
400+
if (scoreBuilder != null) {
401+
blocks[offset] = buildScoresVector(currentPagePos).asBlock();
402+
scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
403+
}
404+
}
405+
396406
@Override
397407
public void additionalClose() {
398408
Releasables.close(docsBuilder, scoreBuilder);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.lucene;
9+
10+
import org.elasticsearch.compute.data.Block;
11+
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.operator.DriverContext;
13+
import org.elasticsearch.compute.operator.Limiter;
14+
import org.elasticsearch.compute.operator.SourceOperator;
15+
import org.elasticsearch.core.RefCounted;
16+
17+
import java.util.List;
18+
import java.util.function.Function;
19+
20+
/**
21+
* Extension of {@link LuceneSourceOperator} for time-series aggregation that inserts metadata blocks,
22+
* such as slice index and future max timestamp, to allow downstream operators to optimize processing.
23+
*/
24+
public final class TimeSeriesSourceOperator extends LuceneSourceOperator {
25+
26+
public static final class Factory extends LuceneSourceOperator.Factory {
27+
public Factory(
28+
List<? extends ShardContext> contexts,
29+
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
30+
int taskConcurrency,
31+
int maxPageSize,
32+
int limit
33+
) {
34+
super(
35+
contexts,
36+
queryFunction,
37+
DataPartitioning.SHARD,
38+
query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
39+
taskConcurrency,
40+
maxPageSize,
41+
limit,
42+
false
43+
);
44+
}
45+
46+
@Override
47+
public SourceOperator get(DriverContext driverContext) {
48+
return new TimeSeriesSourceOperator(contexts, driverContext.blockFactory(), maxPageSize, sliceQueue, limit, limiter);
49+
}
50+
51+
@Override
52+
public String describe() {
53+
return "TimeSeriesSourceOperator[maxPageSize = " + maxPageSize + ", limit = " + limit + "]";
54+
}
55+
}
56+
57+
public TimeSeriesSourceOperator(
58+
List<? extends RefCounted> shardContextCounters,
59+
BlockFactory blockFactory,
60+
int maxPageSize,
61+
LuceneSliceQueue sliceQueue,
62+
int limit,
63+
Limiter limiter
64+
) {
65+
super(shardContextCounters, blockFactory, maxPageSize, sliceQueue, limit, limiter, false);
66+
}
67+
68+
@Override
69+
protected int numMetadataBlocks() {
70+
// See EsQueryExec#TIME_SERIES_SOURCE_FIELDS
71+
return 2;
72+
}
73+
74+
@Override
75+
protected void buildMetadataBlocks(Block[] blocks, int offset, int currentPagePos) {
76+
blocks[offset] = blockFactory.newConstantIntVector(currentSlice.slicePosition(), currentPagePos).asBlock();
77+
blocks[offset + 1] = blockFactory.newConstantLongVector(Long.MAX_VALUE, currentPagePos).asBlock();
78+
}
79+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)