Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
283c2d5
Fixed status variables naming
ivancea Aug 12, 2025
010d915
Fixed notifying threads before updating stats and warnings
ivancea Aug 12, 2025
262f488
Fixed status values for AsyncOperator
ivancea Aug 12, 2025
35d2ca2
Added emittedRows and transport version
ivancea Aug 12, 2025
916f591
Added capability for the fix
ivancea Aug 12, 2025
5d74c2d
Unmuted tests
ivancea Aug 12, 2025
007fd4a
Undo extracted PR changes
ivancea Aug 13, 2025
0da2e4e
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 13, 2025
55f8804
Remove unused supress warnings
ivancea Aug 13, 2025
5535503
Remove TODO
ivancea Aug 13, 2025
ba84ddd
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 13, 2025
4afdc80
Update docs/changelog/132738.yaml
ivancea Aug 13, 2025
121ce8a
Rename received_pages to pages_received, and similar status fields
ivancea Aug 13, 2025
a775efb
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 14, 2025
5dbbd22
Update test after merge
ivancea Aug 14, 2025
1b5ae7d
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 19, 2025
bf4aea8
Improve Lookup operator test for status
ivancea Aug 19, 2025
e2de5ed
[CI] Auto commit changes from spotless
Aug 19, 2025
6a505b0
Fix assertion type for map numbers
ivancea Aug 19, 2025
25ef602
Fix Lucene tests operator status check
ivancea Aug 19, 2025
e8e9226
Update Limit operator tests
ivancea Aug 19, 2025
9c64a0b
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 20, 2025
523d542
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 21, 2025
b56abca
Remove custom matching functions
ivancea Aug 21, 2025
675d171
Merge branch 'main' into esql-lookup-warnings-status-race
ivancea Aug 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132738.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132738
summary: Fix `AsyncOperator` status values and add emitted rows
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RESOLVE_INDEX_MODE_ADDED = def(9_141_0_00);
public static final TransportVersion DATA_STREAM_WRITE_INDEX_ONLY_SETTINGS = def(9_142_0_00);
public static final TransportVersion SCRIPT_RESCORER = def(9_143_0_00);
public static final TransportVersion ESQL_LOOKUP_OPERATOR_EMITTED_ROWS = def(9_144_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public IsBlockedResult isBlocked() {

@Override
public final Operator.Status status() {
return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 These are correct usages.

}

protected Operator.Status status(long receivedPages, long completedPages, long processNanos) {
Expand Down Expand Up @@ -289,7 +289,7 @@ public long completedPages() {
return completedPages;
}

public long procesNanos() {
public long processNanos() {
return processNanos;
}

Expand All @@ -310,8 +310,8 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws
if (builder.humanReadable()) {
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
}
builder.field("received_pages", receivedPages);
builder.field("completed_pages", completedPages);
builder.field("pages_received", receivedPages);
builder.field("pages_completed", completedPages);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IO
case 0 -> new AsyncOperator.Status(
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
in.completedPages(),
in.procesNanos()
in.processNanos()
);
case 1 -> new AsyncOperator.Status(
in.receivedPages(),
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
in.procesNanos()
in.processNanos()
);
case 2 -> new AsyncOperator.Status(
in.receivedPages(),
in.completedPages(),
randomValueOtherThan(in.procesNanos(), ESTestCase::randomNonNegativeLong)
randomValueOtherThan(in.processNanos(), ESTestCase::randomNonNegativeLong)
);
default -> throw new AssertionError("unknown ");
};
Expand All @@ -62,8 +62,8 @@ public void testToXContent() {
{
"process_nanos" : 10,
"process_time" : "10nanos",
"received_pages" : 100,
"completed_pages" : 50
"pages_received" : 100,
"pages_completed" : 50
}"""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected Matcher<String> expectedToStringOfSimple() {
}

@Override
protected void assertEmptyStatus(Map<String, Object> map) {
protected void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
assertThat(map, nullValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import static org.hamcrest.Matchers.equalTo;

public class LimitStatusTests extends AbstractWireSerializingTestCase<LimitOperator.Status> {
public class LimitOperatorStatusTests extends AbstractWireSerializingTestCase<LimitOperator.Status> {
public void testToXContent() {
assertThat(Strings.toString(new LimitOperator.Status(10, 1, 1, 111, 222)), equalTo("""
{"limit":10,"limit_remaining":1,"pages_processed":1,"rows_received":111,"rows_emitted":222}"""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

import static org.elasticsearch.compute.test.RandomBlock.randomElementType;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;

Expand Down Expand Up @@ -192,4 +195,17 @@ Block randomBlock(BlockFactory blockFactory, int size) {
}
return RandomBlock.randomBlock(blockFactory, randomElementType(), size, false, 1, 1, 0, 0).block();
}

@Override
protected final void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
var emittedRows = output.stream().mapToInt(Page::getPositionCount).sum();

var mapMatcher = matchesMap().entry("rows_received", emittedRows)
.entry("pages_processed", output.size())
.entry("rows_emitted", emittedRows)
.entry("limit", 100)
.entry("limit_remaining", 100 - emittedRows);

assertMap(map, mapMatcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.AnyOperatorTestCase;
import org.hamcrest.Matcher;

Expand Down Expand Up @@ -52,7 +53,7 @@ public void testBigDescription() {
}

@Override
protected void assertEmptyStatus(Map<String, Object> map) {
protected void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
assertThat(map, nullValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
Expand All @@ -23,11 +24,14 @@
import org.hamcrest.Matcher;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.notNullValue;

/**
* Superclass for testing any {@link Operator}, including {@link SourceOperator}s.
Expand Down Expand Up @@ -106,39 +110,58 @@ public final void testSimpleToString() {
}

/**
* Ensures that the Operator.Status of this operator has the standard fields.
* Ensures that the Operator.Status of this operator has the standard fields, set to 0.
*/
public final void testOperatorStatus() throws IOException {
public void testEmptyOperatorStatus() {
DriverContext driverContext = driverContext();
try (var operator = simple().get(driverContext)) {
Operator.Status status = operator.status();
if (status == null) {
assertEmptyStatus(null);
return;
}
assertOperatorStatus(operator, List.of(), List.of());
}
}

try (var xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent())) {
status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
/**
* Extracts and asserts the operator status.
*/
protected final void assertOperatorStatus(Operator operator, List<Page> input, List<Page> output) {
Operator.Status status = operator.status();

var bytesReference = BytesReference.bytes(xContentBuilder);
var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2();
if (status == null) {
assertStatus(null, input, output);
return;
}

assertEmptyStatus(map);
}
var xContent = XContentType.JSON.xContent();
try (var xContentBuilder = XContentBuilder.builder(xContent)) {
status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);

var bytesReference = BytesReference.bytes(xContentBuilder);
var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2();

assertStatus(map, input, output);
} catch (IOException e) {
fail(e, "Failed to convert operator status to XContent");
}
}

/**
* Assert that the status is sane.
* <p>
* This method should be overridden with custom logics and for better assertions and for operators without status.
* </p>
*/
protected void assertEmptyStatus(@Nullable Map<String, Object> map) {
protected void assertStatus(@Nullable Map<String, Object> map, List<Page> input, List<Page> output) {
assertThat(map, notNullValue());

var totalInputRows = input.stream().mapToInt(Page::getPositionCount).sum();
var totalOutputRows = output.stream().mapToInt(Page::getPositionCount).sum();

MapMatcher matcher = matchesMap().extraOk();
if (map.containsKey("pages_processed")) {
matcher = matcher.entry("pages_processed", 0);
matcher = matcher.entry("pages_processed", greaterThanOrEqualTo(0));
} else {
matcher = matcher.entry("pages_received", 0).entry("pages_emitted", 0);
matcher = matcher.entry("pages_received", input.size()).entry("pages_emitted", output.size());
}
matcher = matcher.entry("rows_received", 0).entry("rows_emitted", 0);
matcher = matcher.entry("rows_received", totalInputRows).entry("rows_emitted", totalOutputRows);
assertMap(map, matcher);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.test;

import org.elasticsearch.compute.data.Page;
import org.elasticsearch.test.MapMatcher;
import org.hamcrest.Matcher;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public abstract class AsyncOperatorTestCase extends OperatorTestCase {
@Override
@SuppressWarnings("unchecked")
protected final void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
var mapMatcher = matchesMap().entry("pages_received", input.size())
.entry("pages_completed", input.size())
.entry("process_nanos", either(greaterThanOrEqualTo(0)).or((Matcher<Integer>) (Matcher<?>) greaterThanOrEqualTo(0L)));

mapMatcher = extendStatusMatcher(mapMatcher, input, output);

assertMap(map, mapMatcher);
}

protected MapMatcher extendStatusMatcher(MapMatcher mapMatcher, List<Page> input, List<Page> output) {
return mapMatcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ protected final void assertSimple(DriverContext context, int size) {
var operator = simple().get(context);
List<Page> results = drive(operator, input.iterator(), context);
assertSimpleOutput(origInput, results);
assertOperatorStatus(operator, origInput, results);
assertThat(context.breaker().getUsed(), equalTo(0L));

// Release all result blocks. After this, all input blocks should be released as well, otherwise we have a leak.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@

package org.elasticsearch.compute.test;

import org.elasticsearch.compute.data.Page;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;

public abstract class SourceOperatorTestCase extends AnyOperatorTestCase {
@Override
protected void assertEmptyStatus(Map<String, Object> map) {
assertMap(map, matchesMap().extraOk().entry("pages_emitted", 0).entry("rows_emitted", 0));
protected void assertStatus(Map<String, Object> map, List<Page> input, List<Page> output) {
assertMap(
map,
matchesMap().extraOk()
.entry("pages_emitted", output.size())
.entry("rows_emitted", output.stream().mapToInt(Page::getPositionCount).sum())
);
}
}
Loading