Skip to content

Commit daba3fc

Browse files
committed
agg metric block
1 parent 7ddc8d9 commit daba3fc

34 files changed

+481
-127
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ static TransportVersion def(int id) {
228228
public static final TransportVersion DENSE_VECTOR_OFF_HEAP_STATS = def(9_062_00_0);
229229
public static final TransportVersion RANDOM_SAMPLER_QUERY_BUILDER = def(9_063_0_00);
230230
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_0_00);
231+
public static final TransportVersion AGGREGATE_METRIC_DOUBLE_BLOCK = def(9_065_0_00);
231232

232233
/*
233234
* STOP! READ THIS FIRST! No, really,
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.data;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.unit.ByteSizeValue;
13+
import org.elasticsearch.core.ReleasableIterator;
14+
import org.elasticsearch.core.Releasables;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Objects;
19+
import java.util.stream.Stream;
20+
21+
public final class AggregateMetricDoubleBlock extends AbstractNonThreadSafeRefCounted implements Block {
22+
private final DoubleBlock minBlock;
23+
private final DoubleBlock maxBlock;
24+
private final DoubleBlock sumBlock;
25+
private final IntBlock countBlock;
26+
private final int positionCount;
27+
28+
public AggregateMetricDoubleBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
29+
this.minBlock = minBlock;
30+
this.maxBlock = maxBlock;
31+
this.sumBlock = sumBlock;
32+
this.countBlock = countBlock;
33+
this.positionCount = minBlock.getPositionCount();
34+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
35+
if (b.getPositionCount() != positionCount) {
36+
assert false : "expected positionCount=" + positionCount + " but was " + b;
37+
throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b);
38+
}
39+
if (b.isReleased()) {
40+
assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released";
41+
throw new IllegalArgumentException(
42+
"can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"
43+
);
44+
}
45+
}
46+
}
47+
48+
public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block) {
49+
assert block.getBlockCount() == 4
50+
: "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks";
51+
DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
52+
DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
53+
DoubleBlock sum = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
54+
IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
55+
return new AggregateMetricDoubleBlock(min, max, sum, count);
56+
}
57+
58+
@Override
59+
protected void closeInternal() {
60+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
61+
}
62+
63+
@Override
64+
public Vector asVector() {
65+
return null;
66+
}
67+
68+
@Override
69+
public int getTotalValueCount() {
70+
int totalValueCount = 0;
71+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
72+
totalValueCount += b.getTotalValueCount();
73+
}
74+
return totalValueCount;
75+
}
76+
77+
@Override
78+
public int getPositionCount() {
79+
return positionCount;
80+
}
81+
82+
@Override
83+
public int getFirstValueIndex(int position) {
84+
return minBlock.getFirstValueIndex(position);
85+
}
86+
87+
@Override
88+
public int getValueCount(int position) {
89+
int max = 0;
90+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
91+
max = Math.max(max, b.getValueCount(position));
92+
}
93+
return max;
94+
}
95+
96+
@Override
97+
public ElementType elementType() {
98+
return ElementType.AGGREGATE_METRIC_DOUBLE;
99+
}
100+
101+
@Override
102+
public BlockFactory blockFactory() {
103+
return minBlock.blockFactory();
104+
}
105+
106+
@Override
107+
public void allowPassingToDifferentDriver() {
108+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
109+
block.allowPassingToDifferentDriver();
110+
}
111+
}
112+
113+
@Override
114+
public boolean isNull(int position) {
115+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
116+
if (block.isNull(position) == false) {
117+
return false;
118+
}
119+
}
120+
return true;
121+
}
122+
123+
@Override
124+
public boolean mayHaveNulls() {
125+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls);
126+
}
127+
128+
@Override
129+
public boolean areAllValuesNull() {
130+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull);
131+
}
132+
133+
@Override
134+
public boolean mayHaveMultivaluedFields() {
135+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields);
136+
}
137+
138+
@Override
139+
public boolean doesHaveMultivaluedFields() {
140+
if (false == Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields)) {
141+
return false;
142+
}
143+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
144+
}
145+
146+
@Override
147+
public Block filter(int... positions) {
148+
AggregateMetricDoubleBlock result = null;
149+
DoubleBlock newMinBlock = null;
150+
DoubleBlock newMaxBlock = null;
151+
DoubleBlock newSumBlock = null;
152+
IntBlock newCountBlock = null;
153+
try {
154+
newMinBlock = minBlock.filter(positions);
155+
newMaxBlock = maxBlock.filter(positions);
156+
newSumBlock = sumBlock.filter(positions);
157+
newCountBlock = countBlock.filter(positions);
158+
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
159+
return result;
160+
} finally {
161+
if (result == null) {
162+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
163+
}
164+
}
165+
}
166+
167+
@Override
168+
public Block keepMask(BooleanVector mask) {
169+
AggregateMetricDoubleBlock result = null;
170+
DoubleBlock newMinBlock = null;
171+
DoubleBlock newMaxBlock = null;
172+
DoubleBlock newSumBlock = null;
173+
IntBlock newCountBlock = null;
174+
try {
175+
newMinBlock = minBlock.keepMask(mask);
176+
newMaxBlock = maxBlock.keepMask(mask);
177+
newSumBlock = sumBlock.keepMask(mask);
178+
newCountBlock = countBlock.keepMask(mask);
179+
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
180+
return result;
181+
} finally {
182+
if (result == null) {
183+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
184+
}
185+
}
186+
}
187+
188+
@Override
189+
public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
190+
// TODO: support
191+
throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock");
192+
}
193+
194+
@Override
195+
public MvOrdering mvOrdering() {
196+
return MvOrdering.UNORDERED;
197+
}
198+
199+
@Override
200+
public Block expand() {
201+
throw new UnsupportedOperationException("AggregateMetricDoubleBlock");
202+
}
203+
204+
@Override
205+
public void writeTo(StreamOutput out) throws IOException {
206+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
207+
Block.writeTypedBlock(block, out);
208+
}
209+
}
210+
211+
@Override
212+
public long ramBytesUsed() {
213+
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
214+
}
215+
216+
@Override
217+
public boolean equals(Object o) {
218+
if (this == o) return true;
219+
if (o == null || getClass() != o.getClass()) return false;
220+
AggregateMetricDoubleBlock that = (AggregateMetricDoubleBlock) o;
221+
return positionCount == that.positionCount
222+
&& minBlock.equals(that.minBlock)
223+
&& maxBlock.equals(that.maxBlock)
224+
&& sumBlock.equals(that.sumBlock)
225+
&& countBlock.equals(that.countBlock);
226+
}
227+
228+
@Override
229+
public int hashCode() {
230+
return Objects.hash(
231+
DoubleBlock.hash(minBlock),
232+
DoubleBlock.hash(maxBlock),
233+
DoubleBlock.hash(sumBlock),
234+
IntBlock.hash(countBlock),
235+
positionCount
236+
);
237+
}
238+
239+
public static Block readFrom(StreamInput in) throws IOException {
240+
boolean success = false;
241+
DoubleBlock minBlock = null;
242+
DoubleBlock maxBlock = null;
243+
DoubleBlock sumBlock = null;
244+
IntBlock countBlock = null;
245+
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
246+
try {
247+
minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
248+
maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
249+
sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
250+
countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput);
251+
AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
252+
success = true;
253+
return result;
254+
} finally {
255+
if (success == false) {
256+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
257+
}
258+
}
259+
}
260+
261+
public DoubleBlock minBlock() {
262+
return minBlock;
263+
}
264+
265+
public DoubleBlock maxBlock() {
266+
return maxBlock;
267+
}
268+
269+
public DoubleBlock sumBlock() {
270+
return sumBlock;
271+
}
272+
273+
public IntBlock countBlock() {
274+
return countBlock;
275+
}
276+
277+
public Block getMetricBlock(int index) {
278+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) {
279+
return minBlock;
280+
}
281+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) {
282+
return maxBlock;
283+
}
284+
if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) {
285+
return sumBlock;
286+
}
287+
if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) {
288+
return countBlock;
289+
}
290+
throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock.");
291+
}
292+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,22 @@ protected int elementSize() {
5959
}
6060

6161
@Override
62-
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
62+
public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) {
6363
Block minBlock;
6464
Block maxBlock;
6565
Block sumBlock;
6666
Block countBlock;
67-
if (block.areAllValuesNull()) {
68-
minBlock = block;
69-
maxBlock = block;
70-
sumBlock = block;
71-
countBlock = block;
67+
if (b.areAllValuesNull()) {
68+
minBlock = b;
69+
maxBlock = b;
70+
sumBlock = b;
71+
countBlock = b;
7272
} else {
73-
CompositeBlock composite = (CompositeBlock) block;
74-
minBlock = composite.getBlock(Metric.MIN.getIndex());
75-
maxBlock = composite.getBlock(Metric.MAX.getIndex());
76-
sumBlock = composite.getBlock(Metric.SUM.getIndex());
77-
countBlock = composite.getBlock(Metric.COUNT.getIndex());
73+
AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b;
74+
minBlock = block.minBlock();
75+
maxBlock = block.maxBlock();
76+
sumBlock = block.sumBlock();
77+
countBlock = block.countBlock();
7878
}
7979
minBuilder.copyFrom(minBlock, beginInclusive, endExclusive);
8080
maxBuilder.copyFrom(maxBlock, beginInclusive, endExclusive);
@@ -103,20 +103,23 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
103103

104104
@Override
105105
public Block build() {
106-
Block[] blocks = new Block[4];
106+
DoubleBlock minBlock = null;
107+
DoubleBlock maxBlock = null;
108+
DoubleBlock sumBlock = null;
109+
IntBlock countBlock = null;
107110
boolean success = false;
108111
try {
109112
finish();
110-
blocks[Metric.MIN.getIndex()] = minBuilder.build();
111-
blocks[Metric.MAX.getIndex()] = maxBuilder.build();
112-
blocks[Metric.SUM.getIndex()] = sumBuilder.build();
113-
blocks[Metric.COUNT.getIndex()] = countBuilder.build();
114-
CompositeBlock block = new CompositeBlock(blocks);
113+
minBlock = minBuilder.build();
114+
maxBlock = maxBuilder.build();
115+
sumBlock = sumBuilder.build();
116+
countBlock = countBuilder.build();
117+
AggregateMetricDoubleBlock block = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
115118
success = true;
116119
return block;
117120
} finally {
118121
if (success == false) {
119-
Releasables.closeExpectNoException(blocks);
122+
Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock);
120123
}
121124
}
122125
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.util.Accountable;
1111
import org.apache.lucene.util.RamUsageEstimator;
12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.common.io.stream.Writeable;
1415
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -343,8 +344,14 @@ static Block[] buildAll(Block.Builder... builders) {
343344
* This should be paired with {@link #readTypedBlock(BlockStreamInput)}
344345
*/
345346
static void writeTypedBlock(Block block, StreamOutput out) throws IOException {
346-
block.elementType().writeTo(out);
347-
block.writeTo(out);
347+
if (out.getTransportVersion().before(TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK)
348+
&& block.elementType() == ElementType.AGGREGATE_METRIC_DOUBLE) {
349+
ElementType.COMPOSITE.writeTo(out);
350+
CompositeBlock.fromAggregateMetricDoubleBlock((AggregateMetricDoubleBlock) block).writeTo(out);
351+
} else {
352+
block.elementType().writeTo(out);
353+
block.writeTo(out);
354+
}
348355
}
349356

350357
/**

0 commit comments

Comments
 (0)