Skip to content

Commit 1609bb0

Browse files
authored
Add emit time to hash aggregation status (elastic#127988)
The hash aggregation operator may take time to emit the output pages, including keys and aggregated values. This change adds an emit_time field to the status. While I considered including this in hash_nanos and aggregation_nanos, having a separate section feels more natural. I am open to suggestions.
1 parent dc90751 commit 1609bb0

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

docs/changelog/127988.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127988
2+
summary: Add emit time to hash aggregation status
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ static TransportVersion def(int id) {
253253
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00);
254254
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00);
255255
public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);
256+
public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME = def(9_077_0_00);
256257

257258
/*
258259
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public String describe() {
110110
*/
111111
private long rowsEmitted;
112112

113+
/**
114+
* Total nanos for emitting the output
115+
*/
116+
protected long emitNanos;
117+
113118
@SuppressWarnings("this-escape")
114119
public HashAggregationOperator(
115120
List<GroupingAggregator.Factory> aggregators,
@@ -223,6 +228,7 @@ public void finish() {
223228
finished = true;
224229
Block[] blocks = null;
225230
IntVector selected = null;
231+
long startInNanos = System.nanoTime();
226232
boolean success = false;
227233
try {
228234
selected = blockHash.nonEmpty();
@@ -247,6 +253,7 @@ public void finish() {
247253
if (success == false && blocks != null) {
248254
Releasables.closeExpectNoException(blocks);
249255
}
256+
emitNanos += System.nanoTime() - startInNanos;
250257
}
251258
}
252259

@@ -269,7 +276,7 @@ public void close() {
269276

270277
@Override
271278
public Operator.Status status() {
272-
return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
279+
return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
273280
}
274281

275282
protected static void checkState(boolean condition, String msg) {
@@ -320,20 +327,24 @@ public static class Status implements Operator.Status {
320327
*/
321328
private final long rowsEmitted;
322329

330+
private final long emitNanos;
331+
323332
/**
324333
* Build.
325334
* @param hashNanos Nanoseconds this operator has spent hashing grouping keys.
326335
* @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
327336
* @param pagesProcessed Count of pages this operator has processed.
328337
* @param rowsReceived Count of rows this operator has received.
329338
* @param rowsEmitted Count of rows this operator has emitted.
339+
* @param emitNanos Nanoseconds this operator has spent emitting the output.
330340
*/
331-
public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
341+
public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted, long emitNanos) {
332342
this.hashNanos = hashNanos;
333343
this.aggregationNanos = aggregationNanos;
334344
this.pagesProcessed = pagesProcessed;
335345
this.rowsReceived = rowsReceived;
336346
this.rowsEmitted = rowsEmitted;
347+
this.emitNanos = emitNanos;
337348
}
338349

339350
protected Status(StreamInput in) throws IOException {
@@ -348,6 +359,11 @@ protected Status(StreamInput in) throws IOException {
348359
rowsReceived = 0;
349360
rowsEmitted = 0;
350361
}
362+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME)) {
363+
emitNanos = in.readVLong();
364+
} else {
365+
emitNanos = 0;
366+
}
351367
}
352368

353369
@Override
@@ -360,6 +376,9 @@ public void writeTo(StreamOutput out) throws IOException {
360376
out.writeVLong(rowsReceived);
361377
out.writeVLong(rowsEmitted);
362378
}
379+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME)) {
380+
out.writeVLong(emitNanos);
381+
}
363382
}
364383

365384
@Override
@@ -402,6 +421,13 @@ public long rowsEmitted() {
402421
return rowsEmitted;
403422
}
404423

424+
/**
425+
* Nanoseconds this operator has spent emitting the output.
426+
*/
427+
public long emitNanos() {
428+
return emitNanos;
429+
}
430+
405431
@Override
406432
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
407433
builder.startObject();
@@ -416,6 +442,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
416442
builder.field("pages_processed", pagesProcessed);
417443
builder.field("rows_received", rowsReceived);
418444
builder.field("rows_emitted", rowsEmitted);
445+
builder.field("emit_nanos", emitNanos);
446+
if (builder.humanReadable()) {
447+
builder.field("emit_time", TimeValue.timeValueNanos(emitNanos));
448+
}
419449
return builder.endObject();
420450

421451
}
@@ -429,12 +459,13 @@ public boolean equals(Object o) {
429459
&& aggregationNanos == status.aggregationNanos
430460
&& pagesProcessed == status.pagesProcessed
431461
&& rowsReceived == status.rowsReceived
432-
&& rowsEmitted == status.rowsEmitted;
462+
&& rowsEmitted == status.rowsEmitted
463+
&& emitNanos == status.emitNanos;
433464
}
434465

435466
@Override
436467
public int hashCode() {
437-
return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
468+
return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
438469
}
439470

440471
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
public class HashAggregationOperatorStatusTests extends AbstractWireSerializingTestCase<HashAggregationOperator.Status> {
1818
public static HashAggregationOperator.Status simple() {
19-
return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222);
19+
return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222, 180017);
2020
}
2121

2222
public static String simpleToJson() {
@@ -28,7 +28,9 @@ public static String simpleToJson() {
2828
"aggregation_time" : "200micros",
2929
"pages_processed" : 123,
3030
"rows_received" : 111,
31-
"rows_emitted" : 222
31+
"rows_emitted" : 222,
32+
"emit_nanos" : 180017,
33+
"emit_time" : "180micros"
3234
}""";
3335
}
3436

@@ -48,6 +50,7 @@ public HashAggregationOperator.Status createTestInstance() {
4850
randomNonNegativeLong(),
4951
randomNonNegativeInt(),
5052
randomNonNegativeLong(),
53+
randomNonNegativeLong(),
5154
randomNonNegativeLong()
5255
);
5356
}
@@ -59,14 +62,16 @@ protected HashAggregationOperator.Status mutateInstance(HashAggregationOperator.
5962
int pagesProcessed = instance.pagesProcessed();
6063
long rowsReceived = instance.rowsReceived();
6164
long rowsEmitted = instance.rowsEmitted();
62-
switch (between(0, 4)) {
65+
long emitNanos = instance.emitNanos();
66+
switch (between(0, 5)) {
6367
case 0 -> hashNanos = randomValueOtherThan(hashNanos, ESTestCase::randomNonNegativeLong);
6468
case 1 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
6569
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
6670
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
6771
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
72+
case 5 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
6873
default -> throw new UnsupportedOperationException();
6974
}
70-
return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
75+
return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
7176
}
7277
}

0 commit comments

Comments
 (0)