Skip to content

Commit 767fdfd

Browse files
committed
Enable time-series block hash
1 parent d65f34d commit 767fdfd

File tree

7 files changed

+187
-46
lines changed

7 files changed

+187
-46
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

Lines changed: 107 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@
2626
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
2727
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
2828
import org.elasticsearch.compute.data.Page;
29+
import org.elasticsearch.core.Assertions;
2930
import org.elasticsearch.core.Releasable;
3031
import org.elasticsearch.core.ReleasableIterator;
3132
import org.elasticsearch.core.Releasables;
3233

33-
import java.util.Objects;
34-
3534
/**
3635
* An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
3736
* Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -41,7 +40,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
4140
private final int tsHashChannel;
4241
private final int timestampIntervalChannel;
4342

44-
private final BytesRef lastTsid = new BytesRef();
43+
private int lastTsidPosition = 0;
4544
private final BytesRefArrayWithSize tsidArray;
4645

4746
private long lastTimestamp;
@@ -50,56 +49,116 @@ public final class TimeSeriesBlockHash extends BlockHash {
5049
private int currentTimestampCount;
5150
private final IntArrayWithSize perTsidCountArray;
5251

52+
private final BytesRefLongBlockHash assertingHash;
53+
5354
public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
5455
super(blockFactory);
5556
this.tsHashChannel = tsHashChannel;
5657
this.timestampIntervalChannel = timestampIntervalChannel;
5758
this.tsidArray = new BytesRefArrayWithSize(blockFactory);
5859
this.timestampArray = new LongArrayWithSize(blockFactory);
5960
this.perTsidCountArray = new IntArrayWithSize(blockFactory);
61+
if (Assertions.ENABLED) {
62+
assertingHash = new BytesRefLongBlockHash(blockFactory, tsHashChannel, timestampIntervalChannel, false, Integer.MAX_VALUE);
63+
} else {
64+
assertingHash = null;
65+
}
6066
}
6167

6268
@Override
6369
public void close() {
64-
Releasables.close(tsidArray, timestampArray, perTsidCountArray);
70+
Releasables.close(tsidArray, timestampArray, perTsidCountArray, assertingHash);
71+
}
72+
73+
private OrdinalBytesRefVector getTsidVector(Page page) {
74+
BytesRefBlock block = page.getBlock(tsHashChannel);
75+
var ordinalBlock = block.asOrdinals();
76+
if (ordinalBlock == null) {
77+
throw new IllegalStateException("expected ordinal block for tsid");
78+
}
79+
var ordinalVector = ordinalBlock.asVector();
80+
if (ordinalVector == null) {
81+
throw new IllegalStateException("expected ordinal vector for tsid");
82+
}
83+
return ordinalVector;
84+
}
85+
86+
private LongVector getTimestampVector(Page page) {
87+
final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
88+
LongVector timestampsVector = timestampsBlock.asVector();
89+
if (timestampsVector == null) {
90+
throw new IllegalStateException("expected long vector for timestamp");
91+
}
92+
return timestampsVector;
6593
}
6694

6795
@Override
6896
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
69-
final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
70-
final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
71-
final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
72-
final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
73-
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
97+
final BytesRefVector tsidDict;
98+
final IntVector tsidOrdinals;
99+
{
100+
final var tsidVector = getTsidVector(page);
101+
tsidDict = tsidVector.getDictionaryVector();
102+
tsidOrdinals = tsidVector.getOrdinalsVector();
103+
}
104+
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
74105
final BytesRef spare = new BytesRef();
75-
// TODO: optimize incoming ordinal block
76-
for (int i = 0; i < tsidVector.getPositionCount(); i++) {
77-
final BytesRef tsid = tsidVector.getBytesRef(i, spare);
106+
final LongVector timestampVector = getTimestampVector(page);
107+
int lastOrd = -1;
108+
for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
109+
final int newOrd = tsidOrdinals.getInt(i);
110+
boolean newGroup = false;
111+
if (lastOrd != newOrd) {
112+
final var newTsid = tsidDict.getBytesRef(newOrd, spare);
113+
if (positionCount() == 0) {
114+
newGroup = true;
115+
} else if (lastOrd == -1) {
116+
newGroup = lastTsid().equals(newTsid) == false;
117+
} else {
118+
newGroup = true;
119+
}
120+
if (newGroup) {
121+
endTsidGroup();
122+
lastTsidPosition = tsidArray.count;
123+
tsidArray.append(newTsid);
124+
}
125+
lastOrd = newOrd;
126+
}
78127
final long timestamp = timestampVector.getLong(i);
79-
ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
128+
if (newGroup || timestamp != lastTimestamp) {
129+
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
130+
timestampArray.append(timestamp);
131+
lastTimestamp = timestamp;
132+
currentTimestampCount++;
133+
}
134+
ordsBuilder.appendInt(timestampArray.count - 1);
80135
}
81136
try (var ords = ordsBuilder.build()) {
82137
addInput.add(0, ords);
138+
assert assertingAddInputPage(page, ords);
83139
}
84140
}
85141
}
86142

87-
private int addOnePosition(BytesRef tsid, long timestamp) {
88-
boolean newGroup = false;
89-
if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
90-
assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
91-
endTsidGroup();
92-
tsidArray.append(tsid);
93-
tsidArray.get(tsidArray.count - 1, lastTsid);
94-
newGroup = true;
95-
}
96-
if (newGroup || timestamp != lastTimestamp) {
97-
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
98-
timestampArray.append(timestamp);
99-
lastTimestamp = timestamp;
100-
currentTimestampCount++;
101-
}
102-
return positionCount() - 1;
143+
private boolean assertingAddInputPage(Page page, IntVector actualIds) {
144+
assert assertingHash != null;
145+
assertingHash.add(page, new GroupingAggregatorFunction.AddInput() {
146+
@Override
147+
public void add(int positionOffset, IntBlock groupIds) {
148+
assert false : "add(IntBlock) shouldn't be called";
149+
}
150+
151+
@Override
152+
public void add(int positionOffset, IntVector expectedIds) {
153+
assert expectedIds.equals(actualIds) : "actual=" + actualIds + " vs expected = " + expectedIds;
154+
}
155+
156+
@Override
157+
public void close() {
158+
159+
}
160+
});
161+
return true;
103162
}
104163

105164
private void endTsidGroup() {
@@ -109,6 +168,12 @@ private void endTsidGroup() {
109168
}
110169
}
111170

171+
private BytesRef lastTsid() {
172+
final BytesRef bytesRef = new BytesRef();
173+
tsidArray.get(lastTsidPosition, bytesRef);
174+
return bytesRef;
175+
}
176+
112177
@Override
113178
public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
114179
throw new UnsupportedOperationException("TODO");
@@ -125,6 +190,7 @@ public Block[] getKeys() {
125190
blocks[0] = buildTsidBlock();
126191
}
127192
blocks[1] = timestampArray.toBlock();
193+
assert assertingKeys(blocks);
128194
return blocks;
129195
} finally {
130196
if (blocks[blocks.length - 1] == null) {
@@ -133,6 +199,18 @@ public Block[] getKeys() {
133199
}
134200
}
135201

202+
private boolean assertingKeys(Block[] actualKeys) {
203+
assert assertingHash != null;
204+
Block[] expectedKeys = assertingHash.getKeys();
205+
try {
206+
assert expectedKeys[0].equals(actualKeys[0]) : "actual=" + actualKeys[0] + " vs expected = " + expectedKeys[0];
207+
assert expectedKeys[1].equals(actualKeys[1]) : "actual=" + actualKeys[1] + " vs expected = " + expectedKeys[1];
208+
} finally {
209+
Releasables.close(expectedKeys);
210+
}
211+
return true;
212+
}
213+
136214
private BytesRefBlock buildTsidBlockWithOrdinal() {
137215
try (IntVector.FixedBuilder ordinalBuilder = blockFactory.newIntVectorFixedBuilder(positionCount())) {
138216
for (int i = 0; i < tsidArray.count; i++) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
7575
}
7676

7777
@Override
78-
public BytesRefVector asVector() {
78+
public OrdinalBytesRefVector asVector() {
7979
IntVector vector = ordinals.asVector();
8080
if (vector != null) {
8181
return new OrdinalBytesRefVector(vector, bytes);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
1515
import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
1616
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
17+
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
1718
import org.elasticsearch.compute.data.Block;
1819
import org.elasticsearch.compute.data.ElementType;
1920
import org.elasticsearch.compute.data.LongBlock;
@@ -30,6 +31,7 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
3031

3132
public record Factory(
3233
Rounding.Prepared timeBucket,
34+
boolean sortedInput,
3335
List<BlockHash.GroupSpec> groups,
3436
AggregatorMode aggregatorMode,
3537
List<GroupingAggregator.Factory> aggregators,
@@ -38,17 +40,18 @@ public record Factory(
3840
@Override
3941
public Operator get(DriverContext driverContext) {
4042
// TODO: use TimeSeriesBlockHash when possible
41-
return new TimeSeriesAggregationOperator(
42-
timeBucket,
43-
aggregators,
44-
() -> BlockHash.build(
45-
groups,
46-
driverContext.blockFactory(),
47-
maxPageSize,
48-
true // we can enable optimizations as the inputs are vectors
49-
),
50-
driverContext
51-
);
43+
return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
44+
if (sortedInput && groups.size() == 2) {
45+
return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
46+
} else {
47+
return BlockHash.build(
48+
groups,
49+
driverContext.blockFactory(),
50+
maxPageSize,
51+
true // we can enable optimizations as the inputs are vectors
52+
);
53+
}
54+
}, driverContext);
5255
}
5356

5457
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.operator.EvalOperator;
1919
import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
2020
import org.elasticsearch.compute.operator.Operator;
21-
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
2221
import org.elasticsearch.index.analysis.AnalysisRegistry;
2322
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
2423
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -175,12 +174,12 @@ else if (aggregatorMode.isOutputPartial()) {
175174
);
176175
// time-series aggregation
177176
if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
178-
operatorFactory = new TimeSeriesAggregationOperator.Factory(
179-
ts.timeBucketRounding(context.foldCtx()),
180-
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
177+
operatorFactory = timeSeriesAggregatorOperatorFactor(
178+
ts,
181179
aggregatorMode,
182180
aggregatorFactories,
183-
context.pageSize(aggregateExec.estimatedRowSize())
181+
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
182+
context
184183
);
185184
// ordinal grouping
186185
} else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -379,4 +378,12 @@ public abstract Operator.OperatorFactory ordinalGroupingOperatorFactory(
379378
ElementType groupType,
380379
LocalExecutionPlannerContext context
381380
);
381+
382+
public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
383+
TimeSeriesAggregateExec ts,
384+
AggregatorMode aggregatorMode,
385+
List<GroupingAggregator.Factory> aggregatorFactories,
386+
List<BlockHash.GroupSpec> groupSpecs,
387+
LocalExecutionPlannerContext context
388+
);
382389
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import org.apache.lucene.search.IndexSearcher;
1515
import org.apache.lucene.search.Query;
1616
import org.elasticsearch.common.logging.HeaderWarning;
17+
import org.elasticsearch.compute.aggregation.AggregatorMode;
1718
import org.elasticsearch.compute.aggregation.GroupingAggregator;
19+
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1820
import org.elasticsearch.compute.data.Block;
1921
import org.elasticsearch.compute.data.ElementType;
2022
import org.elasticsearch.compute.lucene.DataPartitioning;
@@ -27,6 +29,7 @@
2729
import org.elasticsearch.compute.operator.Operator;
2830
import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
2931
import org.elasticsearch.compute.operator.SourceOperator;
32+
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
3033
import org.elasticsearch.core.Nullable;
3134
import org.elasticsearch.index.IndexMode;
3235
import org.elasticsearch.index.IndexSettings;
@@ -61,6 +64,7 @@
6164
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
6265
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
6366
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
67+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
6468
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
6569
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
6670
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
@@ -299,6 +303,24 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
299303
);
300304
}
301305

306+
@Override
307+
public Operator.OperatorFactory timeSeriesAggregatorOperatorFactor(
308+
TimeSeriesAggregateExec ts,
309+
AggregatorMode aggregatorMode,
310+
List<GroupingAggregator.Factory> aggregatorFactories,
311+
List<BlockHash.GroupSpec> groupSpecs,
312+
LocalExecutionPlannerContext context
313+
) {
314+
return new TimeSeriesAggregationOperator.Factory(
315+
ts.timeBucketRounding(context.foldCtx()),
316+
shardContexts.size() == 1 && ts.anyMatch(p -> p instanceof TimeSeriesSourceExec),
317+
groupSpecs,
318+
aggregatorMode,
319+
aggregatorFactories,
320+
context.pageSize(ts.estimatedRowSize())
321+
);
322+
}
323+
302324
public static class DefaultShardContext implements ShardContext {
303325
private final int index;
304326
private final SearchExecutionContext ctx;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,16 @@ public static String[] planOriginalIndices(PhysicalPlan plan) {
157157
return indices.toArray(String[]::new);
158158
}
159159

160+
public static boolean requireTimeSeriesSource(PhysicalPlan plan) {
161+
return plan.anyMatch(p -> {
162+
if (p instanceof FragmentExec f) {
163+
return f.fragment().anyMatch(l -> l instanceof EsRelation s && s.indexMode() == IndexMode.TIME_SERIES);
164+
} else {
165+
return false;
166+
}
167+
});
168+
}
169+
160170
private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
161171
plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
162172
if (r.indexMode() != IndexMode.LOOKUP) {

0 commit comments

Comments
 (0)