Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ public String describe() {

private Iterator<Page> output;

private long receiveNanos;
private long emitNanos;

/**
* Count of pages that have been received by this operator.
*/
Expand Down Expand Up @@ -387,6 +390,7 @@ public boolean needsInput() {

@Override
public void addInput(Page page) {
long start = System.nanoTime();
/*
* Since row tracks memory we have to be careful to close any unused rows,
* including any rows that fail while constructing because they allocate
Expand Down Expand Up @@ -422,13 +426,16 @@ public void addInput(Page page) {
page.releaseBlocks();
pagesReceived++;
rowsReceived += page.getPositionCount();
receiveNanos += System.nanoTime() - start;
}
}

@Override
public void finish() {
if (output == null) {
long start = System.nanoTime();
output = toPages();
emitNanos += System.nanoTime() - start;
}
}

Expand Down Expand Up @@ -588,7 +595,16 @@ public long ramBytesUsed() {

@Override
public Status status() {
return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
return new TopNOperatorStatus(
receiveNanos,
emitNanos,
inputQueue.size(),
ramBytesUsed(),
pagesReceived,
pagesEmitted,
rowsReceived,
rowsEmitted
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -25,6 +26,8 @@ public class TopNOperatorStatus implements Operator.Status {
"topn",
TopNOperatorStatus::new
);
private final long receiveNanos;
private final long emitNanos;
private final int occupiedRows;
private final long ramBytesUsed;
private final int pagesReceived;
Expand All @@ -33,13 +36,17 @@ public class TopNOperatorStatus implements Operator.Status {
private final long rowsEmitted;

public TopNOperatorStatus(
long receiveNanos,
long emitNanos,
int occupiedRows,
long ramBytesUsed,
int pagesReceived,
int pagesEmitted,
long rowsReceived,
long rowsEmitted
) {
this.receiveNanos = receiveNanos;
this.emitNanos = emitNanos;
this.occupiedRows = occupiedRows;
this.ramBytesUsed = ramBytesUsed;
this.pagesReceived = pagesReceived;
Expand All @@ -49,6 +56,13 @@ public TopNOperatorStatus(
}

TopNOperatorStatus(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
this.receiveNanos = in.readVLong();
this.emitNanos = in.readVLong();
} else {
this.receiveNanos = 0;
this.emitNanos = 0;
}
this.occupiedRows = in.readVInt();
this.ramBytesUsed = in.readVLong();

Expand All @@ -67,6 +81,11 @@ public TopNOperatorStatus(

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
out.writeVLong(receiveNanos);
out.writeVLong(emitNanos);
}

out.writeVInt(occupiedRows);
out.writeVLong(ramBytesUsed);

Expand All @@ -83,6 +102,14 @@ public String getWriteableName() {
return ENTRY.name;
}

public long receiveNanos() {
return receiveNanos;
}

public long emitNanos() {
return emitNanos;
}

public int occupiedRows() {
return occupiedRows;
}
Expand Down Expand Up @@ -110,6 +137,14 @@ public long rowsEmitted() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("receive_nanos", receiveNanos);
if (builder.humanReadable()) {
builder.field("receive_time", TimeValue.timeValueNanos(receiveNanos).toString());
}
builder.field("emit_nanos", emitNanos);
if (builder.humanReadable()) {
builder.field("emit_time", TimeValue.timeValueNanos(emitNanos).toString());
}
builder.field("occupied_rows", occupiedRows);
builder.field("ram_bytes_used", ramBytesUsed);
builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed));
Expand All @@ -126,7 +161,9 @@ public boolean equals(Object o) {
return false;
}
TopNOperatorStatus that = (TopNOperatorStatus) o;
return occupiedRows == that.occupiedRows
return receiveNanos == that.receiveNanos
&& emitNanos == that.emitNanos
&& occupiedRows == that.occupiedRows
&& ramBytesUsed == that.ramBytesUsed
&& pagesReceived == that.pagesReceived
&& pagesEmitted == that.pagesEmitted
Expand All @@ -136,7 +173,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
return Objects.hash(receiveNanos, emitNanos, occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase<TopNOperatorStatus> {
public static TopNOperatorStatus simple() {
return new TopNOperatorStatus(10, 2000, 123, 123, 111, 222);
return new TopNOperatorStatus(100, 40, 10, 2000, 123, 123, 111, 222);
}

public static String simpleToJson() {
return """
{
"receive_nanos" : 100,
"receive_time" : "100nanos",
"emit_nanos" : 40,
"emit_time" : "40nanos",
"occupied_rows" : 10,
"ram_bytes_used" : 2000,
"ram_used" : "1.9kb",
Expand All @@ -44,6 +48,8 @@ protected Writeable.Reader<TopNOperatorStatus> instanceReader() {
@Override
protected TopNOperatorStatus createTestInstance() {
return new TopNOperatorStatus(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeInt(),
randomNonNegativeLong(),
randomNonNegativeInt(),
Expand All @@ -55,34 +61,51 @@ protected TopNOperatorStatus createTestInstance() {

@Override
protected TopNOperatorStatus mutateInstance(TopNOperatorStatus instance) {
long receiveNanos = instance.receiveNanos();
long emitNanos = instance.emitNanos();
int occupiedRows = instance.occupiedRows();
long ramBytesUsed = instance.ramBytesUsed();
int pagesReceived = instance.pagesReceived();
int pagesEmitted = instance.pagesEmitted();
long rowsReceived = instance.rowsReceived();
long rowsEmitted = instance.rowsEmitted();
switch (between(0, 5)) {
switch (between(0, 7)) {
case 0:
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
receiveNanos = randomValueOtherThan(receiveNanos, ESTestCase::randomNonNegativeLong);
break;
case 1:
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
break;
case 2:
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
break;
case 3:
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
break;
case 4:
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
break;
case 5:
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
break;
case 6:
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
break;
case 7:
rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
break;
default:
throw new IllegalArgumentException();
}
return new TopNOperatorStatus(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
return new TopNOperatorStatus(
receiveNanos,
emitNanos,
occupiedRows,
ramBytesUsed,
pagesReceived,
pagesEmitted,
rowsReceived,
rowsEmitted
);
}
}

This file was deleted.