
Commit 3c4a92c

Add wire test
1 parent fe22ef3 commit 3c4a92c

File tree

3 files changed: +224 -1 lines changed

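For context, the "wire test" added here is a serialization round-trip test: the new TimeSeriesSourceOperatorStatusTests below extends AbstractWireSerializingTestCase, which writes a TimeSeriesSourceOperator.Status to Elasticsearch's transport wire format and reads it back through the Status(StreamInput) constructor. The following is only a minimal sketch of that round-trip, not the framework's actual code; the helper class is illustrative and assumes it lives in the same package (org.elasticsearch.compute.lucene) so it can reach the package-private constructors.

package org.elasticsearch.compute.lucene;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

class StatusWireRoundTripSketch {
    // Serialize a Status and read it back; the wire test asserts the copy equals the original.
    static TimeSeriesSourceOperator.Status roundTrip(TimeSeriesSourceOperator.Status original) throws Exception {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out); // write in transport wire format
            try (StreamInput in = out.bytes().streamInput()) {
                TimeSeriesSourceOperator.Status copy = new TimeSeriesSourceOperator.Status(in);
                assert original.equals(copy) : "wire round-trip should preserve equality";
                return copy;
            }
        }
    }
}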

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 36 additions & 0 deletions
@@ -44,7 +44,9 @@
 import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 public final class TimeSeriesSourceOperator extends LuceneOperator {
 
@@ -611,6 +613,40 @@ public static class Status extends LuceneOperator.Status {
             this.valuesLoaded = valuesLoaded;
         }
 
+        Status(
+            int processedSlices,
+            Set<String> processedQueries,
+            Set<String> processedShards,
+            long processNanos,
+            int sliceIndex,
+            int totalSlices,
+            int pagesEmitted,
+            int sliceMin,
+            int sliceMax,
+            int current,
+            long rowsEmitted,
+            Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies,
+            long tsidLoaded,
+            long valuesLoaded
+        ) {
+            super(
+                processedSlices,
+                processedQueries,
+                processedShards,
+                processNanos,
+                sliceIndex,
+                totalSlices,
+                pagesEmitted,
+                sliceMin,
+                sliceMax,
+                current,
+                rowsEmitted,
+                partitioningStrategies
+            );
+            this.tsidLoaded = tsidLoaded;
+            this.valuesLoaded = valuesLoaded;
+        }
+
         Status(StreamInput in) throws IOException {
             super(in);
             this.tsidLoaded = in.readVLong();
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorStatusTests.java

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class TimeSeriesSourceOperatorStatusTests extends AbstractWireSerializingTestCase<TimeSeriesSourceOperator.Status> {
+    public static TimeSeriesSourceOperator.Status simple() {
+        return new TimeSeriesSourceOperator.Status(
+            2,
+            Set.of("*:*"),
+            new TreeSet<>(List.of("a:0", "a:1")),
+            1002,
+            0,
+            1,
+            5,
+            123,
+            99990,
+            8000,
+            222,
+            Map.of("b:0", LuceneSliceQueue.PartitioningStrategy.SHARD, "a:1", LuceneSliceQueue.PartitioningStrategy.DOC),
+            250,
+            28000
+        );
+    }
+
+    public static String simpleToJson() {
+        return """
+            {
+              "processed_slices" : 2,
+              "processed_queries" : [
+                "*:*"
+              ],
+              "processed_shards" : [
+                "a:0",
+                "a:1"
+              ],
+              "process_nanos" : 1002,
+              "process_time" : "1micros",
+              "slice_index" : 0,
+              "total_slices" : 1,
+              "pages_emitted" : 5,
+              "slice_min" : 123,
+              "slice_max" : 99990,
+              "current" : 8000,
+              "rows_emitted" : 222,
+              "partitioning_strategies" : {
+                "a:1" : "DOC",
+                "b:0" : "SHARD"
+              },
+              "tsid_loaded" : 250,
+              "values_loaded" : 28000
+            }""";
+    }
+
+    public void testToXContent() {
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
+    }
+
+    @Override
+    protected Writeable.Reader<TimeSeriesSourceOperator.Status> instanceReader() {
+        return TimeSeriesSourceOperator.Status::new;
+    }
+
+    @Override
+    public TimeSeriesSourceOperator.Status createTestInstance() {
+        return new TimeSeriesSourceOperator.Status(
+            randomNonNegativeInt(),
+            randomProcessedQueries(),
+            randomProcessedShards(),
+            randomNonNegativeLong(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            randomNonNegativeLong(),
+            randomPartitioningStrategies(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong()
+        );
+    }
+
+    private static Set<String> randomProcessedQueries() {
+        int size = between(0, 10);
+        Set<String> set = new TreeSet<>();
+        while (set.size() < size) {
+            set.add(randomAlphaOfLength(5));
+        }
+        return set;
+    }
+
+    private static Set<String> randomProcessedShards() {
+        int size = between(0, 10);
+        Set<String> set = new TreeSet<>();
+        while (set.size() < size) {
+            set.add(randomAlphaOfLength(3) + ":" + between(0, 10));
+        }
+        return set;
+    }
+
+    private static Map<String, LuceneSliceQueue.PartitioningStrategy> randomPartitioningStrategies() {
+        int size = between(0, 10);
+        Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies = new HashMap<>();
+        while (partitioningStrategies.size() < size) {
+            partitioningStrategies.put(
+                randomAlphaOfLength(3) + ":" + between(0, 10),
+                randomFrom(LuceneSliceQueue.PartitioningStrategy.values())
+            );
+        }
+        return partitioningStrategies;
+    }
+
+    @Override
+    protected TimeSeriesSourceOperator.Status mutateInstance(TimeSeriesSourceOperator.Status instance) {
+        int processedSlices = instance.processedSlices();
+        Set<String> processedQueries = instance.processedQueries();
+        Set<String> processedShards = instance.processedShards();
+        long processNanos = instance.processNanos();
+        int sliceIndex = instance.sliceIndex();
+        int totalSlices = instance.totalSlices();
+        int pagesEmitted = instance.pagesEmitted();
+        int sliceMin = instance.sliceMin();
+        int sliceMax = instance.sliceMax();
+        int current = instance.current();
+        long rowsEmitted = instance.rowsEmitted();
+        long tsidLoaded = instance.tsidLoaded();
+        long valuesLoaded = instance.valuesLoaded();
+        Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies = instance.partitioningStrategies();
+        switch (between(0, 13)) {
+            case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
+            case 1 -> processedQueries = randomValueOtherThan(
+                processedQueries,
+                TimeSeriesSourceOperatorStatusTests::randomProcessedQueries
+            );
+            case 2 -> processedShards = randomValueOtherThan(processedShards, TimeSeriesSourceOperatorStatusTests::randomProcessedShards);
+            case 3 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
+            case 4 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
+            case 5 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
+            case 6 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
+            case 7 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
+            case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
+            case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
+            case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
+            case 11 -> partitioningStrategies = randomValueOtherThan(
+                partitioningStrategies,
+                TimeSeriesSourceOperatorStatusTests::randomPartitioningStrategies
+            );
+            case 12 -> tsidLoaded = randomValueOtherThan(tsidLoaded, ESTestCase::randomNonNegativeLong);
+            case 13 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new TimeSeriesSourceOperator.Status(
+            processedSlices,
+            processedQueries,
+            processedShards,
+            processNanos,
+            sliceIndex,
+            totalSlices,
+            pagesEmitted,
+            sliceMin,
+            sliceMax,
+            current,
+            rowsEmitted,
+            partitioningStrategies,
+            tsidLoaded,
+            valuesLoaded
+        );
+    }
+}
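A usage note on the hooks above: createTestInstance() supplies random Status instances for the serialization round-trip, instanceReader() tells the base class how to deserialize them, and mutateInstance() produces a near-copy that differs in exactly one field for the equals/hashCode checks. The testToXContent() assertion calls Strings.toString(status, true, true), that is, pretty-printed and human-readable output, which appears to be why the expected JSON carries both the raw "process_nanos" : 1002 and the derived "process_time" : "1micros". Below is a hedged sketch of rendering the same fixture outside the test; the wrapper class and main method are illustrative only.

package org.elasticsearch.compute.lucene;

import org.elasticsearch.common.Strings;

class StatusToJsonSketch {
    public static void main(String[] args) {
        // Build the fixture Status from the test above and render it with the same
        // flags testToXContent() uses: pretty = true, humanReadable = true.
        TimeSeriesSourceOperator.Status status = TimeSeriesSourceOperatorStatusTests.simple();
        String json = Strings.toString(status, true, true);
        System.out.println(json); // expected to match simpleToJson()
    }
}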

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java

Lines changed: 1 addition & 1 deletion
@@ -343,7 +343,7 @@ protected Matcher<String> expectedDescriptionOfSimple() {
 
     @Override
     protected Matcher<String> expectedToStringOfSimple() {
-        return equalTo("TimeSeriesSourceOperator[maxPageSize=1, remainingDocs=1]");
+        return equalTo("TimeSeriesSourceOperator[shards = [test], maxPageSize = 1[maxPageSize=1, remainingDocs=1]]");
     }
 
     List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
