Skip to content

Commit 8223ca8

Browse files
committed
Add unit test
1 parent c513e83 commit 8223ca8

File tree

14 files changed

+173
-41
lines changed

14 files changed

+173
-41
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Deque;
2525
import java.util.Iterator;
2626
import java.util.List;
27+
import java.util.Optional;
2728

2829
/**
2930
* Find spikes, dips and change point in a list of values.
@@ -36,21 +37,21 @@ public class ChangePointOperator implements Operator {
3637

3738
public static final int INPUT_VALUE_COUNT_LIMIT = 1000;
3839

39-
public record Factory(int metricChannel, int partitionChannel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
40+
public record Factory(int metricChannel, Optional<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
4041
@Override
4142
public Operator get(DriverContext driverContext) {
4243
return new ChangePointOperator(driverContext, metricChannel, partitionChannel, sourceText, sourceLine, sourceColumn);
4344
}
4445

4546
@Override
4647
public String describe() {
47-
return "ChangePointOperator[metricChannel=" + metricChannel + "]";
48+
return ChangePointOperator.describe(metricChannel, partitionChannel);
4849
}
4950
}
5051

5152
private final DriverContext driverContext;
5253
private final int metricChannel;
53-
private final int partitionChannel;
54+
private final Optional<Integer> partitionChannel;
5455
private final String sourceText;
5556
private final int sourceLine;
5657
private final int sourceColumn;
@@ -62,7 +63,7 @@ public String describe() {
6263

6364
// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6465
// (by modularizing esql-core) and use that instead of the individual fields.
65-
public ChangePointOperator(DriverContext driverContext, int metricChannel, int partitionChannel, String sourceText, int sourceLine, int sourceColumn) {
66+
public ChangePointOperator(DriverContext driverContext, int metricChannel, Optional<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) {
6667
this.driverContext = driverContext;
6768
this.metricChannel = metricChannel;
6869
this.partitionChannel = partitionChannel;
@@ -126,9 +127,9 @@ private void createOutputPages() {
126127
private int checkValueCounts() {
127128
int maxValuesCount = 0;
128129
int valuesCount = 0;
129-
Block lastPartitionFieldValue = null;
130+
String lastPartitionFieldValue = null;
130131
for (Page inputPage : inputPages) {
131-
Block currentPartitionFieldValue = getCurrentPartitionFieldValue(inputPage);
132+
String currentPartitionFieldValue = getCurrentPartitionFieldValue(inputPage);
132133
if (lastPartitionFieldValue != null) {
133134
if (currentPartitionFieldValue.equals(lastPartitionFieldValue) == false) {
134135
valuesCount = 0;
@@ -154,9 +155,9 @@ private List<MlAggsHelper.DoubleBucketValues> checkNullAndMultivalued(int maxVal
154155
List<Integer> bucketIndexes = new ArrayList<>(maxValuesCount);
155156
boolean hasNulls = false;
156157
boolean hasMultivalued = false;
157-
Block lastPartitionFieldValue = null;
158+
String lastPartitionFieldValue = null;
158159
for (Page inputPage : inputPages) {
159-
Block currentPartitionFieldValue = getCurrentPartitionFieldValue(inputPage);
160+
String currentPartitionFieldValue = getCurrentPartitionFieldValue(inputPage);
160161
if (lastPartitionFieldValue != null) {
161162
if (currentPartitionFieldValue.equals(lastPartitionFieldValue) == false) {
162163
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
@@ -212,15 +213,15 @@ private void insertChangePoints(Iterable<ChangeType> changeTypes) {
212213
ChangeType changeType = changeTypesIterator.next();
213214
BlockFactory blockFactory = driverContext.blockFactory();
214215
int pageStartIndex = 0;
215-
Block lastPartitionFieldValue = null;
216+
String lastPartitionFieldValue = null;
216217
while (inputPages.isEmpty() == false) {
217218
Page inputPage = inputPages.peek();
218219
Page outputPage;
219220
Block changeTypeBlock = null;
220221
Block changePvalueBlock = null;
221222
boolean success = false;
222223

223-
Block currentPartitionFieldValue = getCurrentPartitionFieldValue(inputPage);
224+
String currentPartitionFieldValue = getCurrentPartitionFieldValue(inputPage);
224225
if (lastPartitionFieldValue != null) {
225226
if (currentPartitionFieldValue.equals(lastPartitionFieldValue) == false) {
226227
pageStartIndex = 0;
@@ -268,8 +269,13 @@ private void insertChangePoints(Iterable<ChangeType> changeTypes) {
268269
}
269270
}
270271

271-
private Block getCurrentPartitionFieldValue(Page inputPage) {
272-
return inputPage.getBlock(partitionChannel).filter(0);
272+
private String getCurrentPartitionFieldValue(Page inputPage) {
273+
assert partitionChannel.isPresent();
274+
assert inputPage.getPositionCount() > 0;
275+
try (var block = inputPage.getBlock(partitionChannel.get()).filter(0)) {
276+
BytesRef partition = ((BytesRefBlock) block).getBytesRef(0, new BytesRef());
277+
return partition.utf8ToString();
278+
}
273279
}
274280

275281
@Override
@@ -284,7 +290,11 @@ public void close() {
284290

285291
@Override
286292
public String toString() {
287-
return "ChangePointOperator[metricChannel=" + metricChannel + ", partitionChannel=" + partitionChannel + "]";
293+
return describe(metricChannel, partitionChannel);
294+
}
295+
296+
private static String describe(int metricChannel, Optional<Integer> partitionChannel) {
297+
return "ChangePointOperator[metricChannel=" + metricChannel + (partitionChannel.isPresent() ? ", partitionChannel=" + partitionChannel.get() : "") + "]";
288298
}
289299

290300
private Warnings warnings(boolean onlyWarnings) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.collect.Iterators;
1616
import org.elasticsearch.compute.data.Block;
1717
import org.elasticsearch.compute.data.BlockFactory;
18+
import org.elasticsearch.compute.data.BytesRefBlock;
1819
import org.elasticsearch.compute.data.ElementType;
1920
import org.elasticsearch.compute.data.Page;
2021
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
@@ -397,7 +398,7 @@ public void addInput(Page page) {
397398
spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
398399
spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
399400

400-
String partition = partitionByChannel.isPresent() ? page.getBlock(partitionByChannel.get()).asVector().filter(i).toString() : "-default-";
401+
String partition = partitionByChannel.isPresent() ? getCurrentPartitionFieldValue(page, i) : "-default-";
401402
Queue inputQueue = inputQueues.get(partition);
402403
if (inputQueue == null) {
403404
inputQueues.put(partition, inputQueue = new Queue(topCount));
@@ -411,6 +412,15 @@ public void addInput(Page page) {
411412
}
412413
}
413414

415+
private String getCurrentPartitionFieldValue(Page page, int i) {
416+
assert partitionByChannel.isPresent();
417+
assert page.getPositionCount() > 0;
418+
try (var block = page.getBlock(partitionByChannel.get()).filter(i)) {
419+
BytesRef partition = ((BytesRefBlock) block).getBytesRef(i, new BytesRef());
420+
return partition.utf8ToString();
421+
}
422+
}
423+
414424
@Override
415425
public void finish() {
416426
if (output == null) {
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.BytesRefBlock;
13+
import org.elasticsearch.compute.data.DoubleBlock;
14+
import org.elasticsearch.compute.data.LongBlock;
15+
import org.elasticsearch.compute.data.Page;
16+
import org.elasticsearch.compute.test.OperatorTestCase;
17+
import org.elasticsearch.core.Tuple;
18+
import org.hamcrest.Matcher;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Optional;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.hasSize;
26+
import static org.hamcrest.Matchers.lessThan;
27+
28+
public class ChangePointByOperatorTests extends OperatorTestCase {
29+
30+
@Override
31+
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
32+
// size must be in [25, 1000] for ChangePoint to function correctly and detect the step change.
33+
size = 25;//Math.clamp(size, 25, 1000);
34+
List<Tuple<Long, BytesRef>> data = new ArrayList<>(size);
35+
for (int i = 0; i < size; i++) { // step change 0 -> 100
36+
data.add(Tuple.tuple(i < size / 2 ? randomLongBetween(0, 3) : randomLongBetween(100, 103), new BytesRef("prod")));
37+
}
38+
for (int i = 0; i < size; i++) { // step change 300 -> 200
39+
data.add(Tuple.tuple(i < size / 2 ? randomLongBetween(300, 303) : randomLongBetween(200, 203), new BytesRef("staging")));
40+
}
41+
for (int i = 0; i < size; i++) { // spike 50 -> 500 -> 50
42+
data.add(Tuple.tuple(i == 2 * size / 3 ? randomLongBetween(500, 503) : randomLongBetween(50, 53), new BytesRef("qa")));
43+
}
44+
return new LongBytesRefTupleBlockSourceOperator(blockFactory, data, size);
45+
}
46+
47+
@Override
48+
protected void assertSimpleOutput(List<Page> input, List<Page> output) {
49+
assertThat(output, hasSize(input.size()));
50+
int rowCount = 0;
51+
List<Tuple<Integer, String>> actualChangePoints = new ArrayList<>();
52+
for (int i = 0; i < output.size(); i++) {
53+
Page inputPage = input.get(i);
54+
Page outputPage = output.get(i);
55+
assertThat(outputPage.getPositionCount(), equalTo(inputPage.getPositionCount()));
56+
assertThat(outputPage.getBlockCount(), equalTo(4));
57+
for (int j = 0; j < outputPage.getPositionCount(); j++) {
58+
long inputValue = ((LongBlock) inputPage.getBlock(0)).getLong(j);
59+
long outputValue = ((LongBlock) outputPage.getBlock(0)).getLong(j);
60+
String inputPartition = ((BytesRefBlock) inputPage.getBlock(1)).getBytesRef(j, new BytesRef()).utf8ToString();
61+
String outputPartition = ((BytesRefBlock) outputPage.getBlock(1)).getBytesRef(j, new BytesRef()).utf8ToString();
62+
assertThat(outputValue, equalTo(inputValue));
63+
assertThat(outputPartition, equalTo(inputPartition));
64+
if (outputPage.getBlock(2).isNull(j) == false) { // change point detected at this position
65+
String type = (((BytesRefBlock) outputPage.getBlock(2)).getBytesRef(j, new BytesRef())).utf8ToString();
66+
double pvalue = ((DoubleBlock) outputPage.getBlock(3)).getDouble(j);
67+
assertThat(pvalue, lessThan(1E-9));
68+
actualChangePoints.add(Tuple.tuple(rowCount, type));
69+
} else { // no change point at this position
70+
assertThat(outputPage.getBlock(3).isNull(j), equalTo(true));
71+
}
72+
rowCount++;
73+
}
74+
}
75+
assertThat(rowCount, equalTo(75));
76+
assertThat(
77+
actualChangePoints,
78+
equalTo(List.of(Tuple.tuple(12, "step_change"), Tuple.tuple(37, "step_change"), Tuple.tuple(66, "spike")))
79+
);
80+
}
81+
82+
@Override
83+
protected Operator.OperatorFactory simple() {
84+
return new ChangePointOperator.Factory(0, Optional.of(1), null,0, 0);
85+
}
86+
87+
@Override
88+
protected Matcher<String> expectedDescriptionOfSimple() {
89+
return equalTo("ChangePointOperator[metricChannel=0, partitionChannel=1]");
90+
}
91+
92+
@Override
93+
protected Matcher<String> expectedToStringOfSimple() {
94+
return equalTo("ChangePointOperator[metricChannel=0, partitionChannel=1]");
95+
}
96+
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.List;
22+
import java.util.Optional;
2223

2324
import static org.hamcrest.Matchers.equalTo;
2425
import static org.hamcrest.Matchers.hasSize;
@@ -71,7 +72,7 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {
7172

7273
@Override
7374
protected Operator.OperatorFactory simple() {
74-
return new ChangePointOperator.Factory(0, null, 0, 0);
75+
return new ChangePointOperator.Factory(0, Optional.empty(), null, 0, 0);
7576
}
7677

7778
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.LinkedHashSet;
5353
import java.util.List;
5454
import java.util.Map;
55+
import java.util.Optional;
5556
import java.util.Set;
5657
import java.util.function.Function;
5758
import java.util.stream.Collectors;
@@ -134,6 +135,7 @@ protected TopNOperator.TopNOperatorFactory simple() {
134135
4,
135136
List.of(LONG),
136137
List.of(DEFAULT_UNSORTABLE),
138+
Optional.empty(),
137139
List.of(new TopNOperator.SortOrder(0, true, false)),
138140
pageSize
139141
);
@@ -215,6 +217,7 @@ public long accumulateObject(Object o, long shallowSize, Map<Field, Object> fiel
215217
topCount,
216218
List.of(LONG),
217219
List.of(DEFAULT_UNSORTABLE),
220+
Optional.empty(),
218221
List.of(new TopNOperator.SortOrder(0, true, false)),
219222
pageSize
220223
).get(context)
@@ -553,6 +556,7 @@ public void testCollectAllValues() {
553556
topCount,
554557
elementTypes,
555558
encoders,
559+
Optional.empty(),
556560
List.of(new TopNOperator.SortOrder(0, false, false)),
557561
randomPageSize()
558562
)
@@ -642,6 +646,7 @@ public void testCollectAllValues_RandomMultiValues() {
642646
topCount,
643647
elementTypes,
644648
encoders,
649+
Optional.empty(),
645650
List.of(new TopNOperator.SortOrder(0, false, false)),
646651
randomPageSize()
647652
)
@@ -676,6 +681,7 @@ private List<Tuple<Long, Long>> topNTwoColumns(
676681
limit,
677682
elementTypes,
678683
encoder,
684+
Optional.empty(),
679685
sortOrders,
680686
randomPageSize()
681687
)
@@ -703,6 +709,7 @@ public void testTopNManyDescriptionAndToString() {
703709
10,
704710
List.of(BYTES_REF, BYTES_REF),
705711
List.of(UTF8, new FixedLengthTopNEncoder(fixedLength)),
712+
Optional.empty(),
706713
List.of(new TopNOperator.SortOrder(1, false, false), new TopNOperator.SortOrder(3, false, true)),
707714
randomPageSize()
708715
);
@@ -945,6 +952,7 @@ private void assertSortingOnMV(
945952
topCount,
946953
List.of(blockType),
947954
List.of(encoder),
955+
Optional.empty(),
948956
List.of(sortOrders),
949957
randomPageSize()
950958
)
@@ -1075,6 +1083,7 @@ public void testRandomMultiValuesTopN() {
10751083
topCount,
10761084
elementTypes,
10771085
encoders,
1086+
Optional.empty(),
10781087
uniqueOrders.stream().toList(),
10791088
rows
10801089
),
@@ -1118,6 +1127,7 @@ public void testIPSortingSingleValue() throws UnknownHostException {
11181127
ips.size(),
11191128
List.of(BYTES_REF),
11201129
List.of(TopNEncoder.IP),
1130+
Optional.empty(),
11211131
List.of(new TopNOperator.SortOrder(0, asc, randomBoolean())),
11221132
randomPageSize()
11231133
)
@@ -1244,6 +1254,7 @@ private void assertIPSortingOnMultiValues(
12441254
ips.size(),
12451255
List.of(BYTES_REF),
12461256
List.of(TopNEncoder.IP),
1257+
Optional.empty(),
12471258
List.of(new TopNOperator.SortOrder(0, asc, nullsFirst)),
12481259
randomPageSize()
12491260
)
@@ -1331,6 +1342,7 @@ public void testZeroByte() {
13311342
2,
13321343
List.of(BYTES_REF, INT),
13331344
List.of(TopNEncoder.UTF8, DEFAULT_UNSORTABLE),
1345+
Optional.empty(),
13341346
List.of(
13351347
new TopNOperator.SortOrder(0, true, randomBoolean()),
13361348
new TopNOperator.SortOrder(1, randomBoolean(), randomBoolean())
@@ -1370,6 +1382,7 @@ public void testErrorBeforeFullyDraining() {
13701382
topCount,
13711383
List.of(LONG),
13721384
List.of(DEFAULT_UNSORTABLE),
1385+
Optional.empty(),
13731386
List.of(new TopNOperator.SortOrder(0, true, randomBoolean())),
13741387
maxPageSize
13751388
)
@@ -1405,6 +1418,7 @@ public void testCloseWithoutCompleting() {
14051418
2,
14061419
List.of(INT),
14071420
List.of(DEFAULT_UNSORTABLE),
1421+
Optional.empty(),
14081422
List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())),
14091423
randomPageSize()
14101424
)
@@ -1428,6 +1442,7 @@ public void testRowResizes() {
14281442
10,
14291443
types,
14301444
encoders,
1445+
Optional.empty(),
14311446
List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())),
14321447
randomPageSize()
14331448
)

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/AbstractBlockSourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final Page getOutput() {
4747
finish();
4848
return null;
4949
}
50-
int length = Math.min(ESTestCase.randomInt(maxPagePositions), remaining());
50+
int length = Math.min(maxPagePositions, remaining()); // ESTestCase.randomInt(maxPagePositions)
5151
return createPage(currentPosition, length);
5252
}
5353

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ public PlanFactory visitChangePointCommand(EsqlBaseParser.ChangePointCommandCont
479479
Source src = source(ctx);
480480
Attribute value = visitQualifiedName(ctx.value);
481481
Attribute key = ctx.key == null ? new UnresolvedAttribute(src, "@timestamp") : visitQualifiedName(ctx.key);
482-
Attribute partitionField = ctx.partitionField == null ? new UnresolvedAttribute(src, "partitionField") : visitQualifiedName(ctx.partitionField);
482+
Attribute partitionField = ctx.partitionField == null ? new UnresolvedAttribute(src, "partition") : visitQualifiedName(ctx.partitionField);
483483
Attribute targetType = new ReferenceAttribute(
484484
src,
485485
ctx.targetType == null ? "type" : visitQualifiedName(ctx.targetType).name(),

0 commit comments

Comments
 (0)