Commit 439ee19

Enable time-series block hash

1 parent d65f34d commit 439ee19

7 files changed (+145, -47 lines)

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

Lines changed: 65 additions & 30 deletions
@@ -30,8 +30,6 @@
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
 
-import java.util.Objects;
-
 /**
  * An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
  * Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -41,7 +39,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private final int tsHashChannel;
     private final int timestampIntervalChannel;
 
-    private final BytesRef lastTsid = new BytesRef();
+    private int lastTsidPosition = 0;
     private final BytesRefArrayWithSize tsidArray;
 
     private long lastTimestamp;
@@ -64,51 +62,88 @@ public void close() {
         Releasables.close(tsidArray, timestampArray, perTsidCountArray);
     }
 
+    private OrdinalBytesRefVector getTsidVector(Page page) {
+        BytesRefBlock block = page.getBlock(tsHashChannel);
+        var ordinalBlock = block.asOrdinals();
+        if (ordinalBlock == null) {
+            throw new IllegalStateException("expected ordinal block for tsid");
+        }
+        var ordinalVector = ordinalBlock.asVector();
+        if (ordinalVector == null) {
+            throw new IllegalStateException("expected ordinal vector for tsid");
+        }
+        return ordinalVector;
+    }
+
+    private LongVector getTimestampVector(Page page) {
+        final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
+        LongVector timestampsVector = timestampsBlock.asVector();
+        if (timestampsVector == null) {
+            throw new IllegalStateException("expected long vector for timestamp");
+        }
+        return timestampsVector;
+    }
+
     @Override
     public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
-        final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
-        final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
-        final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
-        final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
-        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
+        final BytesRefVector tsidDict;
+        final IntVector tsidOrdinals;
+        {
+            final var tsidVector = getTsidVector(page);
+            tsidDict = tsidVector.getDictionaryVector();
+            tsidOrdinals = tsidVector.getOrdinalsVector();
+        }
+        try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
             final BytesRef spare = new BytesRef();
-            // TODO: optimize incoming ordinal block
-            for (int i = 0; i < tsidVector.getPositionCount(); i++) {
-                final BytesRef tsid = tsidVector.getBytesRef(i, spare);
+            final LongVector timestampVector = getTimestampVector(page);
+            int lastOrd = -1;
+            for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
+                final int newOrd = tsidOrdinals.getInt(i);
+                boolean newGroup = false;
+                if (lastOrd != newOrd) {
+                    final var newTsid = tsidDict.getBytesRef(newOrd, spare);
+                    if (positionCount() == 0) {
+                        newGroup = true;
+                    } else if (lastOrd == -1) {
+                        newGroup = lastTsid().equals(newTsid) == false;
+                    } else {
+                        newGroup = true;
+                    }
+                    if (newGroup) {
+                        endTsidGroup();
+                        lastTsidPosition = tsidArray.count;
+                        tsidArray.append(newTsid);
+                    }
+                    lastOrd = newOrd;
+                }
                 final long timestamp = timestampVector.getLong(i);
-                ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
+                if (newGroup || timestamp != lastTimestamp) {
+                    assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
+                    timestampArray.append(timestamp);
+                    lastTimestamp = timestamp;
+                    currentTimestampCount++;
+                }
+                ordsBuilder.appendInt(timestampArray.count - 1);
             }
             try (var ords = ordsBuilder.build()) {
                 addInput.add(0, ords);
             }
         }
     }
 
-    private int addOnePosition(BytesRef tsid, long timestamp) {
-        boolean newGroup = false;
-        if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
-            assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
-            endTsidGroup();
-            tsidArray.append(tsid);
-            tsidArray.get(tsidArray.count - 1, lastTsid);
-            newGroup = true;
-        }
-        if (newGroup || timestamp != lastTimestamp) {
-            assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
-            timestampArray.append(timestamp);
-            lastTimestamp = timestamp;
-            currentTimestampCount++;
-        }
-        return positionCount() - 1;
-    }
-
     private void endTsidGroup() {
         if (currentTimestampCount > 0) {
             perTsidCountArray.append(currentTimestampCount);
             currentTimestampCount = 0;
         }
     }
 
+    private BytesRef lastTsid() {
+        final BytesRef bytesRef = new BytesRef();
+        tsidArray.get(lastTsidPosition, bytesRef);
+        return bytesRef;
+    }
+
     @Override
     public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
         throw new UnsupportedOperationException("TODO");

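Note on the new add(...) loop: because the incoming pages are sorted by tsid (and by timestamp, descending, within each tsid) and the tsid block is dictionary encoded, group ids can be assigned by watching for run boundaries instead of hashing every row. The sketch below is a hypothetical, self-contained illustration of that idea for a single page using plain Java arrays and lists; it does not use the real Block/Vector classes, and it omits the cross-page case that lastTsid() and the lastOrd == -1 branch handle in the commit.

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the sorted, dictionary-encoded input that the
// block hash receives: ordinals point into a dictionary of tsids, and rows
// are sorted by tsid, then by timestamp descending within each tsid.
final class SortedGroupIds {

    static int[] assignGroupIds(String[] tsidDict, int[] tsidOrdinals, long[] timestamps) {
        List<String> tsids = new ArrayList<>();          // mirrors tsidArray: one entry per distinct tsid run
        List<Long> groupTimestamps = new ArrayList<>();  // mirrors timestampArray: one entry per (tsid, timestamp) group
        int[] groupIds = new int[tsidOrdinals.length];
        int lastOrd = -1;
        long lastTimestamp = Long.MIN_VALUE;
        for (int i = 0; i < tsidOrdinals.length; i++) {
            boolean newGroup = false;
            if (tsidOrdinals[i] != lastOrd) {            // tsid run boundary: compare ordinals, not bytes
                tsids.add(tsidDict[tsidOrdinals[i]]);
                lastOrd = tsidOrdinals[i];
                newGroup = true;
            }
            if (newGroup || timestamps[i] != lastTimestamp) {  // new (tsid, timestamp) bucket
                groupTimestamps.add(timestamps[i]);
                lastTimestamp = timestamps[i];
            }
            groupIds[i] = groupTimestamps.size() - 1;    // group id == position in the append-only array
        }
        return groupIds;
    }

    public static void main(String[] args) {
        String[] dict = { "host-a", "host-b" };
        int[] ords = { 0, 0, 0, 1, 1 };
        long[] ts = { 30, 30, 20, 30, 10 };
        System.out.println(java.util.Arrays.toString(assignGroupIds(dict, ords, ts)));
    }
}

Running it prints [0, 0, 1, 2, 3]: rows sharing a tsid and timestamp collapse into one group, and each new timestamp or tsid simply opens the next append-only slot, with no lookups.
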
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
     }
 
     @Override
-    public BytesRefVector asVector() {
+    public OrdinalBytesRefVector asVector() {
         IntVector vector = ordinals.asVector();
         if (vector != null) {
             return new OrdinalBytesRefVector(vector, bytes);

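The single change above only narrows the declared return type of asVector() from BytesRefVector to OrdinalBytesRefVector, so callers such as TimeSeriesBlockHash can reach the dictionary and ordinals vectors without a cast. As a rough, hypothetical picture of what dictionary-plus-ordinals encoding means (plain Java, not the real block implementation):

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the layout behind an ordinal-encoded bytes
// vector: each position stores a small int pointing into a dictionary of
// distinct values, so repeated tsids are stored only once.
final class OrdinalEncoding {

    record Encoded(String[] dictionary, int[] ordinals) {
        String get(int position) {
            return dictionary[ordinals[position]];  // decode one position
        }
    }

    static Encoded encode(String[] values) {
        Map<String, Integer> dict = new LinkedHashMap<>();
        int[] ordinals = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            Integer ord = dict.get(values[i]);
            if (ord == null) {
                ord = dict.size();                  // next free ordinal
                dict.put(values[i], ord);
            }
            ordinals[i] = ord;
        }
        return new Encoded(dict.keySet().toArray(String[]::new), ordinals);
    }

    public static void main(String[] args) {
        Encoded e = encode(new String[] { "host-a", "host-a", "host-a", "host-b" });
        System.out.println(java.util.Arrays.toString(e.dictionary()));
        System.out.println(java.util.Arrays.toString(e.ordinals()));
    }
}

For the four values above, the dictionary holds [host-a, host-b] and the ordinals are [0, 0, 0, 1], which is why comparing ordinal ints is enough to detect a tsid change in the block hash.
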
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 14 additions & 11 deletions
@@ -14,6 +14,7 @@
 import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
@@ -30,6 +31,7 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
 
     public record Factory(
         Rounding.Prepared timeBucket,
+        boolean sortedInput,
         List<BlockHash.GroupSpec> groups,
         AggregatorMode aggregatorMode,
         List<GroupingAggregator.Factory> aggregators,
@@ -38,17 +40,18 @@ public record Factory(
         @Override
         public Operator get(DriverContext driverContext) {
             // TODO: use TimeSeriesBlockHash when possible
-            return new TimeSeriesAggregationOperator(
-                timeBucket,
-                aggregators,
-                () -> BlockHash.build(
-                    groups,
-                    driverContext.blockFactory(),
-                    maxPageSize,
-                    true // we can enable optimizations as the inputs are vectors
-                ),
-                driverContext
-            );
+            return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
+                if (sortedInput && groups.size() == 2) {
+                    return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
+                } else {
+                    return BlockHash.build(
+                        groups,
+                        driverContext.blockFactory(),
+                        maxPageSize,
+                        true // we can enable optimizations as the inputs are vectors
+                    );
+                }
+            }, driverContext);
         }
 
         @Override

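The Factory now carries a sortedInput flag and defers the choice of grouping structure to the supplier it passes along: when the input is known to be sorted and there are exactly two group channels (presumably tsid plus the time bucket), it builds the specialized TimeSeriesBlockHash, otherwise it falls back to the generic BlockHash.build path. A minimal, hypothetical sketch of that selection pattern with stand-in types (not the real BlockHash API):

import java.util.function.Supplier;

// Hypothetical sketch: defer construction of the grouping structure behind a
// Supplier and take the specialized path only when the preconditions hold.
final class GroupingChoice {

    interface Grouping {}                                        // stand-in for BlockHash
    record SortedTimeSeriesGrouping() implements Grouping {}     // stand-in for TimeSeriesBlockHash
    record GeneralGrouping() implements Grouping {}              // stand-in for BlockHash.build(...)

    static Supplier<Grouping> groupingSupplier(boolean sortedInput, int groupCount) {
        return () -> {
            if (sortedInput && groupCount == 2) {                // sorted tsid + time bucket input
                return new SortedTimeSeriesGrouping();
            }
            return new GeneralGrouping();                        // fall back to the hash-based path
        };
    }

    public static void main(String[] args) {
        System.out.println(groupingSupplier(true, 2).get());     // SortedTimeSeriesGrouping[]
        System.out.println(groupingSupplier(false, 2).get());    // GeneralGrouping[]
    }
}
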
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 12 additions & 5 deletions
@@ -18,7 +18,6 @@
 import org.elasticsearch.compute.operator.EvalOperator;
 import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
 import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -175,12 +174,12 @@ else if (aggregatorMode.isOutputPartial()) {
             );
             // time-series aggregation
             if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
-                operatorFactory = new TimeSeriesAggregationOperator.Factory(
-                    ts.timeBucketRounding(context.foldCtx()),
-                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
+                operatorFactory = timeSeriesAggregatorOperatorFactor(
+                    ts,
                     aggregatorMode,
                     aggregatorFactories,
-                    context.pageSize(aggregateExec.estimatedRowSize())
+                    groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
+                    context
                 );
                 // ordinal grouping
             } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -379,4 +378,12 @@ public abstract Operator.OperatorFactory ordinalGroupingOperatorFactory(
         ElementType groupType,
         LocalExecutionPlannerContext context
     );
+
+    public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    );
 }

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 22 additions & 0 deletions
@@ -14,7 +14,9 @@
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.logging.HeaderWarning;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.lucene.DataPartitioning;
@@ -27,6 +29,7 @@
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
@@ -61,6 +64,7 @@
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
@@ -299,6 +303,24 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
         );
     }
 
+    @Override
+    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    ) {
+        return new TimeSeriesAggregationOperator.Factory(
+            ts.timeBucketRounding(context.foldCtx()),
+            shardContexts.size() == 1 && ts.anyMatch(p -> p instanceof TimeSeriesSourceExec),
+            groupSpecs,
+            aggregatorMode,
+            aggregatorFactories,
+            context.pageSize(ts.estimatedRowSize())
+        );
+    }
+
     public static class DefaultShardContext implements ShardContext {
         private final int index;
         private final SearchExecutionContext ctx;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 10 additions & 0 deletions
@@ -157,6 +157,16 @@ public static String[] planOriginalIndices(PhysicalPlan plan) {
         return indices.toArray(String[]::new);
     }
 
+    public static boolean requireTimeSeriesSource(PhysicalPlan plan) {
+        return plan.anyMatch(p -> {
+            if (p instanceof FragmentExec f) {
+                return f.fragment().anyMatch(l -> l instanceof EsRelation s && s.indexMode() == IndexMode.TIME_SERIES);
+            } else {
+                return false;
+            }
+        });
+    }
+
     private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
         plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
             if (r.indexMode() != IndexMode.LOOKUP) {

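requireTimeSeriesSource walks the physical plan and reports whether any fragment still contains a relation in TIME_SERIES index mode. The shape of that anyMatch-style traversal can be sketched with a toy tree (hypothetical node type, not the real PhysicalPlan classes):

import java.util.List;

// Hypothetical miniature of an anyMatch-style plan traversal: walk a tree of
// nodes and report whether any node satisfies a predicate. The real plan
// classes are richer; this only shows the shape of the check.
final class PlanAnyMatch {

    record Node(String name, boolean timeSeriesSource, List<Node> children) {
        boolean anyMatch(java.util.function.Predicate<Node> predicate) {
            if (predicate.test(this)) {
                return true;
            }
            return children.stream().anyMatch(c -> c.anyMatch(predicate));
        }
    }

    public static void main(String[] args) {
        Node relation = new Node("EsRelation[time_series]", true, List.of());
        Node fragment = new Node("FragmentExec", false, List.of(relation));
        Node exchange = new Node("ExchangeExec", false, List.of(fragment));
        // Prints true: a time-series relation sits somewhere below the fragment.
        System.out.println(exchange.anyMatch(n -> n.timeSeriesSource()));
    }
}
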
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

Lines changed: 21 additions & 0 deletions
@@ -13,6 +13,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.Describable;
+import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
@@ -31,6 +32,7 @@
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
+import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.compute.test.TestBlockFactory;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.env.Environment;
@@ -58,6 +60,7 @@
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
+import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
@@ -153,6 +156,24 @@ public Operator.OperatorFactory ordinalGroupingOperatorFactory(
         );
     }
 
+    @Override
+    public Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
+        TimeSeriesAggregateExec ts,
+        AggregatorMode aggregatorMode,
+        List<GroupingAggregator.Factory> aggregatorFactories,
+        List<BlockHash.GroupSpec> groupSpecs,
+        LocalExecutionPlannerContext context
+    ) {
+        return new TimeSeriesAggregationOperator.Factory(
+            ts.timeBucketRounding(context.foldCtx()),
+            false,
+            groupSpecs,
+            aggregatorMode,
+            aggregatorFactories,
+            context.pageSize(ts.estimatedRowSize())
+        );
+    }
+
     private class TestSourceOperator extends SourceOperator {
         private int index = 0;
         private final DriverContext driverContext;
