From 7877fb3003f2ed46cbdfdf40cdfb79b759f97eb4 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 18 Jul 2025 15:07:38 -0400 Subject: [PATCH] ESQL: Add times to topn status Adds times to the TopNOperator status, specifically the nanoseconds spent receiving the values and the nanoseconds spent emitting the values: ``` { "operator" : "TopNOperator[count=0/1000, elementTypes=[BYTES_REF, DOUBLE], encoders=[UTF8TopNEncoder, DefaultSortable], sortOrders=[SortOrder[channel=1, asc=false, nullsFirst=true]]]", "status" : { "receive_nanos" : 193415, <--- this row "emit_nanos" : 61, <--- and this row "occupied_rows" : 0, "ram_bytes_used" : 4296, "ram_used" : "4.1kb", "pages_received" : 1, "pages_emitted" : 1, "rows_received" : 1000, "rows_emitted" : 1000 } } ``` --- .../org/elasticsearch/TransportVersions.java | 1 + .../compute/operator/topn/TopNOperator.java | 18 +++++++- .../operator/topn/TopNOperatorStatus.java | 34 ++++++++++++++- .../topn/TopNOperatorStatusTests.java | 37 +++++++++++++---- .../esql/action/NamedWriteablesTests.java | 41 ------------------- 5 files changed, 79 insertions(+), 52 deletions(-) delete mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/NamedWriteablesTests.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 11a0103cd22e0..a6eb6e07fd5a9 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -342,6 +342,7 @@ static TransportVersion def(int id) { public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00); public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00); public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00); + public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_124_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index fde51d4642ae0..a2ab2e516b2e1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -308,6 +308,9 @@ public String describe() { private Iterator output; + private long receiveNanos; + private long emitNanos; + /** * Count of pages that have been received by this operator. */ @@ -387,6 +390,7 @@ public boolean needsInput() { @Override public void addInput(Page page) { + long start = System.nanoTime(); /* * Since row tracks memory we have to be careful to close any unused rows, * including any rows that fail while constructing because they allocate @@ -423,6 +427,7 @@ public void addInput(Page page) { pagesReceived++; rowsReceived += page.getPositionCount(); } + receiveNanos += System.nanoTime() - start; } @Override @@ -548,9 +553,11 @@ public Page getOutput() { if (output == null || output.hasNext() == false) { return null; } + long start = System.nanoTime(); Page ret = output.next(); pagesEmitted++; rowsEmitted += ret.getPositionCount(); + emitNanos += System.nanoTime() - start; return ret; } @@ -588,7 +595,16 @@ public long ramBytesUsed() { @Override public Status status() { - return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); + return new TopNOperatorStatus( + receiveNanos, + emitNanos, + inputQueue.size(), + ramBytesUsed(), + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java index ceccdce529ce8..07222d75cb43c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java @@ -25,6 +25,8 @@ public class TopNOperatorStatus implements Operator.Status { "topn", TopNOperatorStatus::new ); + private final long receiveNanos; + private final long emitNanos; private final int occupiedRows; private final long ramBytesUsed; private final int pagesReceived; @@ -33,6 +35,8 @@ public class TopNOperatorStatus implements Operator.Status { private final long rowsEmitted; public TopNOperatorStatus( + long receiveNanos, + long emitNanos, int occupiedRows, long ramBytesUsed, int pagesReceived, @@ -40,6 +44,8 @@ public TopNOperatorStatus( long rowsReceived, long rowsEmitted ) { + this.receiveNanos = receiveNanos; + this.emitNanos = emitNanos; this.occupiedRows = occupiedRows; this.ramBytesUsed = ramBytesUsed; this.pagesReceived = pagesReceived; @@ -49,6 +55,13 @@ public TopNOperatorStatus( } TopNOperatorStatus(StreamInput in) throws IOException { + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) { + this.receiveNanos = in.readVLong(); + this.emitNanos = in.readVLong(); + } else { + this.receiveNanos = 0; + this.emitNanos = 0; + } this.occupiedRows = in.readVInt(); this.ramBytesUsed = in.readVLong(); @@ -67,6 +80,11 @@ public TopNOperatorStatus( @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) { + out.writeVLong(receiveNanos); + out.writeVLong(emitNanos); + } + out.writeVInt(occupiedRows); out.writeVLong(ramBytesUsed); @@ -83,6 +101,14 @@ public String getWriteableName() { return ENTRY.name; } + public long receiveNanos() { + return receiveNanos; + } + + public long emitNanos() { + return emitNanos; + } + public int occupiedRows() { return occupiedRows; } @@ -110,6 +136,8 @@ public long rowsEmitted() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field("receive_nanos", receiveNanos); + builder.field("emit_nanos", emitNanos); builder.field("occupied_rows", occupiedRows); builder.field("ram_bytes_used", ramBytesUsed); builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed)); @@ -126,7 +154,9 @@ public boolean equals(Object o) { return false; } TopNOperatorStatus that = (TopNOperatorStatus) o; - return occupiedRows == that.occupiedRows + return receiveNanos == that.receiveNanos + && emitNanos == that.emitNanos + && occupiedRows == that.occupiedRows && ramBytesUsed == that.ramBytesUsed && pagesReceived == that.pagesReceived && pagesEmitted == that.pagesEmitted @@ -136,7 +166,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); + return Objects.hash(receiveNanos, emitNanos, occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java index 5faf5159a5465..5d11d37622449 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java @@ -16,12 +16,14 @@ public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase { public static TopNOperatorStatus simple() { - return new TopNOperatorStatus(10, 2000, 123, 123, 111, 222); + return new TopNOperatorStatus(100, 40, 10, 2000, 123, 123, 111, 222); } public static String simpleToJson() { return """ { + "receive_nanos" : 100, + "emit_nanos" : 40, "occupied_rows" : 10, "ram_bytes_used" : 2000, "ram_used" : "1.9kb", @@ -44,6 +46,8 @@ protected Writeable.Reader instanceReader() { @Override protected TopNOperatorStatus createTestInstance() { return new TopNOperatorStatus( + randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeInt(), randomNonNegativeLong(), randomNonNegativeInt(), @@ -55,34 +59,51 @@ protected TopNOperatorStatus createTestInstance() { @Override protected TopNOperatorStatus mutateInstance(TopNOperatorStatus instance) { + long receiveNanos = instance.receiveNanos(); + long emitNanos = instance.emitNanos(); int occupiedRows = instance.occupiedRows(); long ramBytesUsed = instance.ramBytesUsed(); int pagesReceived = instance.pagesReceived(); int pagesEmitted = instance.pagesEmitted(); long rowsReceived = instance.rowsReceived(); long rowsEmitted = instance.rowsEmitted(); - switch (between(0, 5)) { + switch (between(0, 7)) { case 0: - occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt); + receiveNanos = randomValueOtherThan(receiveNanos, ESTestCase::randomNonNegativeLong); break; case 1: - ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong); + emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong); break; case 2: - pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); + occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt); break; case 3: - pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong); break; case 4: - rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); break; case 5: + pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + break; + case 6: + rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + break; + case 7: rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); break; default: throw new IllegalArgumentException(); } - return new TopNOperatorStatus(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); + return new TopNOperatorStatus( + receiveNanos, + emitNanos, + occupiedRows, + ramBytesUsed, + pagesReceived, + pagesEmitted, + rowsReceived, + rowsEmitted + ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/NamedWriteablesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/NamedWriteablesTests.java deleted file mode 100644 index 186120a59d3fe..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/NamedWriteablesTests.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.action; - -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.compute.operator.Operator; -import org.elasticsearch.compute.operator.topn.TopNOperatorStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; - -import static org.hamcrest.Matchers.equalTo; - -public class NamedWriteablesTests extends ESTestCase { - - public void testTopNStatus() throws Exception { - try (EsqlPlugin plugin = new EsqlPlugin(Settings.EMPTY)) { - NamedWriteableRegistry registry = new NamedWriteableRegistry(plugin.getNamedWriteables()); - TopNOperatorStatus origin = new TopNOperatorStatus( - randomNonNegativeInt(), - randomNonNegativeLong(), - randomNonNegativeInt(), - randomNonNegativeInt(), - randomNonNegativeLong(), - randomNonNegativeLong() - ); - TopNOperatorStatus copy = (TopNOperatorStatus) copyNamedWriteable(origin, registry, Operator.Status.class); - assertThat(copy.occupiedRows(), equalTo(origin.occupiedRows())); - assertThat(copy.ramBytesUsed(), equalTo(origin.ramBytesUsed())); - assertThat(copy.pagesReceived(), equalTo(origin.pagesReceived())); - assertThat(copy.pagesEmitted(), equalTo(origin.pagesEmitted())); - assertThat(copy.rowsReceived(), equalTo(origin.rowsReceived())); - assertThat(copy.rowsEmitted(), equalTo(origin.rowsEmitted())); - } - } -}