Skip to content

Commit e8b01ff

Browse files
authored
ESQL: Fix Driver status iterations and cpuTime (#123290)
Fixes #122967 When the Driver reported status, if the "report at least every X time" report was triggered, it was re-adding the same iterations and cpuTime again, as it wasn't clearing it before the next iteration. Now, there are separated variables for the last updated iterations and reported time.
1 parent 46fc7de commit e8b01ff

File tree

3 files changed

+65
-10
lines changed

3 files changed

+65
-10
lines changed

docs/changelog/123290.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123290
2+
summary: Fix Driver status iterations and `cpuTime`
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 122967

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,16 @@ public DriverContext driverContext() {
171171
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
172172
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
173173
long maxTimeNanos = maxTime.nanos();
174+
// Start time, used to stop the calculations after maxTime has passed.
174175
long startTime = nowSupplier.getAsLong();
176+
// The time of the next forced status update.
175177
long nextStatus = startTime + statusNanos;
176-
int iter = 0;
178+
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
179+
int totalIterationsThisRun = 0;
180+
// The iterations to be reported on the next status update.
181+
int iterationsSinceLastStatusUpdate = 0;
182+
// The time passed since the last status update.
183+
long lastStatusUpdateTime = startTime;
177184
while (true) {
178185
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
179186
try {
@@ -182,29 +189,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
182189
closeEarlyFinishedOperators();
183190
assert isFinished() : "not finished after early termination";
184191
}
185-
iter++;
192+
totalIterationsThisRun++;
193+
iterationsSinceLastStatusUpdate++;
194+
195+
long now = nowSupplier.getAsLong();
186196
if (isBlocked.listener().isDone() == false) {
187-
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
197+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
188198
return isBlocked.listener();
189199
}
190200
if (isFinished()) {
191-
finishNanos = nowSupplier.getAsLong();
192-
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
201+
finishNanos = now;
202+
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
193203
driverContext.finish();
194204
Releasables.close(releasable, driverContext.getSnapshot());
195205
return Operator.NOT_BLOCKED.listener();
196206
}
197-
long now = nowSupplier.getAsLong();
198-
if (iter >= maxIterations) {
199-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
207+
if (totalIterationsThisRun >= maxIterations) {
208+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
200209
return Operator.NOT_BLOCKED.listener();
201210
}
202211
if (now - startTime >= maxTimeNanos) {
203-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
212+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
204213
return Operator.NOT_BLOCKED.listener();
205214
}
206215
if (now > nextStatus) {
207-
updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
216+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
217+
iterationsSinceLastStatusUpdate = 0;
218+
lastStatusUpdateTime = now;
208219
nextStatus = now + statusNanos;
209220
}
210221
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,44 @@ public void testProfileAndStatusTimeout() {
167167
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
168168
}
169169

170+
public void testProfileAndStatusInterval() {
171+
DriverContext driverContext = driverContext();
172+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
173+
List<Page> outPages = new ArrayList<>();
174+
175+
long startEpoch = randomNonNegativeLong();
176+
long startNanos = randomLong();
177+
long waitTime = randomLongBetween(10000, 100000);
178+
long tickTime = randomLongBetween(10000, 100000);
179+
long statusInterval = randomLongBetween(1, 10);
180+
181+
Driver driver = createDriver(startEpoch, startNanos, driverContext, inPages, outPages, TimeValue.timeValueNanos(statusInterval));
182+
183+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
184+
185+
int iterationsPerTick = randomIntBetween(1, 10);
186+
187+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
188+
logger.info("status {} {}", i, driver.status());
189+
assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
190+
assertThat(driver.status().started(), equalTo(startEpoch));
191+
assertThat(driver.status().iterations(), equalTo((long) i));
192+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
193+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
194+
}
195+
196+
logger.info("status {}", driver.status());
197+
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
198+
assertThat(driver.status().started(), equalTo(startEpoch));
199+
assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
200+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
201+
202+
logger.info("profile {}", driver.profile());
203+
assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
204+
assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
205+
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
206+
}
207+
170208
private static Driver createDriver(
171209
long startEpoch,
172210
long startNanos,

0 commit comments

Comments
 (0)