Skip to content

Commit 55641fb

Browse files
Merge branch 'main' into fix/127466
2 parents ee2f458 + 34ebf8b commit 55641fb

File tree

8 files changed

+47
-253
lines changed

8 files changed

+47
-253
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java

Lines changed: 30 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@
3030
import org.elasticsearch.core.ReleasableIterator;
3131
import org.elasticsearch.core.Releasables;
3232

33+
import java.util.Objects;
34+
3335
/**
3436
* An optimized block hash that receives two blocks: tsid and timestamp, which are sorted.
3537
* Since the incoming data is sorted, this block hash appends the incoming data to the internal arrays without lookup.
@@ -39,7 +41,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
3941
private final int tsHashChannel;
4042
private final int timestampIntervalChannel;
4143

42-
private int lastTsidPosition = 0;
44+
private final BytesRef lastTsid = new BytesRef();
4345
private final BytesRefArrayWithSize tsidArray;
4446

4547
private long lastTimestamp;
@@ -62,77 +64,44 @@ public void close() {
6264
Releasables.close(tsidArray, timestampArray, perTsidCountArray);
6365
}
6466

65-
private OrdinalBytesRefVector getTsidVector(Page page) {
66-
BytesRefBlock block = page.getBlock(tsHashChannel);
67-
var ordinalBlock = block.asOrdinals();
68-
if (ordinalBlock == null) {
69-
throw new IllegalStateException("expected ordinal block for tsid");
70-
}
71-
var ordinalVector = ordinalBlock.asVector();
72-
if (ordinalVector == null) {
73-
throw new IllegalStateException("expected ordinal vector for tsid");
74-
}
75-
return ordinalVector;
76-
}
77-
78-
private LongVector getTimestampVector(Page page) {
79-
final LongBlock timestampsBlock = page.getBlock(timestampIntervalChannel);
80-
LongVector timestampsVector = timestampsBlock.asVector();
81-
if (timestampsVector == null) {
82-
throw new IllegalStateException("expected long vector for timestamp");
83-
}
84-
return timestampsVector;
85-
}
86-
8767
@Override
8868
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
89-
final BytesRefVector tsidDict;
90-
final IntVector tsidOrdinals;
91-
{
92-
final var tsidVector = getTsidVector(page);
93-
tsidDict = tsidVector.getDictionaryVector();
94-
tsidOrdinals = tsidVector.getOrdinalsVector();
95-
}
96-
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidOrdinals.getPositionCount())) {
69+
final BytesRefBlock tsidBlock = page.getBlock(tsHashChannel);
70+
final BytesRefVector tsidVector = Objects.requireNonNull(tsidBlock.asVector(), "tsid input must be a vector");
71+
final LongBlock timestampBlock = page.getBlock(timestampIntervalChannel);
72+
final LongVector timestampVector = Objects.requireNonNull(timestampBlock.asVector(), "timestamp input must be a vector");
73+
try (var ordsBuilder = blockFactory.newIntVectorBuilder(tsidVector.getPositionCount())) {
9774
final BytesRef spare = new BytesRef();
98-
final BytesRef lastTsid = new BytesRef();
99-
final LongVector timestampVector = getTimestampVector(page);
100-
int lastOrd = -1;
101-
for (int i = 0; i < tsidOrdinals.getPositionCount(); i++) {
102-
final int newOrd = tsidOrdinals.getInt(i);
103-
boolean newGroup = false;
104-
if (lastOrd != newOrd) {
105-
final var newTsid = tsidDict.getBytesRef(newOrd, spare);
106-
if (positionCount() == 0) {
107-
newGroup = true;
108-
} else if (lastOrd == -1) {
109-
tsidArray.get(lastTsidPosition, lastTsid);
110-
newGroup = lastTsid.equals(newTsid) == false;
111-
} else {
112-
newGroup = true;
113-
}
114-
if (newGroup) {
115-
endTsidGroup();
116-
lastTsidPosition = tsidArray.count;
117-
tsidArray.append(newTsid);
118-
}
119-
lastOrd = newOrd;
120-
}
75+
// TODO: optimize incoming ordinal block
76+
for (int i = 0; i < tsidVector.getPositionCount(); i++) {
77+
final BytesRef tsid = tsidVector.getBytesRef(i, spare);
12178
final long timestamp = timestampVector.getLong(i);
122-
if (newGroup || timestamp != lastTimestamp) {
123-
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
124-
timestampArray.append(timestamp);
125-
lastTimestamp = timestamp;
126-
currentTimestampCount++;
127-
}
128-
ordsBuilder.appendInt(timestampArray.count - 1);
79+
ordsBuilder.appendInt(addOnePosition(tsid, timestamp));
12980
}
13081
try (var ords = ordsBuilder.build()) {
13182
addInput.add(0, ords);
13283
}
13384
}
13485
}
13586

87+
private int addOnePosition(BytesRef tsid, long timestamp) {
88+
boolean newGroup = false;
89+
if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
90+
assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
91+
endTsidGroup();
92+
tsidArray.append(tsid);
93+
tsidArray.get(tsidArray.count - 1, lastTsid);
94+
newGroup = true;
95+
}
96+
if (newGroup || timestamp != lastTimestamp) {
97+
assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
98+
timestampArray.append(timestamp);
99+
lastTimestamp = timestamp;
100+
currentTimestampCount++;
101+
}
102+
return positionCount() - 1;
103+
}
104+
136105
private void endTsidGroup() {
137106
if (currentTimestampCount > 0) {
138107
perTsidCountArray.append(currentTimestampCount);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
7575
}
7676

7777
@Override
78-
public OrdinalBytesRefVector asVector() {
78+
public BytesRefVector asVector() {
7979
IntVector vector = ordinals.asVector();
8080
if (vector != null) {
8181
return new OrdinalBytesRefVector(vector, bytes);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java

Lines changed: 11 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
1515
import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
1616
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
17-
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
1817
import org.elasticsearch.compute.data.Block;
1918
import org.elasticsearch.compute.data.ElementType;
2019
import org.elasticsearch.compute.data.LongBlock;
@@ -31,7 +30,6 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
3130

3231
public record Factory(
3332
Rounding.Prepared timeBucket,
34-
boolean sortedInput,
3533
List<BlockHash.GroupSpec> groups,
3634
AggregatorMode aggregatorMode,
3735
List<GroupingAggregator.Factory> aggregators,
@@ -40,18 +38,17 @@ public record Factory(
4038
@Override
4139
public Operator get(DriverContext driverContext) {
4240
// TODO: use TimeSeriesBlockHash when possible
43-
return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> {
44-
if (sortedInput && groups.size() == 2) {
45-
return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory());
46-
} else {
47-
return BlockHash.build(
48-
groups,
49-
driverContext.blockFactory(),
50-
maxPageSize,
51-
true // we can enable optimizations as the inputs are vectors
52-
);
53-
}
54-
}, driverContext);
41+
return new TimeSeriesAggregationOperator(
42+
timeBucket,
43+
aggregators,
44+
() -> BlockHash.build(
45+
groups,
46+
driverContext.blockFactory(),
47+
maxPageSize,
48+
true // we can enable optimizations as the inputs are vectors
49+
),
50+
driverContext
51+
);
5552
}
5653

5754
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

Lines changed: 0 additions & 112 deletions
Original file line number | Diff line number | Diff line change
@@ -22,22 +22,19 @@
2222
import org.elasticsearch.compute.data.IntBlock;
2323
import org.elasticsearch.compute.data.IntVector;
2424
import org.elasticsearch.compute.data.LongBlock;
25-
import org.elasticsearch.compute.data.LongVector;
2625
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
2726
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
2827
import org.elasticsearch.compute.data.Page;
2928
import org.elasticsearch.compute.test.TestBlockFactory;
3029
import org.elasticsearch.core.Releasable;
3130
import org.elasticsearch.core.ReleasableIterator;
3231
import org.elasticsearch.core.Releasables;
33-
import org.elasticsearch.xpack.esql.core.util.Holder;
3432
import org.junit.After;
3533

3634
import java.util.ArrayList;
3735
import java.util.Arrays;
3836
import java.util.HashSet;
3937
import java.util.List;
40-
import java.util.Locale;
4138
import java.util.Set;
4239
import java.util.function.Consumer;
4340
import java.util.stream.IntStream;
@@ -1329,115 +1326,6 @@ public void close() {
13291326
}
13301327
}
13311328

1332-
public void testTimeSeriesBlockHash() {
1333-
long endTime = randomLongBetween(10_000_000, 20_000_000);
1334-
var hash1 = new TimeSeriesBlockHash(0, 1, blockFactory);
1335-
var hash2 = BlockHash.build(
1336-
List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.LONG)),
1337-
blockFactory,
1338-
32 * 1024,
1339-
forcePackedHash
1340-
);
1341-
int numPages = between(1, 100);
1342-
int globalTsid = -1;
1343-
long timestamp = endTime;
1344-
try (hash1; hash2) {
1345-
for (int p = 0; p < numPages; p++) {
1346-
int numRows = between(1, 1000);
1347-
if (randomBoolean()) {
1348-
timestamp -= between(0, 100);
1349-
}
1350-
try (
1351-
BytesRefVector.Builder dictBuilder = blockFactory.newBytesRefVectorBuilder(numRows);
1352-
IntVector.Builder ordinalBuilder = blockFactory.newIntVectorBuilder(numRows);
1353-
LongVector.Builder timestampsBuilder = blockFactory.newLongVectorBuilder(numRows)
1354-
) {
1355-
int perPageOrd = -1;
1356-
for (int i = 0; i < numRows; i++) {
1357-
boolean newGroup = globalTsid == -1 || randomInt(100) < 10;
1358-
if (newGroup) {
1359-
globalTsid++;
1360-
timestamp = endTime;
1361-
if (randomBoolean()) {
1362-
timestamp -= between(0, 1000);
1363-
}
1364-
}
1365-
if (perPageOrd == -1 || newGroup) {
1366-
perPageOrd++;
1367-
dictBuilder.appendBytesRef(new BytesRef(String.format(Locale.ROOT, "id-%06d", globalTsid)));
1368-
}
1369-
ordinalBuilder.appendInt(perPageOrd);
1370-
if (randomInt(100) < 20) {
1371-
timestamp -= between(1, 10);
1372-
}
1373-
timestampsBuilder.appendLong(timestamp);
1374-
}
1375-
try (
1376-
var tsidBlock = new OrdinalBytesRefVector(ordinalBuilder.build(), dictBuilder.build()).asBlock();
1377-
var timestampBlock = timestampsBuilder.build().asBlock()
1378-
) {
1379-
Page page = new Page(tsidBlock, timestampBlock);
1380-
Holder<IntVector> ords1 = new Holder<>();
1381-
hash1.add(page, new GroupingAggregatorFunction.AddInput() {
1382-
@Override
1383-
public void add(int positionOffset, IntBlock groupIds) {
1384-
throw new AssertionError("time-series block hash should emit a vector");
1385-
}
1386-
1387-
@Override
1388-
public void add(int positionOffset, IntVector groupIds) {
1389-
groupIds.incRef();
1390-
ords1.set(groupIds);
1391-
}
1392-
1393-
@Override
1394-
public void close() {
1395-
1396-
}
1397-
});
1398-
Holder<IntVector> ords2 = new Holder<>();
1399-
hash2.add(page, new GroupingAggregatorFunction.AddInput() {
1400-
@Override
1401-
public void add(int positionOffset, IntBlock groupIds) {
1402-
// TODO: check why PackedValuesBlockHash doesn't emit a vector?
1403-
IntVector vector = groupIds.asVector();
1404-
assertNotNull("should emit a vector", vector);
1405-
vector.incRef();
1406-
ords2.set(vector);
1407-
}
1408-
1409-
@Override
1410-
public void add(int positionOffset, IntVector groupIds) {
1411-
groupIds.incRef();
1412-
ords2.set(groupIds);
1413-
}
1414-
1415-
@Override
1416-
public void close() {
1417-
1418-
}
1419-
});
1420-
try {
1421-
assertThat("input=" + page, ords1.get(), equalTo(ords2.get()));
1422-
} finally {
1423-
Releasables.close(ords1.get(), ords2.get());
1424-
}
1425-
}
1426-
}
1427-
}
1428-
Block[] keys1 = null;
1429-
Block[] keys2 = null;
1430-
try {
1431-
keys1 = hash1.getKeys();
1432-
keys2 = hash2.getKeys();
1433-
assertThat(keys1, equalTo(keys2));
1434-
} finally {
1435-
Releasables.close(keys1);
1436-
Releasables.close(keys2);
1437-
}
1438-
}
1439-
}
1440-
14411329
record OrdsAndKeys(String description, int positionOffset, IntBlock ords, Block[] keys, IntVector nonEmpty) {}
14421330

14431331
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 5 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.compute.operator.EvalOperator;
1919
import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
2020
import org.elasticsearch.compute.operator.Operator;
21+
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
2122
import org.elasticsearch.index.analysis.AnalysisRegistry;
2223
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
2324
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -174,12 +175,12 @@ else if (aggregatorMode.isOutputPartial()) {
174175
);
175176
// time-series aggregation
176177
if (aggregateExec instanceof TimeSeriesAggregateExec ts) {
177-
operatorFactory = timeSeriesAggregatorOperatorFactory(
178-
ts,
178+
operatorFactory = new TimeSeriesAggregationOperator.Factory(
179+
ts.timeBucketRounding(context.foldCtx()),
180+
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
179181
aggregatorMode,
180182
aggregatorFactories,
181-
groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
182-
context
183+
context.pageSize(aggregateExec.estimatedRowSize())
183184
);
184185
// ordinal grouping
185186
} else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
@@ -378,12 +379,4 @@ public abstract Operator.OperatorFactory ordinalGroupingOperatorFactory(
378379
ElementType groupType,
379380
LocalExecutionPlannerContext context
380381
);
381-
382-
public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
383-
TimeSeriesAggregateExec ts,
384-
AggregatorMode aggregatorMode,
385-
List<GroupingAggregator.Factory> aggregatorFactories,
386-
List<BlockHash.GroupSpec> groupSpecs,
387-
LocalExecutionPlannerContext context
388-
);
389382
}

0 commit comments

Comments
 (0)