diff --git a/docs/changelog/123290.yaml b/docs/changelog/123290.yaml new file mode 100644 index 0000000000000..45b9168b494d6 --- /dev/null +++ b/docs/changelog/123290.yaml @@ -0,0 +1,6 @@ +pr: 123290 +summary: Fix Driver status iterations and `cpuTime` +area: ES|QL +type: enhancement +issues: + - 122967 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index c0d220fda5d4e..70d52068bb70b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -198,9 +198,16 @@ public DriverContext driverContext() { SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) { updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running"); long maxTimeNanos = maxTime.nanos(); + // Start time, used to stop the calculations after maxTime has passed. long startTime = nowSupplier.getAsLong(); + // The time of the next forced status update. long nextStatus = startTime + statusNanos; - int iter = 0; + // Total executed iterations this run, used to stop the calculations after maxIterations have passed. + int totalIterationsThisRun = 0; + // The iterations to be reported on the next status update. + int iterationsSinceLastStatusUpdate = 0; + // The time passed since the last status update. + long lastStatusUpdateTime = startTime; while (true) { IsBlockedResult isBlocked = Operator.NOT_BLOCKED; try { @@ -209,29 +216,33 @@ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplie closeEarlyFinishedOperators(); assert isFinished() : "not finished after early termination"; } - iter++; + totalIterationsThisRun++; + iterationsSinceLastStatusUpdate++; + + long now = nowSupplier.getAsLong(); if (isBlocked.listener().isDone() == false) { - updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason()); + updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason()); return isBlocked.listener(); } if (isFinished()) { - finishNanos = nowSupplier.getAsLong(); - updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done"); + finishNanos = now; + updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done"); driverContext.finish(); Releasables.close(releasable, driverContext.getSnapshot()); return Operator.NOT_BLOCKED.listener(); } - long now = nowSupplier.getAsLong(); - if (iter >= maxIterations) { - updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations"); + if (totalIterationsThisRun >= maxIterations) { + updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations"); return Operator.NOT_BLOCKED.listener(); } if (now - startTime >= maxTimeNanos) { - updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time"); + updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time"); return Operator.NOT_BLOCKED.listener(); } if (now > nextStatus) { - updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running"); + updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running"); + iterationsSinceLastStatusUpdate = 0; + lastStatusUpdateTime = now; nextStatus = now + statusNanos; } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index a0b04668b7307..b0b47ec56aeaa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -202,6 +202,56 @@ public void testProfileAndStatusTimeout() { assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); } + public void testProfileAndStatusInterval() { + DriverContext driverContext = driverContext(); + List inPages = randomList(2, 100, DriverTests::randomPage); + List outPages = new ArrayList<>(); + + long startEpoch = randomNonNegativeLong(); + long startNanos = randomLong(); + long waitTime = randomLongBetween(10000, 100000); + long tickTime = randomLongBetween(10000, 100000); + long statusInterval = randomLongBetween(1, 10); + + Driver driver = new Driver( + "unset", + "test", + startEpoch, + startNanos, + driverContext, + () -> "unset", + new CannedSourceOperator(inPages.iterator()), + List.of(), + new TestResultPageSinkOperator(outPages::add), + TimeValue.timeValueNanos(statusInterval), + () -> {} + ); + + NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime); + + int iterationsPerTick = randomIntBetween(1, 10); + + for (int i = 0; i < inPages.size(); i += iterationsPerTick) { + logger.info("status {} {}", i, driver.status()); + assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING)); + assertThat(driver.status().started(), equalTo(startEpoch)); + assertThat(driver.status().iterations(), equalTo((long) i)); + assertThat(driver.status().cpuNanos(), equalTo(tickTime * i)); + driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier); + } + + logger.info("status {}", driver.status()); + assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE)); + assertThat(driver.status().started(), equalTo(startEpoch)); + assertThat(driver.status().iterations(), equalTo((long) inPages.size())); + assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size())); + + logger.info("profile {}", driver.profile()); + assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1))); + assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); + } + class NowSupplier implements LongSupplier { private final long startNanos; private final long waitTime;