Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
283c2d5
Fixed status variables naming
ivancea Aug 12, 2025
010d915
Fixed notifying threads before updating stats and warnings
ivancea Aug 12, 2025
262f488
Fixed status values for AsyncOperator
ivancea Aug 12, 2025
35d2ca2
Added emittedRows and transport version
ivancea Aug 12, 2025
916f591
Added capability for the fix
ivancea Aug 12, 2025
5d74c2d
Unmuted tests
ivancea Aug 12, 2025
007fd4a
Undo extracted PR changes
ivancea Aug 13, 2025
0da2e4e
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 13, 2025
55f8804
Remove unused supress warnings
ivancea Aug 13, 2025
5535503
Remove TODO
ivancea Aug 13, 2025
ba84ddd
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 13, 2025
4afdc80
Update docs/changelog/132738.yaml
ivancea Aug 13, 2025
121ce8a
Rename received_pages to pages_received, and similar status fields
ivancea Aug 13, 2025
a775efb
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 14, 2025
5dbbd22
Update test after merge
ivancea Aug 14, 2025
1b5ae7d
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 19, 2025
bf4aea8
Improve Lookup operator test for status
ivancea Aug 19, 2025
e2de5ed
[CI] Auto commit changes from spotless
Aug 19, 2025
6a505b0
Fix assertion type for map numbers
ivancea Aug 19, 2025
25ef602
Fix Lucene tests operator status check
ivancea Aug 19, 2025
e8e9226
Update Limit operator tests
ivancea Aug 19, 2025
9c64a0b
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 20, 2025
523d542
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 21, 2025
b56abca
Remove custom matching functions
ivancea Aug 21, 2025
675d171
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132738.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132738
summary: Fix `AsyncOperator` status values and add emitted rows
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_139_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public IsBlockedResult isBlocked() {

@Override
public final Operator.Status status() {
return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 These are correct usages.

}

protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
Expand Down Expand Up @@ -292,7 +292,7 @@ public long completedPages() {
return completedPages;
}

public long procesNanos() {
public long processNanos() {
return processNanos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IO
case 0 -> new AsyncOperator.Status(
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
in.completedPages(),
in.procesNanos()
in.processNanos()
);
case 1 -> new AsyncOperator.Status(
in.receivedPages(),
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
in.procesNanos()
in.processNanos()
);
case 2 -> new AsyncOperator.Status(
in.receivedPages(),
in.completedPages(),
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
randomValueOtherThan(in.processNanos(), ESTestCase::randomNonNegativeLong)
);
default -> throw new AssertionError("unknown ");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AsyncOperator;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SinkOperator;
Expand All @@ -20,12 +22,19 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;

/**
Expand Down Expand Up @@ -105,37 +114,107 @@ public final void testSimpleToString() {
}

/**
* Ensures that the Operator.Status of this operator has the standard fields.
* Ensures that the Operator.Status of this operator has the standard fields, set to 0.
*/
public void testOperatorStatus() throws IOException {
public void testEmptyOperatorStatus() {
DriverContext driverContext = driverContext();
try (var operator = simple().get(driverContext)) {
Operator.Status status = operator.status();

assumeTrue("Operator does not provide a status", status != null);

var xContent = XContentType.JSON.xContent();
try (var xContentBuilder = XContentBuilder.builder(xContent)) {
status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);

var bytesReference = BytesReference.bytes(xContentBuilder);
var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2();

if (operator instanceof SourceOperator) {
assertThat(map, hasKey("pages_emitted"));
assertThat(map, hasKey("rows_emitted"));
} else if (operator instanceof SinkOperator) {
assertThat(map, hasKey("pages_received"));
assertThat(map, hasKey("rows_received"));
} else {
assertThat(map, either(hasKey("pages_processed")).or(both(hasKey("pages_received")).and(hasKey("pages_emitted"))));
assertThat(map, hasKey("rows_received"));
assertThat(map, hasKey("rows_emitted"));
}
assertOperatorStatus(operator, List.of(), List.of());
}
}

/**
* Assert the operator has a status, and its values are acceptable.
* Delegates specific checks to {@link #checkOperatorStatusFields}, which may be overridden.
*/
protected void assertOperatorStatus(Operator operator, List<Page> input, List<Page> output) {
Operator.Status status = operator.status();

if (status == null) {
// Operator doesn't provide a status
return;
}

var xContent = XContentType.JSON.xContent();
try (var xContentBuilder = XContentBuilder.builder(xContent)) {
status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);

var bytesReference = BytesReference.bytes(xContentBuilder);
var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2();

// For some operators, we don't know here if they consumed/processed all the input.
// That check should be done in the operator test
var nonNegativeMatcher = input.isEmpty() ? matchNumberEqualTo(0) : matchNumberGreaterThanOrEqualTo(0);
var inputPagesMatcher = matchNumberEqualTo(input.size());
var totalInputRows = input.stream().mapToLong(Page::getPositionCount).sum();
var inputRowsMatcher = matchNumberEqualTo(totalInputRows);
var outputPagesMatcher = matchNumberEqualTo(output.size());
var totalOutputRows = output.stream().mapToLong(Page::getPositionCount).sum();
var outputRowsMatcher = matchNumberEqualTo(totalOutputRows);

if (operator instanceof SourceOperator) {
assertThat(map, hasEntry(is("pages_emitted"), outputPagesMatcher));
assertThat(map, hasEntry(is("rows_emitted"), outputRowsMatcher));
} else if (operator instanceof SinkOperator) {
assertThat(map, hasEntry(is("pages_received"), inputPagesMatcher));
assertThat(map, hasEntry(is("rows_received"), inputRowsMatcher));
} else if (operator instanceof AsyncOperator) {
assertThat(map, hasEntry(is("received_pages"), nonNegativeMatcher));
assertThat(map, hasEntry(is("completed_pages"), nonNegativeMatcher));
assertThat(map, hasEntry(is("process_nanos"), nonNegativeMatcher));
Copy link
Contributor Author

@ivancea ivancea Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async operator status fields are serialized as "received_pages" instead of "pages_received" (Like the others operators).
Should I normalize it here? It's not even a transport version change, just some unit tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ to make this consistent with the other operators.

} else {
assertThat(
map,
Matchers.<Map<String, Object>>either(hasEntry(is("pages_processed"), nonNegativeMatcher))
.or(
Matchers.<Map<String, Object>>both(hasEntry(is("pages_received"), nonNegativeMatcher))
.and(hasEntry(is("pages_emitted"), outputPagesMatcher))
)
);
assertThat(map, hasEntry(is("rows_received"), nonNegativeMatcher));
assertThat(map, hasEntry(is("rows_emitted"), outputRowsMatcher));
}

checkOperatorStatusFields(map, input, output);
} catch (IOException e) {
fail(e, "Failed to convert operator status to XContent");
}
}

/**
 * Builds a matcher for a numeric value whose boxed type may be either {@link Integer} or {@link Long}.
 * <p>
 * The resulting matcher accepts an {@code Integer} that compares equal to {@code value.intValue()},
 * or a {@code Long} that compares equal to {@code value.longValue()}.
 * </p>
 *
 * @param value the expected number
 * @return a matcher over {@link Object} built through {@link #wholeMatcher}
 */
protected final Matcher<Object> matchNumberEqualTo(Number value) {
    Matcher<Integer> expectedAsInt = comparesEqualTo(value.intValue());
    Matcher<Long> expectedAsLong = comparesEqualTo(value.longValue());
    return wholeMatcher(expectedAsInt, expectedAsLong);
}

/**
 * Builds a matcher for a numeric value whose boxed type may be either {@link Integer} or {@link Long}.
 * <p>
 * The resulting matcher accepts an {@code Integer} that is at least {@code value.intValue()},
 * or a {@code Long} that is at least {@code value.longValue()}.
 * </p>
 *
 * @param value the inclusive lower bound
 * @return a matcher over {@link Object} built through {@link #wholeMatcher}
 */
protected final Matcher<Object> matchNumberGreaterThanOrEqualTo(Number value) {
    Matcher<Integer> lowerBoundAsInt = greaterThanOrEqualTo(value.intValue());
    Matcher<Long> lowerBoundAsLong = greaterThanOrEqualTo(value.longValue());
    return wholeMatcher(lowerBoundAsInt, lowerBoundAsLong);
}

/**
 * Combines a matcher for {@link Integer} values and a matcher for {@link Long} values into a single
 * matcher over {@link Object}.
 * <p>
 * The result accepts an object that is an {@code Integer} instance satisfying {@code integerMatcher},
 * or a {@code Long} instance satisfying {@code longMatcher}.
 * </p>
 *
 * @param integerMatcher matcher paired with the {@code Integer} instance check
 * @param longMatcher matcher paired with the {@code Long} instance check
 * @return the combined matcher
 */
@SuppressWarnings("unchecked")
protected final Matcher<Object> wholeMatcher(Matcher<Integer> integerMatcher, Matcher<Long> longMatcher) {
// The typed matchers are cast (unchecked) so they can be combined with the instanceOf checks.
// NOTE(review): presumably the instanceOf check is evaluated first, so a typed matcher never
// examines an object of the other type - confirm against Hamcrest's both(...).and(...) semantics.
return either(both(instanceOf(Integer.class)).and((Matcher<? super Object>) (Matcher<?>) integerMatcher)).or(
both(instanceOf(Long.class)).and((Matcher<? super Object>) (Matcher<?>) longMatcher)
);
}

/**
* Tests the non-standard operator status fields.
* <p>
* The standard fields (already tested in the generic test) are:
* </p>
* <ul>
* <li>pages_received</li>
* <li>rows_received</li>
* <li>pages_processed</li>
* <li>pages_emitted</li>
* <li>rows_emitted</li>
* </ul>
* <p>
* To be overridden by subclasses. The default implementation is empty and checks nothing.
* </p>
* @param status The XContent map representation of the status.
* @param input The input pages, as passed to {@code assertOperatorStatus}.
* @param output The output pages, as passed to {@code assertOperatorStatus}.
*/
protected void checkOperatorStatusFields(Map<String, Object> status, List<Page> input, List<Page> output) {}

/**
* A {@link DriverContext} with a nonBreakingBigArrays.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ protected final void assertSimple(DriverContext context, int size) {
var operator = simple().get(context);
List<Page> results = drive(operator, input.iterator(), context);
assertSimpleOutput(origInput, results);
assertOperatorStatus(operator, origInput, results);
assertThat(context.breaker().getUsed(), equalTo(0L));

// Release all result blocks. After this, all input blocks should be released as well, otherwise we have a leak.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -99,6 +100,10 @@ public Operator get(DriverContext driverContext) {
* Total number of pages emitted by this {@link Operator}.
*/
private long emittedPages = 0L;
/**
* Total number of rows emitted by this {@link Operator}.
*/
private long emittedRows = 0L;
/**
* The ongoing join, or {@code null} if none is ongoing at the moment.
*/
Expand Down Expand Up @@ -167,7 +172,9 @@ public Page getOutput() {
Page right = ongoing.itr.next();
emittedPages++;
try {
return ongoing.join.join(right);
Page joinedPage = ongoing.join.join(right);
emittedRows += joinedPage.getPositionCount();
return joinedPage;
} finally {
right.releaseBlocks();
}
Expand All @@ -180,6 +187,7 @@ public Page getOutput() {
return null;
}
emittedPages++;
emittedRows += remaining.get().getPositionCount();
return remaining.get();
}

Expand Down Expand Up @@ -225,7 +233,7 @@ protected void doClose() {

@Override
protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages);
return new LookupFromIndexOperator.Status(receivedPages, completedPages, processNanos, totalTerms, emittedPages, emittedRows);
}

public static class Status extends AsyncOperator.Status {
Expand All @@ -240,24 +248,38 @@ public static class Status extends AsyncOperator.Status {
* Total number of pages emitted by this {@link Operator}.
*/
private final long emittedPages;
/**
* Total number of rows emitted by this {@link Operator}.
*/
private final long emittedRows;

Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms, long emittedPages) {
super(receivedPages, completedPages, totalTimeInMillis);
Status(long receivedPages, long completedPages, long processNanos, long totalTerms, long emittedPages, long emittedRows) {
super(receivedPages, completedPages, processNanos);
this.totalTerms = totalTerms;
this.emittedPages = emittedPages;
this.emittedRows = emittedRows;
}

Status(StreamInput in) throws IOException {
super(in);
this.totalTerms = in.readVLong();
this.emittedPages = in.readVLong();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_OPERATOR_EMITTED_ROWS)) {
this.emittedRows = in.readVLong();
} else {
this.emittedRows = 0L;
}
}

/**
 * Writes this status to the given stream: the parent fields first, then the total terms and emitted pages.
 * <p>
 * The emitted-rows counter is only written when the stream's transport version is on or after
 * {@code ESQL_LOOKUP_OPERATOR_EMITTED_ROWS}.
 * </p>
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeVLong(totalTerms);
    out.writeVLong(emittedPages);

    boolean receiverReadsEmittedRows = out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_OPERATOR_EMITTED_ROWS);
    if (receiverReadsEmittedRows == false) {
        return;
    }
    out.writeVLong(emittedRows);
}

@Override
Expand All @@ -269,6 +291,10 @@ public long emittedPages() {
return emittedPages;
}

/**
 * Returns the total number of rows emitted by the operator, as captured in this status.
 *
 * @return the emitted-rows counter (not the emitted-pages counter)
 */
public long emittedRows() {
    // Previously returned emittedPages, so callers of this accessor saw the page count instead of the row count.
    return emittedRows;
}

/**
 * Returns the {@code totalTerms} value carried by this status.
 *
 * @return the total-terms counter
 */
public long totalTerms() {
    return this.totalTerms;
}
Expand All @@ -277,8 +303,9 @@ public long totalTerms() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
super.innerToXContent(builder);
builder.field("emitted_pages", emittedPages());
builder.field("total_terms", totalTerms());
builder.field("emitted_pages", emittedPages);
builder.field("emitted_rows", emittedRows);
builder.field("total_terms", totalTerms);
return builder.endObject();
}

Expand All @@ -291,12 +318,12 @@ public boolean equals(Object o) {
return false;
}
Status status = (Status) o;
return totalTerms == status.totalTerms && emittedPages == status.emittedPages;
return totalTerms == status.totalTerms && emittedPages == status.emittedPages && emittedRows == status.emittedRows;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), totalTerms, emittedPages);
return Objects.hash(super.hashCode(), totalTerms, emittedPages, emittedRows);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ protected EnrichLookupOperator.Status mutateInstance(EnrichLookupOperator.Status
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
in.completedPages(),
in.totalTerms,
in.procesNanos()
in.processNanos()
);
case 1 -> new EnrichLookupOperator.Status(
in.receivedPages(),
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
in.totalTerms,
in.procesNanos()
in.processNanos()
);
case 2 -> new EnrichLookupOperator.Status(
in.receivedPages(),
in.completedPages(),
randomValueOtherThan(in.totalTerms, ESTestCase::randomNonNegativeLong),
in.procesNanos()
in.processNanos()
);
case 3 -> new EnrichLookupOperator.Status(
in.receivedPages(),
in.completedPages(),
in.totalTerms,
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
randomValueOtherThan(in.processNanos(), ESTestCase::randomNonNegativeLong)
);
default -> throw new AssertionError("unknown ");
};
Expand Down
Loading