Skip to content

Commit fbcdb8d

Browse files
authored
Add emit time to hash aggregation status (elastic#127988) (elastic#128225)
The hash aggregation operator may take time to emit the output pages, including keys and aggregated values. This change adds an emit_time field to the status. While I considered including this in hash_nanos and aggregation_nanos, having a separate section feels more natural.
1 parent bbf568c commit fbcdb8d

File tree

4 files changed

+50
-8
lines changed

4 files changed

+50
-8
lines changed

docs/changelog/127988.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 127988
2+
summary: Add emit time to hash aggregation status
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ static TransportVersion def(int id) {
222222
public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED_8_19 = def(8_841_0_31);
223223
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(8_841_0_32);
224224
public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM_BACKPORT_8_19 = def(8_841_0_33);
225+
public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME_8_19 = def(8_841_0_34);
225226

226227
/*
227228
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public String describe() {
108108
*/
109109
private long rowsEmitted;
110110

111+
/**
112+
* Total nanos for emitting the output
113+
*/
114+
protected long emitNanos;
115+
111116
@SuppressWarnings("this-escape")
112117
public HashAggregationOperator(
113118
List<GroupingAggregator.Factory> aggregators,
@@ -217,6 +222,7 @@ public void finish() {
217222
finished = true;
218223
Block[] blocks = null;
219224
IntVector selected = null;
225+
long startInNanos = System.nanoTime();
220226
boolean success = false;
221227
try {
222228
selected = blockHash.nonEmpty();
@@ -240,6 +246,7 @@ public void finish() {
240246
if (success == false && blocks != null) {
241247
Releasables.closeExpectNoException(blocks);
242248
}
249+
emitNanos += System.nanoTime() - startInNanos;
243250
}
244251
}
245252

@@ -258,7 +265,7 @@ public void close() {
258265

259266
@Override
260267
public Operator.Status status() {
261-
return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
268+
return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
262269
}
263270

264271
protected static void checkState(boolean condition, String msg) {
@@ -309,20 +316,24 @@ public static class Status implements Operator.Status {
309316
*/
310317
private final long rowsEmitted;
311318

319+
private final long emitNanos;
320+
312321
/**
313322
* Build.
314323
* @param hashNanos Nanoseconds this operator has spent hashing grouping keys.
315324
* @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
316325
* @param pagesProcessed Count of pages this operator has processed.
317326
* @param rowsReceived Count of rows this operator has received.
318327
* @param rowsEmitted Count of rows this operator has emitted.
328+
* @param emitNanos Nanoseconds this operator has spent emitting the output.
319329
*/
320-
public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
330+
public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted, long emitNanos) {
321331
this.hashNanos = hashNanos;
322332
this.aggregationNanos = aggregationNanos;
323333
this.pagesProcessed = pagesProcessed;
324334
this.rowsReceived = rowsReceived;
325335
this.rowsEmitted = rowsEmitted;
336+
this.emitNanos = emitNanos;
326337
}
327338

328339
protected Status(StreamInput in) throws IOException {
@@ -337,6 +348,11 @@ protected Status(StreamInput in) throws IOException {
337348
rowsReceived = 0;
338349
rowsEmitted = 0;
339350
}
351+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME_8_19)) {
352+
emitNanos = in.readVLong();
353+
} else {
354+
emitNanos = 0;
355+
}
340356
}
341357

342358
@Override
@@ -349,6 +365,9 @@ public void writeTo(StreamOutput out) throws IOException {
349365
out.writeVLong(rowsReceived);
350366
out.writeVLong(rowsEmitted);
351367
}
368+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME_8_19)) {
369+
out.writeVLong(emitNanos);
370+
}
352371
}
353372

354373
@Override
@@ -391,6 +410,13 @@ public long rowsEmitted() {
391410
return rowsEmitted;
392411
}
393412

413+
/**
414+
* Nanoseconds this operator has spent emitting the output.
415+
*/
416+
public long emitNanos() {
417+
return emitNanos;
418+
}
419+
394420
@Override
395421
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
396422
builder.startObject();
@@ -405,6 +431,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
405431
builder.field("pages_processed", pagesProcessed);
406432
builder.field("rows_received", rowsReceived);
407433
builder.field("rows_emitted", rowsEmitted);
434+
builder.field("emit_nanos", emitNanos);
435+
if (builder.humanReadable()) {
436+
builder.field("emit_time", TimeValue.timeValueNanos(emitNanos));
437+
}
408438
return builder.endObject();
409439

410440
}
@@ -418,12 +448,13 @@ public boolean equals(Object o) {
418448
&& aggregationNanos == status.aggregationNanos
419449
&& pagesProcessed == status.pagesProcessed
420450
&& rowsReceived == status.rowsReceived
421-
&& rowsEmitted == status.rowsEmitted;
451+
&& rowsEmitted == status.rowsEmitted
452+
&& emitNanos == status.emitNanos;
422453
}
423454

424455
@Override
425456
public int hashCode() {
426-
return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
457+
return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
427458
}
428459

429460
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
public class HashAggregationOperatorStatusTests extends AbstractWireSerializingTestCase<HashAggregationOperator.Status> {
1818
public static HashAggregationOperator.Status simple() {
19-
return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222);
19+
return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222, 180017);
2020
}
2121

2222
public static String simpleToJson() {
@@ -28,7 +28,9 @@ public static String simpleToJson() {
2828
"aggregation_time" : "200micros",
2929
"pages_processed" : 123,
3030
"rows_received" : 111,
31-
"rows_emitted" : 222
31+
"rows_emitted" : 222,
32+
"emit_nanos" : 180017,
33+
"emit_time" : "180micros"
3234
}""";
3335
}
3436

@@ -48,6 +50,7 @@ public HashAggregationOperator.Status createTestInstance() {
4850
randomNonNegativeLong(),
4951
randomNonNegativeInt(),
5052
randomNonNegativeLong(),
53+
randomNonNegativeLong(),
5154
randomNonNegativeLong()
5255
);
5356
}
@@ -59,14 +62,16 @@ protected HashAggregationOperator.Status mutateInstance(HashAggregationOperator.
5962
int pagesProcessed = instance.pagesProcessed();
6063
long rowsReceived = instance.rowsReceived();
6164
long rowsEmitted = instance.rowsEmitted();
62-
switch (between(0, 4)) {
65+
long emitNanos = instance.emitNanos();
66+
switch (between(0, 5)) {
6367
case 0 -> hashNanos = randomValueOtherThan(hashNanos, ESTestCase::randomNonNegativeLong);
6468
case 1 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
6569
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
6670
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
6771
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
72+
case 5 -> emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
6873
default -> throw new UnsupportedOperationException();
6974
}
70-
return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted);
75+
return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted, emitNanos);
7176
}
7277
}

0 commit comments

Comments
 (0)