Skip to content

Commit 7877fb3

Browse files
committed
ESQL: Add times to topn status
Adds times to the TopNOperator status, specifically the nanoseconds spent receiving the values and the nanoseconds spent emitting the values: ``` { "operator" : "TopNOperator[count=0/1000, elementTypes=[BYTES_REF, DOUBLE], encoders=[UTF8TopNEncoder, DefaultSortable], sortOrders=[SortOrder[channel=1, asc=false, nullsFirst=true]]]", "status" : { "receive_nanos" : 193415, <--- this row "emit_nanos" : 61, <--- and this row "occupied_rows" : 0, "ram_bytes_used" : 4296, "ram_used" : "4.1kb", "pages_received" : 1, "pages_emitted" : 1, "rows_received" : 1000, "rows_emitted" : 1000 } } ```
1 parent feafb3a commit 7877fb3

File tree

5 files changed

+79
-52
lines changed

5 files changed

+79
-52
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ static TransportVersion def(int id) {
342342
public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
343343
public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00);
344344
public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
345+
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_124_0_00);
345346

346347
/*
347348
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,9 @@ public String describe() {
308308

309309
private Iterator<Page> output;
310310

311+
private long receiveNanos;
312+
private long emitNanos;
313+
311314
/**
312315
* Count of pages that have been received by this operator.
313316
*/
@@ -387,6 +390,7 @@ public boolean needsInput() {
387390

388391
@Override
389392
public void addInput(Page page) {
393+
long start = System.nanoTime();
390394
/*
391395
* Since row tracks memory we have to be careful to close any unused rows,
392396
* including any rows that fail while constructing because they allocate
@@ -423,6 +427,7 @@ public void addInput(Page page) {
423427
pagesReceived++;
424428
rowsReceived += page.getPositionCount();
425429
}
430+
receiveNanos += System.nanoTime() - start;
426431
}
427432

428433
@Override
@@ -548,9 +553,11 @@ public Page getOutput() {
548553
if (output == null || output.hasNext() == false) {
549554
return null;
550555
}
556+
long start = System.nanoTime();
551557
Page ret = output.next();
552558
pagesEmitted++;
553559
rowsEmitted += ret.getPositionCount();
560+
emitNanos += System.nanoTime() - start;
554561
return ret;
555562
}
556563

@@ -588,7 +595,16 @@ public long ramBytesUsed() {
588595

589596
@Override
590597
public Status status() {
591-
return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
598+
return new TopNOperatorStatus(
599+
receiveNanos,
600+
emitNanos,
601+
inputQueue.size(),
602+
ramBytesUsed(),
603+
pagesReceived,
604+
pagesEmitted,
605+
rowsReceived,
606+
rowsEmitted
607+
);
592608
}
593609

594610
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public class TopNOperatorStatus implements Operator.Status {
2525
"topn",
2626
TopNOperatorStatus::new
2727
);
28+
private final long receiveNanos;
29+
private final long emitNanos;
2830
private final int occupiedRows;
2931
private final long ramBytesUsed;
3032
private final int pagesReceived;
@@ -33,13 +35,17 @@ public class TopNOperatorStatus implements Operator.Status {
3335
private final long rowsEmitted;
3436

3537
public TopNOperatorStatus(
38+
long receiveNanos,
39+
long emitNanos,
3640
int occupiedRows,
3741
long ramBytesUsed,
3842
int pagesReceived,
3943
int pagesEmitted,
4044
long rowsReceived,
4145
long rowsEmitted
4246
) {
47+
this.receiveNanos = receiveNanos;
48+
this.emitNanos = emitNanos;
4349
this.occupiedRows = occupiedRows;
4450
this.ramBytesUsed = ramBytesUsed;
4551
this.pagesReceived = pagesReceived;
@@ -49,6 +55,13 @@ public TopNOperatorStatus(
4955
}
5056

5157
TopNOperatorStatus(StreamInput in) throws IOException {
58+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
59+
this.receiveNanos = in.readVLong();
60+
this.emitNanos = in.readVLong();
61+
} else {
62+
this.receiveNanos = 0;
63+
this.emitNanos = 0;
64+
}
5265
this.occupiedRows = in.readVInt();
5366
this.ramBytesUsed = in.readVLong();
5467

@@ -67,6 +80,11 @@ public TopNOperatorStatus(
6780

6881
@Override
6982
public void writeTo(StreamOutput out) throws IOException {
83+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
84+
out.writeVLong(receiveNanos);
85+
out.writeVLong(emitNanos);
86+
}
87+
7088
out.writeVInt(occupiedRows);
7189
out.writeVLong(ramBytesUsed);
7290

@@ -83,6 +101,14 @@ public String getWriteableName() {
83101
return ENTRY.name;
84102
}
85103

104+
public long receiveNanos() {
105+
return receiveNanos;
106+
}
107+
108+
public long emitNanos() {
109+
return emitNanos;
110+
}
111+
86112
public int occupiedRows() {
87113
return occupiedRows;
88114
}
@@ -110,6 +136,8 @@ public long rowsEmitted() {
110136
@Override
111137
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
112138
builder.startObject();
139+
builder.field("receive_nanos", receiveNanos);
140+
builder.field("emit_nanos", emitNanos);
113141
builder.field("occupied_rows", occupiedRows);
114142
builder.field("ram_bytes_used", ramBytesUsed);
115143
builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed));
@@ -126,7 +154,9 @@ public boolean equals(Object o) {
126154
return false;
127155
}
128156
TopNOperatorStatus that = (TopNOperatorStatus) o;
129-
return occupiedRows == that.occupiedRows
157+
return receiveNanos == that.receiveNanos
158+
&& emitNanos == that.emitNanos
159+
&& occupiedRows == that.occupiedRows
130160
&& ramBytesUsed == that.ramBytesUsed
131161
&& pagesReceived == that.pagesReceived
132162
&& pagesEmitted == that.pagesEmitted
@@ -136,7 +166,7 @@ public boolean equals(Object o) {
136166

137167
@Override
138168
public int hashCode() {
139-
return Objects.hash(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
169+
return Objects.hash(receiveNanos, emitNanos, occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
140170
}
141171

142172
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase<TopNOperatorStatus> {
1818
public static TopNOperatorStatus simple() {
19-
return new TopNOperatorStatus(10, 2000, 123, 123, 111, 222);
19+
return new TopNOperatorStatus(100, 40, 10, 2000, 123, 123, 111, 222);
2020
}
2121

2222
public static String simpleToJson() {
2323
return """
2424
{
25+
"receive_nanos" : 100,
26+
"emit_nanos" : 40,
2527
"occupied_rows" : 10,
2628
"ram_bytes_used" : 2000,
2729
"ram_used" : "1.9kb",
@@ -44,6 +46,8 @@ protected Writeable.Reader<TopNOperatorStatus> instanceReader() {
4446
@Override
4547
protected TopNOperatorStatus createTestInstance() {
4648
return new TopNOperatorStatus(
49+
randomNonNegativeLong(),
50+
randomNonNegativeLong(),
4751
randomNonNegativeInt(),
4852
randomNonNegativeLong(),
4953
randomNonNegativeInt(),
@@ -55,34 +59,51 @@ protected TopNOperatorStatus createTestInstance() {
5559

5660
@Override
5761
protected TopNOperatorStatus mutateInstance(TopNOperatorStatus instance) {
62+
long receiveNanos = instance.receiveNanos();
63+
long emitNanos = instance.emitNanos();
5864
int occupiedRows = instance.occupiedRows();
5965
long ramBytesUsed = instance.ramBytesUsed();
6066
int pagesReceived = instance.pagesReceived();
6167
int pagesEmitted = instance.pagesEmitted();
6268
long rowsReceived = instance.rowsReceived();
6369
long rowsEmitted = instance.rowsEmitted();
64-
switch (between(0, 5)) {
70+
switch (between(0, 7)) {
6571
case 0:
66-
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
72+
receiveNanos = randomValueOtherThan(receiveNanos, ESTestCase::randomNonNegativeLong);
6773
break;
6874
case 1:
69-
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
75+
emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
7076
break;
7177
case 2:
72-
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
78+
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
7379
break;
7480
case 3:
75-
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
81+
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
7682
break;
7783
case 4:
78-
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
84+
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
7985
break;
8086
case 5:
87+
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
88+
break;
89+
case 6:
90+
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
91+
break;
92+
case 7:
8193
rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
8294
break;
8395
default:
8496
throw new IllegalArgumentException();
8597
}
86-
return new TopNOperatorStatus(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
98+
return new TopNOperatorStatus(
99+
receiveNanos,
100+
emitNanos,
101+
occupiedRows,
102+
ramBytesUsed,
103+
pagesReceived,
104+
pagesEmitted,
105+
rowsReceived,
106+
rowsEmitted
107+
);
87108
}
88109
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/NamedWriteablesTests.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)