Skip to content

Commit d071171

Browse files
committed
Emit process_nanos from LookupOperator
1 parent 0de4b92 commit d071171

File tree

8 files changed

+45
-41
lines changed

8 files changed

+45
-41
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ static TransportVersion def(int id) {
160160
public static final TransportVersion REVERT_BYTE_SIZE_VALUE_ALWAYS_USES_BYTES_1 = def(8_826_00_0);
161161
public static final TransportVersion ESQL_SKIP_ES_INDEX_SERIALIZATION = def(8_827_00_0);
162162
public static final TransportVersion ADD_INDEX_BLOCK_TWO_PHASE = def(8_828_00_0);
163+
public static final TransportVersion ESQL_PROFILE_ASYNC_NANOS = def(8_829_00_0);
163164

164165
/*
165166
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,10 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws
188188
if (builder.humanReadable()) {
189189
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
190190
}
191-
return builder.field("pages_processed", pagesProcessed).field("rows_received", rowsReceived).field("rows_emitted", rowsEmitted);
191+
builder.field("pages_processed", pagesProcessed);
192+
builder.field("rows_received", rowsReceived);
193+
builder.field("rows_emitted", rowsEmitted);
194+
return builder;
192195
}
193196

194197
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public abstract class AsyncOperator<Fetched> implements Operator {
4545
private final DriverContext driverContext;
4646

4747
private final int maxOutstandingRequests;
48-
private final LongAdder totalTimeInNanos = new LongAdder();
48+
private final LongAdder processNanos = new LongAdder();
4949

5050
private boolean finished = false;
5151
private volatile boolean closed = false;
@@ -98,7 +98,7 @@ public void addInput(Page input) {
9898
final long startNanos = System.nanoTime();
9999
performAsync(input, ActionListener.runAfter(listener, () -> {
100100
driverContext.removeAsyncAction();
101-
totalTimeInNanos.add(System.nanoTime() - startNanos);
101+
processNanos.add(System.nanoTime() - startNanos);
102102
}));
103103
success = true;
104104
} finally {
@@ -231,15 +231,11 @@ public IsBlockedResult isBlocked() {
231231

232232
@Override
233233
public final Operator.Status status() {
234-
return status(
235-
Math.max(0L, checkpoint.getMaxSeqNo()),
236-
Math.max(0L, checkpoint.getProcessedCheckpoint()),
237-
TimeValue.timeValueNanos(totalTimeInNanos.sum()).millis()
238-
);
234+
return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
239235
}
240236

241-
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
242-
return new Status(receivedPages, completedPages, totalTimeInMillis);
237+
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
238+
return new Status(receivedPages, completedPages, processNanos);
243239
}
244240

245241
public static class Status implements Operator.Status {
@@ -251,25 +247,31 @@ public static class Status implements Operator.Status {
251247

252248
final long receivedPages;
253249
final long completedPages;
254-
final long totalTimeInMillis;
250+
final long processNanos;
255251

256-
protected Status(long receivedPages, long completedPages, long totalTimeInMillis) {
252+
protected Status(long receivedPages, long completedPages, long processNanos) {
257253
this.receivedPages = receivedPages;
258254
this.completedPages = completedPages;
259-
this.totalTimeInMillis = totalTimeInMillis;
255+
this.processNanos = processNanos;
260256
}
261257

262258
protected Status(StreamInput in) throws IOException {
263259
this.receivedPages = in.readVLong();
264260
this.completedPages = in.readVLong();
265-
this.totalTimeInMillis = in.readVLong();
261+
this.processNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ASYNC_NANOS)
262+
? in.readVLong()
263+
: TimeValue.timeValueMillis(in.readVLong()).nanos();
266264
}
267265

268266
@Override
269267
public void writeTo(StreamOutput out) throws IOException {
270268
out.writeVLong(receivedPages);
271269
out.writeVLong(completedPages);
272-
out.writeVLong(totalTimeInMillis);
270+
out.writeVLong(
271+
out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ASYNC_NANOS)
272+
? processNanos
273+
: TimeValue.timeValueNanos(processNanos).millis()
274+
);
273275
}
274276

275277
public long receivedPages() {
@@ -280,8 +282,8 @@ public long completedPages() {
280282
return completedPages;
281283
}
282284

283-
public long totalTimeInMillis() {
284-
return totalTimeInMillis;
285+
public long procesNanos() {
286+
return processNanos;
285287
}
286288

287289
@Override
@@ -297,12 +299,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
297299
}
298300

299301
protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
302+
builder.field("process_nanos", processNanos);
303+
if (builder.humanReadable()) {
304+
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
305+
}
300306
builder.field("received_pages", receivedPages);
301307
builder.field("completed_pages", completedPages);
302-
builder.field("total_time_in_millis", totalTimeInMillis);
303-
if (totalTimeInMillis >= 0) {
304-
builder.field("total_time", TimeValue.timeValueMillis(totalTimeInMillis));
305-
}
306308
return builder;
307309
}
308310

@@ -311,14 +313,12 @@ public boolean equals(Object o) {
311313
if (this == o) return true;
312314
if (o == null || getClass() != o.getClass()) return false;
313315
Status status = (Status) o;
314-
return receivedPages == status.receivedPages
315-
&& completedPages == status.completedPages
316-
&& totalTimeInMillis == status.totalTimeInMillis;
316+
return receivedPages == status.receivedPages && completedPages == status.completedPages && processNanos == status.processNanos;
317317
}
318318

319319
@Override
320320
public int hashCode() {
321-
return Objects.hash(receivedPages, completedPages, totalTimeInMillis);
321+
return Objects.hash(receivedPages, completedPages, processNanos);
322322
}
323323

324324
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IO
3939
case 0 -> new AsyncOperator.Status(
4040
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
4141
in.completedPages(),
42-
in.totalTimeInMillis()
42+
in.procesNanos()
4343
);
4444
case 1 -> new AsyncOperator.Status(
4545
in.receivedPages(),
4646
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
47-
in.totalTimeInMillis()
47+
in.procesNanos()
4848
);
4949
case 2 -> new AsyncOperator.Status(
5050
in.receivedPages(),
5151
in.completedPages(),
52-
randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong)
52+
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
5353
);
5454
default -> throw new AssertionError("unknown ");
5555
};

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ protected void doClose() {
175175
}
176176

177177
@Override
178-
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
179-
return new EnrichLookupOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms);
178+
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
179+
return new EnrichLookupOperator.Status(receivedPages, completedPages, processNanos, totalTerms);
180180
}
181181

182182
public static class Status extends AsyncOperator.Status {
@@ -188,8 +188,8 @@ public static class Status extends AsyncOperator.Status {
188188

189189
final long totalTerms;
190190

191-
Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) {
192-
super(receivedPages, completedPages, totalTimeInMillis);
191+
Status(long receivedPages, long completedPages, long processNanos, long totalTerms) {
192+
super(receivedPages, completedPages, processNanos);
193193
this.totalTerms = totalTerms;
194194
}
195195

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,8 @@ protected void doClose() {
214214
}
215215

216216
@Override
217-
protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
218-
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
217+
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
218+
return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages);
219219
}
220220

221221
public static class Status extends AsyncOperator.Status {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,25 @@ protected EnrichLookupOperator.Status mutateInstance(EnrichLookupOperator.Status
4141
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
4242
in.completedPages(),
4343
in.totalTerms,
44-
in.totalTimeInMillis()
44+
in.procesNanos()
4545
);
4646
case 1 -> new EnrichLookupOperator.Status(
4747
in.receivedPages(),
4848
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
4949
in.totalTerms,
50-
in.totalTimeInMillis()
50+
in.procesNanos()
5151
);
5252
case 2 -> new EnrichLookupOperator.Status(
5353
in.receivedPages(),
5454
in.completedPages(),
5555
randomValueOtherThan(in.totalTerms, ESTestCase::randomNonNegativeLong),
56-
in.totalTimeInMillis()
56+
in.procesNanos()
5757
);
5858
case 3 -> new EnrichLookupOperator.Status(
5959
in.receivedPages(),
6060
in.completedPages(),
6161
in.totalTerms,
62-
randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong)
62+
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
6363
);
6464
default -> throw new AssertionError("unknown ");
6565
};

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorStatusTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,18 @@ protected LookupFromIndexOperator.Status createTestInstance() {
3838
protected LookupFromIndexOperator.Status mutateInstance(LookupFromIndexOperator.Status in) throws IOException {
3939
long receivedPages = in.receivedPages();
4040
long completedPages = in.completedPages();
41-
long totalTimeInMillis = in.totalTimeInMillis();
41+
long procesNanos = in.procesNanos();
4242
long totalTerms = in.totalTerms();
4343
long emittedPages = in.emittedPages();
4444
switch (randomIntBetween(0, 4)) {
4545
case 0 -> receivedPages = randomValueOtherThan(receivedPages, ESTestCase::randomNonNegativeLong);
4646
case 1 -> completedPages = randomValueOtherThan(completedPages, ESTestCase::randomNonNegativeLong);
47-
case 2 -> totalTimeInMillis = randomValueOtherThan(totalTimeInMillis, ESTestCase::randomNonNegativeLong);
47+
case 2 -> procesNanos = randomValueOtherThan(procesNanos, ESTestCase::randomNonNegativeLong);
4848
case 3 -> totalTerms = randomValueOtherThan(totalTerms, ESTestCase::randomNonNegativeLong);
4949
case 4 -> emittedPages = randomValueOtherThan(emittedPages, ESTestCase::randomNonNegativeLong);
5050
default -> throw new UnsupportedOperationException();
5151
}
52-
return new LookupFromIndexOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms, emittedPages);
52+
return new LookupFromIndexOperator.Status(receivedPages, completedPages, procesNanos, totalTerms, emittedPages);
5353
}
5454

5555
public void testToXContent() {

0 commit comments

Comments
 (0)