From fbf7606903b4018130180d9ee9715e2b00d27477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Thu, 31 Jul 2025 12:50:43 +0200 Subject: [PATCH 1/4] ESQL: FIx Driver creating status with a live list of operators --- .../main/java/org/elasticsearch/compute/operator/Driver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 0ad3c64fcaf09..22f761e79c618 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -556,7 +556,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. prev.cpuNanos() + extraCpuNanos, prev.iterations() + extraIterations, status, - statusOfCompletedOperators, + new ArrayList<>(statusOfCompletedOperators), activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(), sleeps ); From 9a464cc6fedf58a55cd26cb60a930c5e82901e26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Thu, 31 Jul 2025 12:57:34 +0200 Subject: [PATCH 2/4] Update docs/changelog/132260.yaml --- docs/changelog/132260.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/132260.yaml diff --git a/docs/changelog/132260.yaml b/docs/changelog/132260.yaml new file mode 100644 index 0000000000000..f00d3fc07fcc9 --- /dev/null +++ b/docs/changelog/132260.yaml @@ -0,0 +1,6 @@ +pr: 132260 +summary: FIx Driver creating status with a live list of operators +area: ES|QL +type: bug +issues: + - 131564 From 961164c8e5cf7a7f326b232afb498b3c3a356c82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Thu, 31 Jul 2025 17:05:29 +0200 Subject: [PATCH 3/4] Use List.copyOf() and add test --- .../compute/operator/Driver.java | 2 +- .../compute/operator/DriverTests.java | 38 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 22f761e79c618..b91cd3f468ad5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -556,7 +556,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. prev.cpuNanos() + extraCpuNanos, prev.iterations() + extraIterations, status, - new ArrayList<>(statusOfCompletedOperators), + List.copyOf(statusOfCompletedOperators), activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(), sleeps ); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index c538cf41ee1fd..c633d1accc92e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -47,8 +47,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; +import static net.bytebuddy.matcher.ElementMatchers.is; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.comparesEqualTo; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; public class DriverTests extends ESTestCase { /** @@ -204,6 +209,39 @@ public void testProfileAndStatusInterval() { assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); } + public void testUnchangedStatus() { + DriverContext driverContext = driverContext(); + List inPages = randomList(2, 100, DriverTests::randomPage); + List outPages = new ArrayList<>(); + + long startEpoch = randomNonNegativeLong(); + long startNanos = randomLong(); + long waitTime = randomLongBetween(10000, 100000); + long tickTime = randomLongBetween(10000, 100000); + long statusInterval = randomLongBetween(1, 10); + + Driver driver = createDriver(startEpoch, startNanos, driverContext, inPages, outPages, TimeValue.timeValueNanos(statusInterval)); + + NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime); + + int iterationsPerTick = randomIntBetween(1, 10); + + for (int i = 0; i < inPages.size(); i += iterationsPerTick) { + DriverStatus initialStatus = driver.status(); + long completedOperatorsHash = initialStatus.completedOperators().hashCode(); + long activeOperatorsHash = initialStatus.activeOperators().hashCode(); + long sleepsHash = initialStatus.sleeps().hashCode(); + + driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier); + + DriverStatus newStatus = driver.status(); + assertThat(newStatus, not(sameInstance(initialStatus))); + assertThat(newStatus.completedOperators() != initialStatus.completedOperators() || newStatus.completedOperators().hashCode() == completedOperatorsHash, equalTo(true)); + assertThat(newStatus.activeOperators() != initialStatus.activeOperators() || newStatus.activeOperators().hashCode() == activeOperatorsHash, equalTo(true)); + assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true)); + } + } + private static Driver createDriver( long startEpoch, long startNanos, From 0110c995a09076e33ae5ba1cbb49307e654f9c9d Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 31 Jul 2025 15:13:59 +0000 Subject: [PATCH 4/4] [CI] Auto commit changes from spotless --- .../compute/operator/DriverTests.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index c633d1accc92e..be3f11744d3a0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -47,9 +47,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; -import static net.bytebuddy.matcher.ElementMatchers.is; -import static org.hamcrest.Matchers.any; -import static org.hamcrest.Matchers.comparesEqualTo; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -236,8 +233,16 @@ public void testUnchangedStatus() { DriverStatus newStatus = driver.status(); assertThat(newStatus, not(sameInstance(initialStatus))); - assertThat(newStatus.completedOperators() != initialStatus.completedOperators() || newStatus.completedOperators().hashCode() == completedOperatorsHash, equalTo(true)); - assertThat(newStatus.activeOperators() != initialStatus.activeOperators() || newStatus.activeOperators().hashCode() == activeOperatorsHash, equalTo(true)); + assertThat( + newStatus.completedOperators() != initialStatus.completedOperators() + || newStatus.completedOperators().hashCode() == completedOperatorsHash, + equalTo(true) + ); + assertThat( + newStatus.activeOperators() != initialStatus.activeOperators() + || newStatus.activeOperators().hashCode() == activeOperatorsHash, + equalTo(true) + ); assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true)); } }