Skip to content

Commit 96e2514

Browse files
dnhatnomricohenn
authored andcommitted
Fix channels in TimeSeriesAggregationOperator (elastic#125736)
Fix the channel in TimeSeriesAggregationOperator. Relates elastic#125537
1 parent 6f58842 commit 96e2514

File tree

3 files changed

+43
-42
lines changed

3 files changed

+43
-42
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.compute.aggregation.blockhash;
99

1010
import org.apache.lucene.util.BytesRef;
11-
import org.apache.lucene.util.RamUsageEstimator;
1211
import org.elasticsearch.common.unit.ByteSizeValue;
1312
import org.elasticsearch.common.util.BigArrays;
1413
import org.elasticsearch.common.util.BitArray;
@@ -22,7 +21,6 @@
2221
import org.elasticsearch.compute.data.BytesRefVector;
2322
import org.elasticsearch.compute.data.IntBlock;
2423
import org.elasticsearch.compute.data.IntVector;
25-
import org.elasticsearch.compute.data.LongBigArrayVector;
2624
import org.elasticsearch.compute.data.LongBlock;
2725
import org.elasticsearch.compute.data.LongVector;
2826
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
@@ -52,15 +50,13 @@ public final class TimeSeriesBlockHash extends BlockHash {
5250
private int currentTimestampCount;
5351
private final IntArrayWithSize perTsidCountArray;
5452

55-
int groupOrdinal = -1;
56-
5753
public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
5854
super(blockFactory);
5955
this.tsHashChannel = tsHashChannel;
6056
this.timestampIntervalChannel = timestampIntervalChannel;
61-
this.tsidArray = new BytesRefArrayWithSize();
62-
this.timestampArray = new LongArrayWithSize();
63-
this.perTsidCountArray = new IntArrayWithSize();
57+
this.tsidArray = new BytesRefArrayWithSize(blockFactory);
58+
this.timestampArray = new LongArrayWithSize(blockFactory);
59+
this.perTsidCountArray = new IntArrayWithSize(blockFactory);
6460
}
6561

6662
@Override
@@ -90,8 +86,8 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
9086

9187
private int addOnePosition(BytesRef tsid, long timestamp) {
9288
boolean newGroup = false;
93-
if (groupOrdinal == -1 || lastTsid.equals(tsid) == false) {
94-
assert groupOrdinal == -1 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
89+
if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
90+
assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
9591
endTsidGroup();
9692
tsidArray.append(tsid);
9793
tsidArray.get(tsidArray.count - 1, lastTsid);
@@ -101,10 +97,9 @@ private int addOnePosition(BytesRef tsid, long timestamp) {
10197
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
10298
timestampArray.append(timestamp);
10399
lastTimestamp = timestamp;
104-
groupOrdinal++;
105100
currentTimestampCount++;
106101
}
107-
return groupOrdinal;
102+
return positionCount() - 1;
108103
}
109104

110105
private void endTsidGroup() {
@@ -177,7 +172,7 @@ private BytesRefBlock buildTsidBlock() {
177172
}
178173

179174
private int positionCount() {
180-
return groupOrdinal + 1;
175+
return timestampArray.count;
181176
}
182177

183178
@Override
@@ -197,15 +192,17 @@ public String toString() {
197192
+ "], LongKey[channel="
198193
+ timestampIntervalChannel
199194
+ "]], entries="
200-
+ groupOrdinal
195+
+ positionCount()
201196
+ "b}";
202197
}
203198

204-
private class LongArrayWithSize implements Releasable {
199+
private static class LongArrayWithSize implements Releasable {
200+
private final BlockFactory blockFactory;
205201
private LongArray array;
206202
private int count = 0;
207203

208-
LongArrayWithSize() {
204+
LongArrayWithSize(BlockFactory blockFactory) {
205+
this.blockFactory = blockFactory;
209206
this.array = blockFactory.bigArrays().newLongArray(1, false);
210207
}
211208

@@ -216,10 +213,12 @@ void append(long value) {
216213
}
217214

218215
LongBlock toBlock() {
219-
LongBlock block = new LongBigArrayVector(array, count, blockFactory).asBlock();
220-
blockFactory.adjustBreaker(block.ramBytesUsed() - RamUsageEstimator.sizeOf(array));
221-
array = null;
222-
return block;
216+
try (var builder = blockFactory.newLongVectorFixedBuilder(count)) {
217+
for (int i = 0; i < count; i++) {
218+
builder.appendLong(array.get(i));
219+
}
220+
return builder.build().asBlock();
221+
}
223222
}
224223

225224
@Override
@@ -228,11 +227,13 @@ public void close() {
228227
}
229228
}
230229

231-
private class IntArrayWithSize implements Releasable {
230+
private static class IntArrayWithSize implements Releasable {
231+
private final BlockFactory blockFactory;
232232
private IntArray array;
233233
private int count = 0;
234234

235-
IntArrayWithSize() {
235+
IntArrayWithSize(BlockFactory blockFactory) {
236+
this.blockFactory = blockFactory;
236237
this.array = blockFactory.bigArrays().newIntArray(1, false);
237238
}
238239

@@ -248,11 +249,13 @@ public void close() {
248249
}
249250
}
250251

251-
private class BytesRefArrayWithSize implements Releasable {
252-
private final BytesRefArray array;
252+
private static class BytesRefArrayWithSize implements Releasable {
253+
private final BlockFactory blockFactory;
254+
private BytesRefArray array;
253255
private int count = 0;
254256

255-
BytesRefArrayWithSize() {
257+
BytesRefArrayWithSize(BlockFactory blockFactory) {
258+
this.blockFactory = blockFactory;
256259
this.array = new BytesRefArray(1, blockFactory.bigArrays());
257260
}
258261

@@ -266,8 +269,9 @@ void get(int index, BytesRef dest) {
266269
}
267270

268271
BytesRefVector toVector() {
269-
BytesRefVector vector = blockFactory.newBytesRefArrayVector(tsidArray.array, tsidArray.count);
270-
blockFactory.adjustBreaker(vector.ramBytesUsed() - tsidArray.array.bigArraysRamBytesUsed());
272+
BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count);
273+
blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed());
274+
array = null;
271275
return vector;
272276
}
273277

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.compute.aggregation.AggregatorMode;
1212
import org.elasticsearch.compute.aggregation.GroupingAggregator;
1313
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
14-
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
1514

1615
import java.util.List;
1716
import java.util.function.Supplier;
@@ -32,18 +31,17 @@ public record Factory(
3231
) implements OperatorFactory {
3332
@Override
3433
public Operator get(DriverContext driverContext) {
35-
return new HashAggregationOperator(aggregators, () -> {
36-
if (aggregatorMode.isInputPartial()) {
37-
return BlockHash.build(
38-
List.of(tsidGroup, timestampGroup),
39-
driverContext.blockFactory(),
40-
maxPageSize,
41-
true // we can enable optimizations as the inputs are vectors
42-
);
43-
} else {
44-
return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory());
45-
}
46-
}, driverContext);
34+
// TODO: use TimeSeriesBlockHash when possible
35+
return new HashAggregationOperator(
36+
aggregators,
37+
() -> BlockHash.build(
38+
List.of(tsidGroup, timestampGroup),
39+
driverContext.blockFactory(),
40+
maxPageSize,
41+
true // we can enable optimizations as the inputs are vectors
42+
),
43+
driverContext
44+
);
4745
}
4846

4947
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import org.elasticsearch.xpack.esql.core.expression.Expression;
2828
import org.elasticsearch.xpack.esql.core.expression.Expressions;
2929
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
30+
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
3031
import org.elasticsearch.xpack.esql.core.expression.NameId;
3132
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
32-
import org.elasticsearch.xpack.esql.core.type.DataType;
3333
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
3434
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
3535
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
@@ -176,8 +176,7 @@ else if (aggregatorMode.isOutputPartial()) {
176176
// time-series aggregation
177177
if (Expressions.anyMatch(aggregates, a -> a instanceof ToTimeSeriesAggregator)
178178
&& groupSpecs.size() == 2
179-
&& groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE
180-
&& groupSpecs.get(1).attribute.dataType() == DataType.LONG) {
179+
&& groupSpecs.get(0).attribute.name().equals(MetadataAttribute.TSID_FIELD)) {
181180
operatorFactory = new TimeSeriesAggregationOperator.Factory(
182181
groupSpecs.get(0).toHashGroupSpec(),
183182
groupSpecs.get(1).toHashGroupSpec(),

0 commit comments

Comments
 (0)