Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ static TransportVersion def(int id) {
public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00);
public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_124_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ public String describe() {

private Iterator<Page> output;

private long receiveNanos;
private long emitNanos;

/**
* Count of pages that have been received by this operator.
*/
Expand Down Expand Up @@ -387,6 +390,7 @@ public boolean needsInput() {

@Override
public void addInput(Page page) {
long start = System.nanoTime();
/*
* Since row tracks memory we have to be careful to close any unused rows,
* including any rows that fail while constructing because they allocate
Expand Down Expand Up @@ -423,6 +427,7 @@ public void addInput(Page page) {
pagesReceived++;
rowsReceived += page.getPositionCount();
}
receiveNanos += System.nanoTime() - start;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in the finally, in case of exception (?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May as well, yeah. I think the exception here is generally terminal to the process and we don't return the status, but may as well put it there so it's possible.

}

@Override
Expand Down Expand Up @@ -548,9 +553,11 @@ public Page getOutput() {
if (output == null || output.hasNext() == false) {
return null;
}
long start = System.nanoTime();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should capture the emit time in finish() (toPages) method instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more closely - of course. Wow. I wasn't paying attention. Thanks!

Page ret = output.next();
pagesEmitted++;
rowsEmitted += ret.getPositionCount();
emitNanos += System.nanoTime() - start;
return ret;
}

Expand Down Expand Up @@ -588,7 +595,16 @@ public long ramBytesUsed() {

@Override
public Status status() {
return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
return new TopNOperatorStatus(
receiveNanos,
emitNanos,
inputQueue.size(),
ramBytesUsed(),
pagesReceived,
pagesEmitted,
rowsReceived,
rowsEmitted
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class TopNOperatorStatus implements Operator.Status {
"topn",
TopNOperatorStatus::new
);
private final long receiveNanos;
private final long emitNanos;
private final int occupiedRows;
private final long ramBytesUsed;
private final int pagesReceived;
Expand All @@ -33,13 +35,17 @@ public class TopNOperatorStatus implements Operator.Status {
private final long rowsEmitted;

public TopNOperatorStatus(
long receiveNanos,
long emitNanos,
int occupiedRows,
long ramBytesUsed,
int pagesReceived,
int pagesEmitted,
long rowsReceived,
long rowsEmitted
) {
this.receiveNanos = receiveNanos;
this.emitNanos = emitNanos;
this.occupiedRows = occupiedRows;
this.ramBytesUsed = ramBytesUsed;
this.pagesReceived = pagesReceived;
Expand All @@ -49,6 +55,13 @@ public TopNOperatorStatus(
}

TopNOperatorStatus(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
this.receiveNanos = in.readVLong();
this.emitNanos = in.readVLong();
} else {
this.receiveNanos = 0;
this.emitNanos = 0;
}
this.occupiedRows = in.readVInt();
this.ramBytesUsed = in.readVLong();

Expand All @@ -67,6 +80,11 @@ public TopNOperatorStatus(

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
out.writeVLong(receiveNanos);
out.writeVLong(emitNanos);
}

out.writeVInt(occupiedRows);
out.writeVLong(ramBytesUsed);

Expand All @@ -83,6 +101,14 @@ public String getWriteableName() {
return ENTRY.name;
}

public long receiveNanos() {
return receiveNanos;
}

public long emitNanos() {
return emitNanos;
}

public int occupiedRows() {
return occupiedRows;
}
Expand Down Expand Up @@ -110,6 +136,8 @@ public long rowsEmitted() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("receive_nanos", receiveNanos);
builder.field("emit_nanos", emitNanos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other statuses also add the "X_time" with the readable time. For example:

builder.field("hash_nanos", hashNanos);
if (builder.humanReadable()) {
builder.field("hash_time", TimeValue.timeValueNanos(hashNanos));
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Makes sense. Something I'd have asked for I think.

builder.field("occupied_rows", occupiedRows);
builder.field("ram_bytes_used", ramBytesUsed);
builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed));
Expand All @@ -126,7 +154,9 @@ public boolean equals(Object o) {
return false;
}
TopNOperatorStatus that = (TopNOperatorStatus) o;
return occupiedRows == that.occupiedRows
return receiveNanos == that.receiveNanos
&& emitNanos == that.emitNanos
&& occupiedRows == that.occupiedRows
&& ramBytesUsed == that.ramBytesUsed
&& pagesReceived == that.pagesReceived
&& pagesEmitted == that.pagesEmitted
Expand All @@ -136,7 +166,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
return Objects.hash(receiveNanos, emitNanos, occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase<TopNOperatorStatus> {
public static TopNOperatorStatus simple() {
return new TopNOperatorStatus(10, 2000, 123, 123, 111, 222);
return new TopNOperatorStatus(100, 40, 10, 2000, 123, 123, 111, 222);
}

public static String simpleToJson() {
return """
{
"receive_nanos" : 100,
"emit_nanos" : 40,
"occupied_rows" : 10,
"ram_bytes_used" : 2000,
"ram_used" : "1.9kb",
Expand All @@ -44,6 +46,8 @@ protected Writeable.Reader<TopNOperatorStatus> instanceReader() {
@Override
protected TopNOperatorStatus createTestInstance() {
return new TopNOperatorStatus(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeInt(),
randomNonNegativeLong(),
randomNonNegativeInt(),
Expand All @@ -55,34 +59,51 @@ protected TopNOperatorStatus createTestInstance() {

@Override
protected TopNOperatorStatus mutateInstance(TopNOperatorStatus instance) {
long receiveNanos = instance.receiveNanos();
long emitNanos = instance.emitNanos();
int occupiedRows = instance.occupiedRows();
long ramBytesUsed = instance.ramBytesUsed();
int pagesReceived = instance.pagesReceived();
int pagesEmitted = instance.pagesEmitted();
long rowsReceived = instance.rowsReceived();
long rowsEmitted = instance.rowsEmitted();
switch (between(0, 5)) {
switch (between(0, 7)) {
case 0:
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
receiveNanos = randomValueOtherThan(receiveNanos, ESTestCase::randomNonNegativeLong);
break;
case 1:
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
break;
case 2:
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
break;
case 3:
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
break;
case 4:
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
break;
case 5:
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
break;
case 6:
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
break;
case 7:
rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
break;
default:
throw new IllegalArgumentException();
}
return new TopNOperatorStatus(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
return new TopNOperatorStatus(
receiveNanos,
emitNanos,
occupiedRows,
ramBytesUsed,
pagesReceived,
pagesEmitted,
rowsReceived,
rowsEmitted
);
}
}

This file was deleted.