TimeSeriesBlockHash.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.compute.aggregation.blockhash;
 
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
@@ -22,7 +21,6 @@
 import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.LongBigArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
@@ -52,15 +50,13 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private int currentTimestampCount;
     private final IntArrayWithSize perTsidCountArray;
 
-    int groupOrdinal = -1;
-
     public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
         super(blockFactory);
         this.tsHashChannel = tsHashChannel;
         this.timestampIntervalChannel = timestampIntervalChannel;
-        this.tsidArray = new BytesRefArrayWithSize();
-        this.timestampArray = new LongArrayWithSize();
-        this.perTsidCountArray = new IntArrayWithSize();
+        this.tsidArray = new BytesRefArrayWithSize(blockFactory);
+        this.timestampArray = new LongArrayWithSize(blockFactory);
+        this.perTsidCountArray = new IntArrayWithSize(blockFactory);
     }
 
     @Override
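Note: the constructor now threads the BlockFactory into the helper classes because, later in this diff, LongArrayWithSize, IntArrayWithSize, and BytesRefArrayWithSize become static nested classes and lose the implicit reference to the enclosing TimeSeriesBlockHash. A minimal standalone sketch of that refactor pattern, with illustrative names not taken from the Elasticsearch source:

// Sketch: converting an inner class that captures the enclosing instance
// into a static nested class with an explicitly injected dependency.
class Outer {
    private final Dependency dependency = new Dependency();

    // Before: the inner class captures Outer.this just to reach `dependency`.
    private class InnerBefore {
        long use() {
            return dependency.value(); // implicitly Outer.this.dependency
        }
    }

    // After: no hidden reference to Outer; the dependency is passed in.
    private static class InnerAfter {
        private final Dependency dependency;

        InnerAfter(Dependency dependency) {
            this.dependency = dependency;
        }

        long use() {
            return dependency.value();
        }
    }

    static class Dependency {
        long value() {
            return 42L;
        }
    }
}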
@@ -90,8 +86,8 @@ public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
 
     private int addOnePosition(BytesRef tsid, long timestamp) {
         boolean newGroup = false;
-        if (groupOrdinal == -1 || lastTsid.equals(tsid) == false) {
-            assert groupOrdinal == -1 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
+        if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
+            assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
             endTsidGroup();
             tsidArray.append(tsid);
             tsidArray.get(tsidArray.count - 1, lastTsid);
@@ -101,10 +97,9 @@ private int addOnePosition(BytesRef tsid, long timestamp) {
             assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
             timestampArray.append(timestamp);
             lastTimestamp = timestamp;
-            groupOrdinal++;
             currentTimestampCount++;
         }
-        return groupOrdinal;
+        return positionCount() - 1;
     }
 
     private void endTsidGroup() {
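Note: these two hunks drop the hand-maintained groupOrdinal counter. Since every added position appends exactly one timestamp, the ordinal of the newest group is always the timestamp count minus one, so the counter was redundant state that could drift. A minimal sketch of the invariant, with illustrative names:

// Sketch: the ordinal of the most recently appended entry is count - 1,
// so no separate running counter is needed.
class OrdinalFromCount {
    private final long[] timestamps = new long[1024];
    private int count = 0;

    int append(long timestamp) {
        timestamps[count++] = timestamp;
        return count - 1; // same value the old groupOrdinal++ tracked
    }

    int positionCount() {
        return count;
    }
}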
@@ -177,7 +172,7 @@ private BytesRefBlock buildTsidBlock() {
     }
 
     private int positionCount() {
-        return groupOrdinal + 1;
+        return timestampArray.count;
     }
 
     @Override
@@ -197,15 +192,17 @@ public String toString() {
             + "], LongKey[channel="
             + timestampIntervalChannel
             + "]], entries="
-            + groupOrdinal
+            + positionCount()
             + "b}";
     }
 
-    private class LongArrayWithSize implements Releasable {
+    private static class LongArrayWithSize implements Releasable {
+        private final BlockFactory blockFactory;
         private LongArray array;
         private int count = 0;
 
-        LongArrayWithSize() {
+        LongArrayWithSize(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
             this.array = blockFactory.bigArrays().newLongArray(1, false);
         }
 
@@ -216,10 +213,12 @@ void append(long value) {
         }
 
         LongBlock toBlock() {
-            LongBlock block = new LongBigArrayVector(array, count, blockFactory).asBlock();
-            blockFactory.adjustBreaker(block.ramBytesUsed() - RamUsageEstimator.sizeOf(array));
-            array = null;
-            return block;
+            try (var builder = blockFactory.newLongVectorFixedBuilder(count)) {
+                for (int i = 0; i < count; i++) {
+                    builder.appendLong(array.get(i));
+                }
+                return builder.build().asBlock();
+            }
         }
 
         @Override
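Note: the old toBlock wrapped the backing BigArray in a LongBigArrayVector and then reconciled the circuit breaker by hand with adjustBreaker; the new version copies the values through a fixed-size vector builder, which does its own breaker accounting and releases it on close. A rough standalone sketch of the copy-through-a-builder shape; the builder here is an illustrative stand-in, not the Elasticsearch API:

// Sketch: copy from a source array into a size-tracked builder, mirroring
// the try-with-resources shape of the new toBlock().
final class CopyThroughBuilder {
    // Illustrative stand-in for a breaker-accounted fixed-size builder.
    static final class LongBuilder implements AutoCloseable {
        private final long[] values;
        private int size;

        LongBuilder(int capacity) {
            values = new long[capacity]; // the real builder charges a breaker here
        }

        void appendLong(long v) {
            values[size++] = v;
        }

        long[] build() {
            return values;
        }

        @Override
        public void close() {
            // the real builder refunds breaker bytes unless build() transferred them
        }
    }

    static long[] toBlock(long[] source, int count) {
        try (LongBuilder builder = new LongBuilder(count)) {
            for (int i = 0; i < count; i++) {
                builder.appendLong(source[i]);
            }
            return builder.build();
        }
    }
}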
@@ -228,11 +227,13 @@ public void close() {
         }
     }
 
-    private class IntArrayWithSize implements Releasable {
+    private static class IntArrayWithSize implements Releasable {
+        private final BlockFactory blockFactory;
         private IntArray array;
         private int count = 0;
 
-        IntArrayWithSize() {
+        IntArrayWithSize(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
             this.array = blockFactory.bigArrays().newIntArray(1, false);
         }
 
@@ -248,11 +249,13 @@ public void close() {
         }
     }
 
-    private class BytesRefArrayWithSize implements Releasable {
-        private final BytesRefArray array;
+    private static class BytesRefArrayWithSize implements Releasable {
+        private final BlockFactory blockFactory;
+        private BytesRefArray array;
         private int count = 0;
 
-        BytesRefArrayWithSize() {
+        BytesRefArrayWithSize(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
             this.array = new BytesRefArray(1, blockFactory.bigArrays());
         }
 
@@ -266,8 +269,9 @@ void get(int index, BytesRef dest) {
         }
 
         BytesRefVector toVector() {
-            BytesRefVector vector = blockFactory.newBytesRefArrayVector(tsidArray.array, tsidArray.count);
-            blockFactory.adjustBreaker(vector.ramBytesUsed() - tsidArray.array.bigArraysRamBytesUsed());
+            BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed());
+            array = null;
             return vector;
         }
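Note: toVector now reads the instance's own fields instead of reaching through the outer tsidArray reference (required once the class is static), and the added array = null records the ownership hand-off: after newBytesRefArrayVector wraps the BytesRefArray, the vector is responsible for releasing it, and this class's close() must not free it again. A small standalone sketch of the hand-off pattern:

// Sketch: clear the field once ownership moves to the returned wrapper,
// so a later close() cannot release the same resource twice.
final class ResourceHolder implements AutoCloseable {
    private java.io.Closeable resource;

    ResourceHolder(java.io.Closeable resource) {
        this.resource = resource;
    }

    java.io.Closeable transferToWrapper() {
        java.io.Closeable transferred = resource;
        resource = null; // the caller now owns it
        return transferred;
    }

    @Override
    public void close() throws java.io.IOException {
        if (resource != null) { // no-op after a transfer
            resource.close();
        }
    }
}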
TimeSeriesAggregationOperator.java

@@ -11,7 +11,6 @@
 import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 
 import java.util.List;
 import java.util.function.Supplier;
@@ -32,18 +31,17 @@ public record Factory(
     ) implements OperatorFactory {
         @Override
         public Operator get(DriverContext driverContext) {
-            return new HashAggregationOperator(aggregators, () -> {
-                if (aggregatorMode.isInputPartial()) {
-                    return BlockHash.build(
-                        List.of(tsidGroup, timestampGroup),
-                        driverContext.blockFactory(),
-                        maxPageSize,
-                        true // we can enable optimizations as the inputs are vectors
-                    );
-                } else {
-                    return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory());
-                }
-            }, driverContext);
+            // TODO: use TimeSeriesBlockHash when possible
+            return new HashAggregationOperator(
+                aggregators,
+                () -> BlockHash.build(
+                    List.of(tsidGroup, timestampGroup),
+                    driverContext.blockFactory(),
+                    maxPageSize,
+                    true // we can enable optimizations as the inputs are vectors
+                ),
+                driverContext
+            );
         }
 
         @Override
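Note: the factory previously chose TimeSeriesBlockHash whenever the input was not partial, but the removed branch passed timestampGroup.channel() as both constructor arguments, where the first was presumably meant to be tsidGroup.channel(). This change falls back to the generic BlockHash.build everywhere and leaves the specialized hash as a TODO. The supplier shape survives: each driver builds its own hash lazily. A minimal sketch of the removed per-mode choice, with stand-in types rather than the real Elasticsearch ones:

// Sketch: pick a hash implementation per aggregator mode, handed out
// through a Supplier so each operator instance builds its own.
import java.util.function.Supplier;

final class HashChoiceSketch {
    interface BlockHash extends AutoCloseable {
        @Override
        void close();
    }

    static Supplier<BlockHash> hashSupplier(boolean inputPartial) {
        if (inputPartial) {
            return GenericHash::new;    // stand-in for BlockHash.build(...)
        }
        return TimeSeriesHash::new;     // stand-in for new TimeSeriesBlockHash(...)
    }

    static final class GenericHash implements BlockHash {
        @Override
        public void close() {}
    }

    static final class TimeSeriesHash implements BlockHash {
        @Override
        public void close() {}
    }
}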
(third file: ESQL planner)

@@ -27,9 +27,9 @@
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
+import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
-import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
@@ -176,8 +176,7 @@ else if (aggregatorMode.isOutputPartial()) {
         // time-series aggregation
         if (Expressions.anyMatch(aggregates, a -> a instanceof ToTimeSeriesAggregator)
             && groupSpecs.size() == 2
-            && groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE
-            && groupSpecs.get(1).attribute.dataType() == DataType.LONG) {
+            && groupSpecs.get(0).attribute.name().equals(MetadataAttribute.TSID_FIELD)) {
             operatorFactory = new TimeSeriesAggregationOperator.Factory(
                 groupSpecs.get(0).toHashGroupSpec(),
                 groupSpecs.get(1).toHashGroupSpec(),
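Note: the planner's gate for the time-series operator no longer matches the two grouping attributes by data type (TSID_DATA_TYPE and LONG); it now checks by name that the first grouping is the _tsid metadata attribute, via MetadataAttribute.TSID_FIELD. A minimal standalone sketch of the relaxed check, with illustrative types:

// Sketch: gate on the grouping attribute's name instead of its data type.
import java.util.List;

final class TimeSeriesGateSketch {
    static final String TSID_FIELD = "_tsid"; // mirrors MetadataAttribute.TSID_FIELD

    record GroupSpec(String attributeName) {}

    static boolean isTimeSeriesAggregation(List<GroupSpec> groupSpecs) {
        return groupSpecs.size() == 2
            && groupSpecs.get(0).attributeName().equals(TSID_FIELD);
    }
}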