Skip to content

Commit b5d267d

Browse files
committed
agg metric block
1 parent 7ddc8d9 commit b5d267d

34 files changed

+483
-127
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ static TransportVersion def(int id) {
228228
public static final TransportVersion DENSE_VECTOR_OFF_HEAP_STATS = def(9_062_00_0);
229229
public static final TransportVersion RANDOM_SAMPLER_QUERY_BUILDER = def(9_063_0_00);
230230
public static final TransportVersion SETTINGS_IN_DATA_STREAMS = def(9_064_0_00);
231+
public static final TransportVersion AGGREGATE_METRIC_DOUBLE_BLOCK = def(9_065_0_00);
231232

232233
/*
233234
* STOP! READ THIS FIRST! No, really,
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.data;
9+
10+
import org.apache.lucene.util.Accountable;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.unit.ByteSizeValue;
14+
import org.elasticsearch.core.ReleasableIterator;
15+
import org.elasticsearch.core.Releasables;
16+
17+
import java.io.IOException;
18+
import java.util.List;
19+
import java.util.Objects;
20+
import java.util.stream.Stream;
21+
22+
import static org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.Metric.SUM;
23+
24+
public final class AggregateMetricDoubleBlock extends AbstractNonThreadSafeRefCounted implements Block {
25+
private final DoubleBlock minBlock;
26+
private final DoubleBlock maxBlock;
27+
private final DoubleBlock sumBlock;
28+
private final IntBlock countBlock;
29+
private final int positionCount;
30+
31+
public AggregateMetricDoubleBlock(DoubleBlock minBlock, DoubleBlock maxBlock, DoubleBlock sumBlock, IntBlock countBlock) {
32+
this.minBlock = minBlock;
33+
this.maxBlock = maxBlock;
34+
this.sumBlock = sumBlock;
35+
this.countBlock = countBlock;
36+
this.positionCount = minBlock.getPositionCount();
37+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
38+
if (b.getPositionCount() != positionCount) {
39+
assert false : "expected positionCount=" + positionCount + " but was " + b;
40+
throw new IllegalArgumentException("expected positionCount=" + positionCount + " but was " + b);
41+
}
42+
if (b.isReleased()) {
43+
assert false : "can't build aggregate_metric_double block out of released blocks but [" + b + "] was released";
44+
throw new IllegalArgumentException(
45+
"can't build aggregate_metric_double block out of released blocks but [" + b + "] was released"
46+
);
47+
}
48+
}
49+
}
50+
51+
public static AggregateMetricDoubleBlock fromCompositeBlock(CompositeBlock block) {
52+
assert block.getBlockCount() == 4
53+
: "Can't make AggregateMetricDoubleBlock out of CompositeBlock with " + block.getBlockCount() + " blocks";
54+
DoubleBlock min = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
55+
DoubleBlock max = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
56+
DoubleBlock sum = block.getBlock(SUM.getIndex());
57+
IntBlock count = block.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
58+
return new AggregateMetricDoubleBlock(min, max, sum, count);
59+
}
60+
61+
@Override
62+
protected void closeInternal() {
63+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
64+
}
65+
66+
@Override
67+
public Vector asVector() {
68+
return null;
69+
}
70+
71+
@Override
72+
public int getTotalValueCount() {
73+
int totalValueCount = 0;
74+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
75+
totalValueCount += b.getTotalValueCount();
76+
}
77+
return totalValueCount;
78+
}
79+
80+
@Override
81+
public int getPositionCount() {
82+
return positionCount;
83+
}
84+
85+
@Override
86+
public int getFirstValueIndex(int position) {
87+
return minBlock.getFirstValueIndex(position);
88+
}
89+
90+
@Override
91+
public int getValueCount(int position) {
92+
int max = 0;
93+
for (Block b : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
94+
max = Math.max(max, b.getValueCount(position));
95+
}
96+
return max;
97+
}
98+
99+
@Override
100+
public ElementType elementType() {
101+
return ElementType.AGGREGATE_METRIC_DOUBLE;
102+
}
103+
104+
@Override
105+
public BlockFactory blockFactory() {
106+
return minBlock.blockFactory();
107+
}
108+
109+
@Override
110+
public void allowPassingToDifferentDriver() {
111+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
112+
block.allowPassingToDifferentDriver();
113+
}
114+
}
115+
116+
@Override
117+
public boolean isNull(int position) {
118+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
119+
if (block.isNull(position) == false) {
120+
return false;
121+
}
122+
}
123+
return true;
124+
}
125+
126+
@Override
127+
public boolean mayHaveNulls() {
128+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveNulls);
129+
}
130+
131+
@Override
132+
public boolean areAllValuesNull() {
133+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).allMatch(Block::areAllValuesNull);
134+
}
135+
136+
@Override
137+
public boolean mayHaveMultivaluedFields() {
138+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields);
139+
}
140+
141+
@Override
142+
public boolean doesHaveMultivaluedFields() {
143+
if (false == Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::mayHaveMultivaluedFields)) {
144+
return false;
145+
}
146+
return Stream.of(minBlock, maxBlock, sumBlock, countBlock).anyMatch(Block::doesHaveMultivaluedFields);
147+
}
148+
149+
@Override
150+
public Block filter(int... positions) {
151+
AggregateMetricDoubleBlock result = null;
152+
DoubleBlock newMinBlock = null;
153+
DoubleBlock newMaxBlock = null;
154+
DoubleBlock newSumBlock = null;
155+
IntBlock newCountBlock = null;
156+
try {
157+
newMinBlock = minBlock.filter(positions);
158+
newMaxBlock = maxBlock.filter(positions);
159+
newSumBlock = sumBlock.filter(positions);
160+
newCountBlock = countBlock.filter(positions);
161+
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
162+
return result;
163+
} finally {
164+
if (result == null) {
165+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
166+
}
167+
}
168+
}
169+
170+
@Override
171+
public Block keepMask(BooleanVector mask) {
172+
AggregateMetricDoubleBlock result = null;
173+
DoubleBlock newMinBlock = null;
174+
DoubleBlock newMaxBlock = null;
175+
DoubleBlock newSumBlock = null;
176+
IntBlock newCountBlock = null;
177+
try {
178+
newMinBlock = minBlock.keepMask(mask);
179+
newMaxBlock = maxBlock.keepMask(mask);
180+
newSumBlock = sumBlock.keepMask(mask);
181+
newCountBlock = countBlock.keepMask(mask);
182+
result = new AggregateMetricDoubleBlock(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
183+
return result;
184+
} finally {
185+
if (result == null) {
186+
Releasables.close(newMinBlock, newMaxBlock, newSumBlock, newCountBlock);
187+
}
188+
}
189+
}
190+
191+
@Override
192+
public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
193+
// TODO: support
194+
throw new UnsupportedOperationException("can't lookup values from AggregateMetricDoubleBlock");
195+
}
196+
197+
@Override
198+
public MvOrdering mvOrdering() {
199+
return MvOrdering.UNORDERED;
200+
}
201+
202+
@Override
203+
public Block expand() {
204+
throw new UnsupportedOperationException("AggregateMetricDoubleBlock");
205+
}
206+
207+
@Override
208+
public void writeTo(StreamOutput out) throws IOException {
209+
for (Block block : List.of(minBlock, maxBlock, sumBlock, countBlock)) {
210+
Block.writeTypedBlock(block, out);
211+
}
212+
}
213+
214+
@Override
215+
public long ramBytesUsed() {
216+
return minBlock.ramBytesUsed() + maxBlock.ramBytesUsed() + sumBlock.ramBytesUsed() + countBlock.ramBytesUsed();
217+
}
218+
219+
@Override
220+
public boolean equals(Object o) {
221+
if (this == o) return true;
222+
if (o == null || getClass() != o.getClass()) return false;
223+
AggregateMetricDoubleBlock that = (AggregateMetricDoubleBlock) o;
224+
return positionCount == that.positionCount
225+
&& minBlock.equals(that.minBlock)
226+
&& maxBlock.equals(that.maxBlock)
227+
&& sumBlock.equals(that.sumBlock)
228+
&& countBlock.equals(that.countBlock);
229+
}
230+
231+
@Override
232+
public int hashCode() {
233+
return Objects.hash(
234+
DoubleBlock.hash(minBlock),
235+
DoubleBlock.hash(maxBlock),
236+
DoubleBlock.hash(sumBlock),
237+
IntBlock.hash(countBlock),
238+
positionCount
239+
);
240+
}
241+
242+
public static Block readFrom(StreamInput in) throws IOException {
243+
boolean success = false;
244+
DoubleBlock minBlock = null;
245+
DoubleBlock maxBlock = null;
246+
DoubleBlock sumBlock = null;
247+
IntBlock countBlock = null;
248+
BlockStreamInput blockStreamInput = (BlockStreamInput) in;
249+
try {
250+
minBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
251+
maxBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
252+
sumBlock = (DoubleBlock) Block.readTypedBlock(blockStreamInput);
253+
countBlock = (IntBlock) Block.readTypedBlock(blockStreamInput);
254+
AggregateMetricDoubleBlock result = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
255+
success = true;
256+
return result;
257+
} finally {
258+
if (success == false) {
259+
Releasables.close(minBlock, maxBlock, sumBlock, countBlock);
260+
}
261+
}
262+
}
263+
264+
public DoubleBlock minBlock() {
265+
return minBlock;
266+
}
267+
268+
public DoubleBlock maxBlock() {
269+
return maxBlock;
270+
}
271+
272+
public DoubleBlock sumBlock() {
273+
return sumBlock;
274+
}
275+
276+
public IntBlock countBlock() {
277+
return countBlock;
278+
}
279+
280+
public Block getMetricBlock(int index) {
281+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()) {
282+
return minBlock;
283+
}
284+
if (index == AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()) {
285+
return maxBlock;
286+
}
287+
if (index == AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()) {
288+
return sumBlock;
289+
}
290+
if (index == AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()) {
291+
return countBlock;
292+
}
293+
throw new UnsupportedOperationException("Received an index (" + index + ") outside of range for AggregateMetricDoubleBlock.");
294+
}
295+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateMetricDoubleBlockBuilder.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,22 @@ protected int elementSize() {
5959
}
6060

6161
@Override
62-
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
62+
public Block.Builder copyFrom(Block b, int beginInclusive, int endExclusive) {
6363
Block minBlock;
6464
Block maxBlock;
6565
Block sumBlock;
6666
Block countBlock;
67-
if (block.areAllValuesNull()) {
68-
minBlock = block;
69-
maxBlock = block;
70-
sumBlock = block;
71-
countBlock = block;
67+
if (b.areAllValuesNull()) {
68+
minBlock = b;
69+
maxBlock = b;
70+
sumBlock = b;
71+
countBlock = b;
7272
} else {
73-
CompositeBlock composite = (CompositeBlock) block;
74-
minBlock = composite.getBlock(Metric.MIN.getIndex());
75-
maxBlock = composite.getBlock(Metric.MAX.getIndex());
76-
sumBlock = composite.getBlock(Metric.SUM.getIndex());
77-
countBlock = composite.getBlock(Metric.COUNT.getIndex());
73+
AggregateMetricDoubleBlock block = (AggregateMetricDoubleBlock) b;
74+
minBlock = block.minBlock();
75+
maxBlock = block.maxBlock();
76+
sumBlock = block.sumBlock();
77+
countBlock = block.countBlock();
7878
}
7979
minBuilder.copyFrom(minBlock, beginInclusive, endExclusive);
8080
maxBuilder.copyFrom(maxBlock, beginInclusive, endExclusive);
@@ -103,20 +103,23 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
103103

104104
@Override
105105
public Block build() {
106-
Block[] blocks = new Block[4];
106+
DoubleBlock minBlock = null;
107+
DoubleBlock maxBlock = null;
108+
DoubleBlock sumBlock = null;
109+
IntBlock countBlock = null;
107110
boolean success = false;
108111
try {
109112
finish();
110-
blocks[Metric.MIN.getIndex()] = minBuilder.build();
111-
blocks[Metric.MAX.getIndex()] = maxBuilder.build();
112-
blocks[Metric.SUM.getIndex()] = sumBuilder.build();
113-
blocks[Metric.COUNT.getIndex()] = countBuilder.build();
114-
CompositeBlock block = new CompositeBlock(blocks);
113+
minBlock = minBuilder.build();
114+
maxBlock = maxBuilder.build();
115+
sumBlock = sumBuilder.build();
116+
countBlock = countBuilder.build();
117+
AggregateMetricDoubleBlock block = new AggregateMetricDoubleBlock(minBlock, maxBlock, sumBlock, countBlock);
115118
success = true;
116119
return block;
117120
} finally {
118121
if (success == false) {
119-
Releasables.closeExpectNoException(blocks);
122+
Releasables.closeExpectNoException(minBlock, maxBlock, sumBlock, countBlock);
120123
}
121124
}
122125
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.util.Accountable;
1111
import org.apache.lucene.util.RamUsageEstimator;
12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.common.io.stream.Writeable;
1415
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -343,8 +344,14 @@ static Block[] buildAll(Block.Builder... builders) {
343344
* This should be paired with {@link #readTypedBlock(BlockStreamInput)}
344345
*/
345346
static void writeTypedBlock(Block block, StreamOutput out) throws IOException {
346-
block.elementType().writeTo(out);
347-
block.writeTo(out);
347+
if (out.getTransportVersion().before(TransportVersions.AGGREGATE_METRIC_DOUBLE_BLOCK)
348+
&& block.elementType() == ElementType.AGGREGATE_METRIC_DOUBLE) {
349+
ElementType.COMPOSITE.writeTo(out);
350+
CompositeBlock.fromAggregateMetricDoubleBlock((AggregateMetricDoubleBlock) block).writeTo(out);
351+
} else {
352+
block.elementType().writeTo(out);
353+
block.writeTo(out);
354+
}
348355
}
349356

350357
/**

0 commit comments

Comments
 (0)