Skip to content

Commit 662bb70

Browse files
authored
[ESQL] Add finish() elapsed time to aggregation profiling times (#113172) (#113378)
Closes #112950 Added the `finish()` time to the `AggregationOperator` profiling times, as `finish()` here calls the aggregator evaluators, which may take a considerable time. Here, I added the time to the same count of nanos, but we could separate them in 2 fields, as aggregator times differ a lot between adding input and outputting the result. However, I believe that would require a new transport version? ## Example times Example of how much time is lost in the sample query `FROM test | STATS x=COUNT_DISTINCT(a)` **with the times separated in 2 variables**: ```JSON { "operator": "AggregationOperator[aggregators=[Aggregator[aggregatorFunction=CountDistinctLongAggregatorFunction[channels=[1]], mode=INITIAL]]]", "status": { "aggregation_nanos": 571900, "aggregation_finish_nanos": 1484600, "pages_processed": 3 } } ``` Another, more obvious example, where there are no input pages ```JSON { "operator": "AggregationOperator[aggregators=[Aggregator[aggregatorFunction=CountDistinctLongAggregatorFunction[channels=[1]], mode=INITIAL]]]", "status": { "aggregation_nanos": 0, "aggregation_finish_nanos": 48800, "pages_processed": 0 } } ```
1 parent 080ee4c commit 662bb70

File tree

5 files changed

+60
-10
lines changed

5 files changed

+60
-10
lines changed

docs/changelog/113172.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 113172
2+
summary: "[ESQL] Add finish() elapsed time to aggregation profiling times"
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 112950

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ static TransportVersion def(int id) {
220220
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_EMBEDDINGS_ADDED = def(8_744_00_0);
221221
public static final TransportVersion BULK_INCREMENTAL_STATE = def(8_745_00_0);
222222
public static final TransportVersion FAILURE_STORE_STATUS_IN_INDEX_RESPONSE = def(8_746_00_0);
223+
public static final TransportVersion ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS = def(8_747_00_0);
223224

224225
/*
225226
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public class AggregationOperator implements Operator {
5050
* Nanoseconds this operator has spent running the aggregations.
5151
*/
5252
private long aggregationNanos;
53+
/**
54+
* Nanoseconds this operator has spent running the aggregations final evaluation.
55+
*/
56+
private long aggregationFinishNanos;
5357
/**
5458
* Count of pages this operator has processed.
5559
*/
@@ -117,6 +121,7 @@ public void finish() {
117121
if (finished) {
118122
return;
119123
}
124+
long start = System.nanoTime();
120125
finished = true;
121126
Block[] blocks = null;
122127
boolean success = false;
@@ -136,6 +141,7 @@ public void finish() {
136141
if (success == false && blocks != null) {
137142
Releasables.closeExpectNoException(blocks);
138143
}
144+
aggregationFinishNanos += System.nanoTime() - start;
139145
}
140146
}
141147

@@ -175,7 +181,7 @@ public String toString() {
175181

176182
@Override
177183
public Operator.Status status() {
178-
return new Status(aggregationNanos, pagesProcessed);
184+
return new Status(aggregationNanos, aggregationFinishNanos, pagesProcessed);
179185
}
180186

181187
public static class Status implements Operator.Status {
@@ -189,6 +195,11 @@ public static class Status implements Operator.Status {
189195
* Nanoseconds this operator has spent running the aggregations.
190196
*/
191197
private final long aggregationNanos;
198+
199+
/**
200+
* Nanoseconds this operator has spent running the aggregations final evaluation.
201+
*/
202+
private final Long aggregationFinishNanos;
192203
/**
193204
* Count of pages this operator has processed.
194205
*/
@@ -197,21 +208,31 @@ public static class Status implements Operator.Status {
197208
/**
198209
* Build.
199210
* @param aggregationNanos Nanoseconds this operator has spent running the aggregations.
211+
* @param aggregationFinishNanos Nanoseconds this operator has spent running the aggregations.
200212
* @param pagesProcessed Count of pages this operator has processed.
201213
*/
202-
public Status(long aggregationNanos, int pagesProcessed) {
214+
public Status(long aggregationNanos, long aggregationFinishNanos, int pagesProcessed) {
203215
this.aggregationNanos = aggregationNanos;
216+
this.aggregationFinishNanos = aggregationFinishNanos;
204217
this.pagesProcessed = pagesProcessed;
205218
}
206219

207220
protected Status(StreamInput in) throws IOException {
208221
aggregationNanos = in.readVLong();
222+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS)) {
223+
aggregationFinishNanos = in.readOptionalVLong();
224+
} else {
225+
aggregationFinishNanos = null;
226+
}
209227
pagesProcessed = in.readVInt();
210228
}
211229

212230
@Override
213231
public void writeTo(StreamOutput out) throws IOException {
214232
out.writeVLong(aggregationNanos);
233+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_AGGREGATION_OPERATOR_STATUS_FINISH_NANOS)) {
234+
out.writeOptionalVLong(aggregationFinishNanos);
235+
}
215236
out.writeVInt(pagesProcessed);
216237
}
217238

@@ -227,6 +248,13 @@ public long aggregationNanos() {
227248
return aggregationNanos;
228249
}
229250

251+
/**
252+
* Nanoseconds this operator has spent running the aggregations final evaluation.
253+
*/
254+
public long aggregationFinishNanos() {
255+
return aggregationFinishNanos;
256+
}
257+
230258
/**
231259
* Count of pages this operator has processed.
232260
*/
@@ -241,6 +269,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
241269
if (builder.humanReadable()) {
242270
builder.field("aggregation_time", TimeValue.timeValueNanos(aggregationNanos));
243271
}
272+
builder.field("aggregation_finish_nanos", aggregationFinishNanos);
273+
if (builder.humanReadable()) {
274+
builder.field(
275+
"aggregation_finish_time",
276+
aggregationFinishNanos == null ? null : TimeValue.timeValueNanos(aggregationFinishNanos)
277+
);
278+
}
244279
builder.field("pages_processed", pagesProcessed);
245280
return builder.endObject();
246281

@@ -251,12 +286,14 @@ public boolean equals(Object o) {
251286
if (this == o) return true;
252287
if (o == null || getClass() != o.getClass()) return false;
253288
Status status = (Status) o;
254-
return aggregationNanos == status.aggregationNanos && pagesProcessed == status.pagesProcessed;
289+
return aggregationNanos == status.aggregationNanos
290+
&& pagesProcessed == status.pagesProcessed
291+
&& Objects.equals(aggregationFinishNanos, status.aggregationFinishNanos);
255292
}
256293

257294
@Override
258295
public int hashCode() {
259-
return Objects.hash(aggregationNanos, pagesProcessed);
296+
return Objects.hash(aggregationNanos, aggregationFinishNanos, pagesProcessed);
260297
}
261298

262299
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616

1717
public class AggregationOperatorStatusTests extends AbstractWireSerializingTestCase<AggregationOperator.Status> {
1818
public static AggregationOperator.Status simple() {
19-
return new AggregationOperator.Status(200012, 123);
19+
return new AggregationOperator.Status(200012, 400036, 123);
2020
}
2121

2222
public static String simpleToJson() {
2323
return """
2424
{
2525
"aggregation_nanos" : 200012,
2626
"aggregation_time" : "200micros",
27+
"aggregation_finish_nanos" : 400036,
28+
"aggregation_finish_time" : "400micros",
2729
"pages_processed" : 123
2830
}""";
2931
}
@@ -39,18 +41,20 @@ protected Writeable.Reader<AggregationOperator.Status> instanceReader() {
3941

4042
@Override
4143
public AggregationOperator.Status createTestInstance() {
42-
return new AggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeInt());
44+
return new AggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeInt());
4345
}
4446

4547
@Override
4648
protected AggregationOperator.Status mutateInstance(AggregationOperator.Status instance) {
4749
long aggregationNanos = instance.aggregationNanos();
50+
long aggregationFinishNanos = instance.aggregationFinishNanos();
4851
int pagesProcessed = instance.pagesProcessed();
49-
switch (between(0, 1)) {
52+
switch (between(0, 2)) {
5053
case 0 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong);
51-
case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
54+
case 1 -> aggregationFinishNanos = randomValueOtherThan(aggregationFinishNanos, ESTestCase::randomNonNegativeLong);
55+
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
5256
default -> throw new UnsupportedOperationException();
5357
}
54-
return new AggregationOperator.Status(aggregationNanos, pagesProcessed);
58+
return new AggregationOperator.Status(aggregationNanos, aggregationFinishNanos, pagesProcessed);
5559
}
5660
}

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,9 @@ private String checkOperatorProfile(Map<String, Object> o) {
562562
.entry("processing_nanos", greaterThan(0))
563563
.entry("processed_queries", List.of("*:*"));
564564
case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk());
565-
case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)).entry("aggregation_nanos", greaterThan(0));
565+
case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
566+
.entry("aggregation_nanos", greaterThan(0))
567+
.entry("aggregation_finish_nanos", greaterThan(0));
566568
case "ExchangeSinkOperator" -> matchesMap().entry("pages_accepted", greaterThan(0));
567569
case "ExchangeSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)).entry("pages_waiting", 0);
568570
case "ProjectOperator", "EvalOperator" -> basicProfile();

0 commit comments

Comments
 (0)