Skip to content

Commit 8c18619

Browse files
authored
[8.19] [ES|QL] Introduce AggregateMetricDoubleBlock (#127299) (#127623)
* [8.19] [ES|QL] Introduce AggregateMetricDoubleBlock (#127299) Backports the following commits to 8.19: - [ES|QL] Introduce AggregateMetricDoubleBlock (#127299) * borrow test skip from b563145
1 parent 98b0dc1 commit 8c18619

File tree

40 files changed

+511
-131
lines changed

40 files changed

+511
-131
lines changed

docs/changelog/127299.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127299
2+
summary: Introduce `AggregateMetricDoubleBlock`
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ static TransportVersion def(int id) {
210210
public static final TransportVersion ML_INFERENCE_SAGEMAKER_8_19 = def(8_841_0_21);
211211
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES_BACKPORT_8_19 = def(8_841_0_22);
212212
public static final TransportVersion PINNED_RETRIEVER_8_19 = def(8_841_0_23);
213+
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_BLOCK_8_19 = def(8_841_0_24);
213214

214215
/*
215216
* STOP! READ THIS FIRST! No, really,
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.data;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.unit.ByteSizeValue;
13+
import org.elasticsearch.core.ReleasableIterator;
14+
import org.elasticsearch.core.Releasables;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Objects;
19+
import java.util.stream.Stream;
20+
21+
public final class AggregateMetricDoubleBlock extends AbstractNonThreadSafeRefCounted implements Block {
22+
private final DoubleBlock minBlock;
23+
private final DoubleBlock maxBlock;
24+
private final DoubleBlock sumBlock;
25+
private final IntBlock countBlock;
26+
private final int positionCount;
27+
28+
public AggregateMetricDoubleBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
29+
this.minBlock = minBlock;
30+
this.maxBlock = maxBlock;
31+
this.sumBlock = sumBlock;
32+
this.countBlock = countBlock;
33+
this.positionCount = minBlock.getPositionCount();
34+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
35+
if (b.getPositionCount() != positionCount) {
36+
assert false : "expected positionCount=" + positionCount + " but was " + b;
37+
throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b);
38+
}
39+
if (b.isReleased()) {
40+
assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released";
41+
throw new IllegalArgumentException(
42+
"can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"
43+
);
44+
}
45+
}
46+
}
47+
48+
public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block) {
49+
assert block.getBlockCount() == 4
50+
: "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks";
51+
DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
52+
DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
53+
DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
54+
IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
55+
return new AggregateMetricDoubleBlock(min, max, sum, count);
56+
}
57+
58+
public CompositeBlock asCompositeBlock() {
59+
final Block[] blocks = new Block[4];
60+
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = minBlock;
61+
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = maxBlock;
62+
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = sumBlock;
63+
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = countBlock;
64+
return new CompositeBlock(blocks);
65+
}
66+
67+
@Override
68+
protected void closeInternal() {
69+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
70+
}
71+
72+
@Override
73+
public Vector asVector() {
74+
return null;
75+
}
76+
77+
@Override
78+
public int getTotalValueCount() {
79+
int totalValueCount = 0;
80+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
81+
totalValueCount += b.getTotalValueCount();
82+
}
83+
return totalValueCount;
84+
}
85+
86+
@Override
87+
public int getPositionCount() {
88+
return positionCount;
89+
}
90+
91+
@Override
92+
public int getFirstValueIndex(int position) {
93+
return minBlock.getFirstValueIndex(position);
94+
}
95+
96+
@Override
97+
public int getValueCount(int position) {
98+
int max = 0;
99+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
100+
max = Math.max(max, b.getValueCount(position));
101+
}
102+
return max;
103+
}
104+
105+
@Override
106+
public ElementType elementType() {
107+
return ElementType.AGGREGATE_METRIC_DOUBLE;
108+
}
109+
110+
@Override
111+
public BlockFactory blockFactory() {
112+
return minBlock.blockFactory();
113+
}
114+
115+
@Override
116+
public void allowPassingToDifferentDriver() {
117+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
118+
block.allowPassingToDifferentDriver();
119+
}
120+
}
121+
122+
@Override
123+
public boolean isNull(int position) {
124+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
125+
if (block.isNull(position) == false) {
126+
return false;
127+
}
128+
}
129+
return true;
130+
}
131+
132+
@Override
133+
public boolean mayHaveNulls() {
134+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls);
135+
}
136+
137+
@Override
138+
public boolean areAllValuesNull() {
139+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull);
140+
}
141+
142+
@Override
143+
public boolean mayHaveMultivaluedFields() {
144+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields);
145+
}
146+
147+
@Override
148+
public boolean doesHaveMultivaluedFields() {
149+
if (Stream.of(minBlock, maxBlock, sumBlock, countBlock).noneMatch(Block::mayHaveMultivaluedFields)) {
150+
return false;
151+
}
152+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
153+
}
154+
155+
@Override
156+
public Block filter(int... positions) {
157+
AggregateMetricDoubleBlock result = null;
158+
DoubleBlock newMinBlock = null;
159+
DoubleBlock newMaxBlock = null;
160+
DoubleBlock newSumBlock = null;
161+
IntBlock newCountBlock = null;
162+
try {
163+
newMinBlock = minBlock.filter(positions);
164+
newMaxBlock = maxBlock.filter(positions);
165+
newSumBlock = sumBlock.filter(positions);
166+
newCountBlock = countBlock.filter(positions);
167+
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
168+
return result;
169+
} finally {
170+
if (result == null) {
171+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
172+
}
173+
}
174+
}
175+
176+
@Override
177+
public Block keepMask(BooleanVector mask) {
178+
AggregateMetricDoubleBlock result = null;
179+
DoubleBlock newMinBlock = null;
180+
DoubleBlock newMaxBlock = null;
181+
DoubleBlock newSumBlock = null;
182+
IntBlock newCountBlock = null;
183+
try {
184+
newMinBlock = minBlock.keepMask(mask);
185+
newMaxBlock = maxBlock.keepMask(mask);
186+
newSumBlock = sumBlock.keepMask(mask);
187+
newCountBlock = countBlock.keepMask(mask);
188+
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
189+
return result;
190+
} finally {
191+
if (result == null) {
192+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
193+
}
194+
}
195+
}
196+
197+
@Override
198+
public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
199+
// TODO: support
200+
throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock");
201+
}
202+
203+
@Override
204+
public MvOrdering mvOrdering() {
205+
// TODO: determine based on sub-blocks
206+
return MvOrdering.UNORDERED;
207+
}
208+
209+
@Override
210+
public Block expand() {
211+
// TODO: support
212+
throw new UnsupportedOperationException("AggregateMetricDoubleBlock");
213+
}
214+
215+
@Override
216+
public void writeTo(StreamOutput out) throws IOException {
217+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
218+
block.writeTo(out);
219+
}
220+
}
221+
222+
public static Block readFrom(StreamInput in) throws IOException {
223+
boolean success = false;
224+
DoubleBlock minBlock = null;
225+
DoubleBlock maxBlock = null;
226+
DoubleBlock sumBlock = null;
227+
IntBlock countBlock = null;
228+
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
229+
try {
230+
minBlock = DoubleBlock.readFrom(blockStreamInput);
231+
maxBlock = DoubleBlock.readFrom(blockStreamInput);
232+
sumBlock = DoubleBlock.readFrom(blockStreamInput);
233+
countBlock = IntBlock.readFrom(blockStreamInput);
234+
AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
235+
success = true;
236+
return result;
237+
} finally {
238+
if (success == false) {
239+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
240+
}
241+
}
242+
}
243+
244+
@Override
245+
public long ramBytesUsed() {
246+
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
247+
}
248+
249+
@Override
250+
public boolean equals(Object o) {
251+
if (this == o) return true;
252+
if (o == null || getClass() != o.getClass()) return false;
253+
AggregateMetricDoubleBlock that = (AggregateMetricDoubleBlock) o;
254+
return positionCount == that.positionCount
255+
&& minBlock.equals(that.minBlock)
256+
&& maxBlock.equals(that.maxBlock)
257+
&& sumBlock.equals(that.sumBlock)
258+
&& countBlock.equals(that.countBlock);
259+
}
260+
261+
@Override
262+
public int hashCode() {
263+
return Objects.hash(
264+
DoubleBlock.hash(minBlock),
265+
DoubleBlock.hash(maxBlock),
266+
DoubleBlock.hash(sumBlock),
267+
IntBlock.hash(countBlock),
268+
positionCount
269+
);
270+
}
271+
272+
public DoubleBlock minBlock() {
273+
return minBlock;
274+
}
275+
276+
public DoubleBlock maxBlock() {
277+
return maxBlock;
278+
}
279+
280+
public DoubleBlock sumBlock() {
281+
return sumBlock;
282+
}
283+
284+
public IntBlock countBlock() {
285+
return countBlock;
286+
}
287+
288+
public Block getMetricBlock(int index) {
289+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) {
290+
return minBlock;
291+
}
292+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) {
293+
return maxBlock;
294+
}
295+
if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) {
296+
return sumBlock;
297+
}
298+
if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) {
299+
return countBlock;
300+
}
301+
throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock.");
302+
}
303+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,22 @@ protected int elementSize() {
5959
}
6060

6161
@Override
62-
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
62+
public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) {
6363
Block minBlock;
6464
Block maxBlock;
6565
Block sumBlock;
6666
Block countBlock;
67-
if (block.areAllValuesNull()) {
68-
minBlock = block;
69-
maxBlock = block;
70-
sumBlock = block;
71-
countBlock = block;
67+
if (b.areAllValuesNull()) {
68+
minBlock = b;
69+
maxBlock = b;
70+
sumBlock = b;
71+
countBlock = b;
7272
} else {
73-
CompositeBlock composite = (CompositeBlock) block;
74-
minBlock = composite.getBlock(Metric.MIN.getIndex());
75-
maxBlock = composite.getBlock(Metric.MAX.getIndex());
76-
sumBlock = composite.getBlock(Metric.SUM.getIndex());
77-
countBlock = composite.getBlock(Metric.COUNT.getIndex());
73+
AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b;
74+
minBlock = block.minBlock();
75+
maxBlock = block.maxBlock();
76+
sumBlock = block.sumBlock();
77+
countBlock = block.countBlock();
7878
}
7979
minBuilder.copyFrom(minBlock, beginInclusive, endExclusive);
8080
maxBuilder.copyFrom(maxBlock, beginInclusive, endExclusive);
@@ -103,20 +103,23 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
103103

104104
@Override
105105
public Block build() {
106-
Block[] blocks = new Block[4];
106+
DoubleBlock minBlock = null;
107+
DoubleBlock maxBlock = null;
108+
DoubleBlock sumBlock = null;
109+
IntBlock countBlock = null;
107110
boolean success = false;
108111
try {
109112
finish();
110-
blocks[Metric.MIN.getIndex()] = minBuilder.build();
111-
blocks[Metric.MAX.getIndex()] = maxBuilder.build();
112-
blocks[Metric.SUM.getIndex()] = sumBuilder.build();
113-
blocks[Metric.COUNT.getIndex()] = countBuilder.build();
114-
CompositeBlock block = new CompositeBlock(blocks);
113+
minBlock = minBuilder.build();
114+
maxBlock = maxBuilder.build();
115+
sumBlock = sumBuilder.build();
116+
countBlock = countBuilder.build();
117+
AggregateMetricDoubleBlock block = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
115118
success = true;
116119
return block;
117120
} finally {
118121
if (success == false) {
119-
Releasables.closeExpectNoException(blocks);
122+
Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock);
120123
}
121124
}
122125
}

0 commit comments

Comments
 (0)