Skip to content

Commit d68b970

Browse files
committed
ESQL: Fix Driver status iterations and cpuTime (elastic#123290)
Fixes elastic#122967 When the Driver reported status, if the "report at least every X time" report was triggered, it was re-adding the same iterations and cpuTime again, as it wasn't clearing it before the next iteration. Now, there are separated variables for the last updated iterations and reported time.
1 parent 08b3320 commit d68b970

File tree

3 files changed

+76
-10
lines changed

3 files changed

+76
-10
lines changed

docs/changelog/123290.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 123290
2+
summary: Fix Driver status iterations and `cpuTime`
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 122967

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,16 @@ public DriverContext driverContext() {
182182
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
183183
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
184184
long maxTimeNanos = maxTime.nanos();
185+
// Start time, used to stop the calculations after maxTime has passed.
185186
long startTime = nowSupplier.getAsLong();
187+
// The time of the next forced status update.
186188
long nextStatus = startTime + statusNanos;
187-
int iter = 0;
189+
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
190+
int totalIterationsThisRun = 0;
191+
// The iterations to be reported on the next status update.
192+
int iterationsSinceLastStatusUpdate = 0;
193+
// The time passed since the last status update.
194+
long lastStatusUpdateTime = startTime;
188195
while (true) {
189196
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
190197
try {
@@ -193,29 +200,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
193200
closeEarlyFinishedOperators();
194201
assert isFinished() : "not finished after early termination";
195202
}
196-
iter++;
203+
totalIterationsThisRun++;
204+
iterationsSinceLastStatusUpdate++;
205+
206+
long now = nowSupplier.getAsLong();
197207
if (isBlocked.listener().isDone() == false) {
198-
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
208+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
199209
return isBlocked.listener();
200210
}
201211
if (isFinished()) {
202-
finishNanos = nowSupplier.getAsLong();
203-
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
212+
finishNanos = now;
213+
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
204214
driverContext.finish();
205215
Releasables.close(releasable, driverContext.getSnapshot());
206216
return Operator.NOT_BLOCKED.listener();
207217
}
208-
long now = nowSupplier.getAsLong();
209-
if (iter >= maxIterations) {
210-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
218+
if (totalIterationsThisRun >= maxIterations) {
219+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
211220
return Operator.NOT_BLOCKED.listener();
212221
}
213222
if (now - startTime >= maxTimeNanos) {
214-
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
223+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
215224
return Operator.NOT_BLOCKED.listener();
216225
}
217226
if (now > nextStatus) {
218-
updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
227+
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
228+
iterationsSinceLastStatusUpdate = 0;
229+
lastStatusUpdateTime = now;
219230
nextStatus = now + statusNanos;
220231
}
221232
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,55 @@ public void testProfileAndStatusTimeout() {
199199
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
200200
}
201201

202+
public void testProfileAndStatusInterval() {
203+
DriverContext driverContext = driverContext();
204+
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
205+
List<Page> outPages = new ArrayList<>();
206+
207+
long startEpoch = randomNonNegativeLong();
208+
long startNanos = randomLong();
209+
long waitTime = randomLongBetween(10000, 100000);
210+
long tickTime = randomLongBetween(10000, 100000);
211+
long statusInterval = randomLongBetween(1, 10);
212+
213+
Driver driver = new Driver(
214+
"unset",
215+
startEpoch,
216+
startNanos,
217+
driverContext,
218+
() -> "unset",
219+
new CannedSourceOperator(inPages.iterator()),
220+
List.of(),
221+
new TestResultPageSinkOperator(outPages::add),
222+
TimeValue.timeValueNanos(statusInterval),
223+
() -> {}
224+
);
225+
226+
NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
227+
228+
int iterationsPerTick = randomIntBetween(1, 10);
229+
230+
for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
231+
logger.info("status {} {}", i, driver.status());
232+
assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
233+
assertThat(driver.status().started(), equalTo(startEpoch));
234+
assertThat(driver.status().iterations(), equalTo((long) i));
235+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
236+
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
237+
}
238+
239+
logger.info("status {}", driver.status());
240+
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
241+
assertThat(driver.status().started(), equalTo(startEpoch));
242+
assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
243+
assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
244+
245+
logger.info("profile {}", driver.profile());
246+
assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
247+
assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
248+
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
249+
}
250+
202251
class NowSupplier implements LongSupplier {
203252
private final long startNanos;
204253
private final long waitTime;

0 commit comments

Comments
 (0)