Skip to content

Commit b45c672

Browse files
committed
ESQL: Added status to OrdinalsGroupingOperator
1 parent ec5254b commit b45c672

File tree

4 files changed

+238
-6
lines changed

4 files changed

+238
-6
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ static TransportVersion def(int id) {
329329
public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00);
330330
public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00);
331331
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00);
332+
public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS = def(9_116_0_00);
332333
/*
333334
* STOP! READ THIS FIRST! No, really,
334335
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public String describe() {
113113
/**
114114
* Total nanos for emitting the output
115115
*/
116-
protected long emitNanos;
116+
private long emitNanos;
117117

118118
@SuppressWarnings("this-escape")
119119
public HashAggregationOperator(

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 169 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,13 @@
1313
import org.apache.lucene.util.BytesRef;
1414
import org.apache.lucene.util.BytesRefBuilder;
1515
import org.apache.lucene.util.PriorityQueue;
16+
import org.elasticsearch.TransportVersion;
17+
import org.elasticsearch.TransportVersions;
1618
import org.elasticsearch.common.CheckedSupplier;
19+
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
21+
import org.elasticsearch.common.io.stream.StreamInput;
22+
import org.elasticsearch.common.io.stream.StreamOutput;
1723
import org.elasticsearch.common.util.BigArrays;
1824
import org.elasticsearch.common.util.BitArray;
1925
import org.elasticsearch.compute.Describable;
@@ -36,7 +42,9 @@
3642
import org.elasticsearch.core.RefCounted;
3743
import org.elasticsearch.core.Releasable;
3844
import org.elasticsearch.core.Releasables;
45+
import org.elasticsearch.core.TimeValue;
3946
import org.elasticsearch.index.mapper.BlockLoader;
47+
import org.elasticsearch.xcontent.XContentBuilder;
4048

4149
import java.io.IOException;
4250
import java.io.UncheckedIOException;
@@ -100,6 +108,23 @@ public String describe() {
100108

101109
private boolean finished = false;
102110

111+
/**
112+
* Nanoseconds this operator has spent processing the rows.
113+
*/
114+
private long processNanos;
115+
/**
116+
* Count of pages this operator has processed.
117+
*/
118+
private int pagesProcessed;
119+
/**
120+
* Count of rows this operator has received.
121+
*/
122+
private long rowsReceived;
123+
/**
124+
* Count of rows this operator has emitted.
125+
*/
126+
private long rowsEmitted;
127+
103128
// used to extract and aggregate values
104129
private final int maxPageSize;
105130
private ValuesAggregator valuesAggregator;
@@ -135,6 +160,7 @@ public boolean needsInput() {
135160
public void addInput(Page page) {
136161
checkState(needsInput(), "Operator is already finishing");
137162
requireNonNull(page, "page is null");
163+
long start = System.nanoTime();
138164
DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
139165
final int shardIndex = docVector.shards().getInt(0);
140166
RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
@@ -184,6 +210,9 @@ public void addInput(Page page) {
184210
if (pagePassed == false) {
185211
Releasables.closeExpectNoException(page::releaseBlocks);
186212
}
213+
pagesProcessed++;
214+
rowsReceived += page.getPositionCount();
215+
processNanos += System.nanoTime() - start;
187216
}
188217
}
189218

@@ -208,25 +237,28 @@ public Page getOutput() {
208237
if (finished == false) {
209238
return null;
210239
}
240+
Page page = null;
211241
if (valuesAggregator != null) {
212242
try {
213-
return valuesAggregator.getOutput();
243+
page = valuesAggregator.getOutput();
214244
} finally {
215245
final ValuesAggregator aggregator = this.valuesAggregator;
216246
this.valuesAggregator = null;
217247
Releasables.close(aggregator);
218248
}
219-
}
220-
if (ordinalAggregators.isEmpty() == false) {
249+
} else if (ordinalAggregators.isEmpty() == false) {
221250
try {
222-
return mergeOrdinalsSegmentResults();
251+
page = mergeOrdinalsSegmentResults();
223252
} catch (IOException e) {
224253
throw new UncheckedIOException(e);
225254
} finally {
226255
Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear);
227256
}
228257
}
229-
return null;
258+
if (page != null) {
259+
rowsEmitted += page.getPositionCount();
260+
}
261+
return page;
230262
}
231263

232264
@Override
@@ -322,6 +354,11 @@ public void close() {
322354
Releasables.close(() -> Releasables.close(ordinalAggregators.values()), valuesAggregator);
323355
}
324356

357+
@Override
358+
public Operator.Status status() {
359+
return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
360+
}
361+
325362
private static void checkState(boolean condition, String msg) {
326363
if (condition == false) {
327364
throw new IllegalArgumentException(msg);
@@ -337,6 +374,133 @@ public String toString() {
337374
return this.getClass().getSimpleName() + "[" + "aggregators=[" + aggregatorDescriptions + "]]";
338375
}
339376

377+
public static class Status implements Operator.Status {
378+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
379+
Operator.Status.class,
380+
"ordinals_grouping",
381+
Status::new
382+
);
383+
384+
/**
385+
* Nanoseconds this operator has spent processing the rows.
386+
*/
387+
private final long processNanos;
388+
/**
389+
* Count of pages this operator has processed.
390+
*/
391+
private final int pagesProcessed;
392+
/**
393+
* Count of rows this operator has received.
394+
*/
395+
private final long rowsReceived;
396+
/**
397+
* Count of rows this operator has emitted.
398+
*/
399+
private final long rowsEmitted;
400+
401+
/**
402+
* Build.
403+
* @param processNanos Nanoseconds this operator has spent processing the rows.
404+
* @param pagesProcessed Count of pages this operator has processed.
405+
* @param rowsReceived Count of rows this operator has received.
406+
* @param rowsEmitted Count of rows this operator has emitted.
407+
*/
408+
public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
409+
this.processNanos = processNanos;
410+
this.pagesProcessed = pagesProcessed;
411+
this.rowsReceived = rowsReceived;
412+
this.rowsEmitted = rowsEmitted;
413+
}
414+
415+
protected Status(StreamInput in) throws IOException {
416+
processNanos = in.readVLong();
417+
pagesProcessed = in.readVInt();
418+
rowsReceived = in.readVLong();
419+
rowsEmitted = in.readVLong();
420+
}
421+
422+
@Override
423+
public void writeTo(StreamOutput out) throws IOException {
424+
out.writeVLong(processNanos);
425+
out.writeVInt(pagesProcessed);
426+
out.writeVLong(rowsReceived);
427+
out.writeVLong(rowsEmitted);
428+
}
429+
430+
@Override
431+
public String getWriteableName() {
432+
return ENTRY.name;
433+
}
434+
435+
/**
436+
* Nanoseconds this operator has spent processing the rows.
437+
*/
438+
public long processNanos() {
439+
return processNanos;
440+
}
441+
442+
/**
443+
* Count of pages this operator has processed.
444+
*/
445+
public int pagesProcessed() {
446+
return pagesProcessed;
447+
}
448+
449+
/**
450+
* Count of rows this operator has received.
451+
*/
452+
public long rowsReceived() {
453+
return rowsReceived;
454+
}
455+
456+
/**
457+
* Count of rows this operator has emitted.
458+
*/
459+
public long rowsEmitted() {
460+
return rowsEmitted;
461+
}
462+
463+
@Override
464+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
465+
builder.startObject();
466+
builder.field("process_nanos", processNanos);
467+
if (builder.humanReadable()) {
468+
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
469+
}
470+
builder.field("pages_processed", pagesProcessed);
471+
builder.field("rows_received", rowsReceived);
472+
builder.field("rows_emitted", rowsEmitted);
473+
return builder.endObject();
474+
475+
}
476+
477+
@Override
478+
public boolean equals(Object o) {
479+
if (this == o) return true;
480+
if (o == null || getClass() != o.getClass()) return false;
481+
Status status = (Status) o;
482+
return processNanos == status.processNanos
483+
&& pagesProcessed == status.pagesProcessed
484+
&& rowsReceived == status.rowsReceived
485+
&& rowsEmitted == status.rowsEmitted;
486+
}
487+
488+
@Override
489+
public int hashCode() {
490+
return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
491+
}
492+
493+
@Override
494+
public String toString() {
495+
return Strings.toString(this);
496+
}
497+
498+
@Override
499+
public TransportVersion getMinimalSupportedVersion() {
500+
return TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS;
501+
}
502+
}
503+
340504
record SegmentID(int shardIndex, int segmentIndex) {
341505

342506
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
13+
import org.elasticsearch.test.ESTestCase;
14+
15+
import static org.hamcrest.Matchers.equalTo;
16+
17+
public class OrdinalsGroupingOperatorStatusTests extends AbstractWireSerializingTestCase<OrdinalsGroupingOperator.Status> {
18+
public static OrdinalsGroupingOperator.Status simple() {
19+
return new OrdinalsGroupingOperator.Status(200012, 123, 111, 222);
20+
}
21+
22+
public static String simpleToJson() {
23+
return """
24+
{
25+
"process_nanos" : 200012,
26+
"process_time" : "200micros",
27+
"pages_processed" : 123,
28+
"rows_received" : 111,
29+
"rows_emitted" : 222
30+
}""";
31+
}
32+
33+
public void testToXContent() {
34+
assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
35+
}
36+
37+
@Override
38+
protected Writeable.Reader<OrdinalsGroupingOperator.Status> instanceReader() {
39+
return OrdinalsGroupingOperator.Status::new;
40+
}
41+
42+
@Override
43+
public OrdinalsGroupingOperator.Status createTestInstance() {
44+
return new OrdinalsGroupingOperator.Status(
45+
randomNonNegativeLong(),
46+
randomNonNegativeInt(),
47+
randomNonNegativeLong(),
48+
randomNonNegativeLong()
49+
);
50+
}
51+
52+
@Override
53+
protected OrdinalsGroupingOperator.Status mutateInstance(OrdinalsGroupingOperator.Status instance) {
54+
long processNanos = instance.processNanos();
55+
int pagesProcessed = instance.pagesProcessed();
56+
long rowsReceived = instance.rowsReceived();
57+
long rowsEmitted = instance.rowsEmitted();
58+
switch (between(0, 3)) {
59+
case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
60+
case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
61+
case 2 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
62+
case 3 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
63+
default -> throw new UnsupportedOperationException();
64+
}
65+
return new OrdinalsGroupingOperator.Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
66+
}
67+
}

0 commit comments

Comments
 (0)