diff --git a/docs/changelog/132260.yaml b/docs/changelog/132260.yaml new file mode 100644 index 0000000000000..f00d3fc07fcc9 --- /dev/null +++ b/docs/changelog/132260.yaml @@ -0,0 +1,6 @@ +pr: 132260 +summary: FIx Driver creating status with a live list of operators +area: ES|QL +type: bug +issues: + - 131564 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 628f17c46d232..8204eca9716fa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -531,7 +531,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. prev.cpuNanos() + extraCpuNanos, prev.iterations() + extraIterations, status, - statusOfCompletedOperators, + List.copyOf(statusOfCompletedOperators), activeOperators.stream().map(op -> new DriverStatus.OperatorStatus(op.toString(), op.status())).toList(), sleeps ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index eb5a99eccbca4..d31284f514e75 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -50,6 +50,8 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class DriverTests extends ESTestCase { /** @@ -253,6 +255,59 @@ public void testProfileAndStatusInterval() { assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); } + public void testUnchangedStatus() { + DriverContext driverContext = driverContext(); + List inPages = randomList(2, 100, DriverTests::randomPage); + List outPages = new ArrayList<>(); + + long startEpoch = randomNonNegativeLong(); + long startNanos = randomLong(); + long waitTime = randomLongBetween(10000, 100000); + long tickTime = randomLongBetween(10000, 100000); + long statusInterval = randomLongBetween(1, 10); + + Driver driver = new Driver( + "unset", + "test", + startEpoch, + startNanos, + driverContext, + () -> "unset", + new CannedSourceOperator(inPages.iterator()), + List.of(), + new TestResultPageSinkOperator(outPages::add), + TimeValue.timeValueNanos(statusInterval), + () -> {} + ); + + NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime); + + int iterationsPerTick = randomIntBetween(1, 10); + + for (int i = 0; i < inPages.size(); i += iterationsPerTick) { + DriverStatus initialStatus = driver.status(); + long completedOperatorsHash = initialStatus.completedOperators().hashCode(); + long activeOperatorsHash = initialStatus.activeOperators().hashCode(); + long sleepsHash = initialStatus.sleeps().hashCode(); + + driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier); + + DriverStatus newStatus = driver.status(); + assertThat(newStatus, not(sameInstance(initialStatus))); + assertThat( + newStatus.completedOperators() != initialStatus.completedOperators() + || newStatus.completedOperators().hashCode() == completedOperatorsHash, + equalTo(true) + ); + assertThat( + newStatus.activeOperators() != initialStatus.activeOperators() + || newStatus.activeOperators().hashCode() == activeOperatorsHash, + equalTo(true) + ); + assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true)); + } + } + class NowSupplier implements LongSupplier { private final long startNanos; private final long waitTime;