Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/123290.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 123290
summary: Fix Driver status iterations and `cpuTime`
area: ES|QL
type: enhancement
issues:
- 122967
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,16 @@ public DriverContext driverContext() {
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
long maxTimeNanos = maxTime.nanos();
// Start time, used to stop the calculations after maxTime has passed.
long startTime = nowSupplier.getAsLong();
// The time of the next forced status update.
long nextStatus = startTime + statusNanos;
int iter = 0;
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
int totalIterationsThisRun = 0;
// The iterations to be reported on the next status update.
int iterationsSinceLastStatusUpdate = 0;
// The time passed since the last status update.
long lastStatusUpdateTime = startTime;
while (true) {
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
try {
Expand All @@ -182,29 +189,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
closeEarlyFinishedOperators();
assert isFinished() : "not finished after early termination";
}
iter++;
totalIterationsThisRun++;
iterationsSinceLastStatusUpdate++;

long now = nowSupplier.getAsLong();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: you could extract an elapsed time, since all the checks use that as 1st updateStatus() param.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One branch doesn't use that calculation, and it may be unused if no if is executed. So I would keep it that way, probably

if (isBlocked.listener().isDone() == false) {
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
return isBlocked.listener();
}
if (isFinished()) {
finishNanos = nowSupplier.getAsLong();
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
finishNanos = now;
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
driverContext.finish();
Releasables.close(releasable, driverContext.getSnapshot());
return Operator.NOT_BLOCKED.listener();
}
long now = nowSupplier.getAsLong();
if (iter >= maxIterations) {
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
if (totalIterationsThisRun >= maxIterations) {
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
return Operator.NOT_BLOCKED.listener();
}
if (now - startTime >= maxTimeNanos) {
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
return Operator.NOT_BLOCKED.listener();
}
if (now > nextStatus) {
updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
iterationsSinceLastStatusUpdate = 0;
lastStatusUpdateTime = now;
nextStatus = now + statusNanos;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,44 @@ public void testProfileAndStatusTimeout() {
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
}

public void testProfileAndStatusInterval() {
DriverContext driverContext = driverContext();
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
List<Page> outPages = new ArrayList<>();

long startEpoch = randomNonNegativeLong();
long startNanos = randomLong();
long waitTime = randomLongBetween(10000, 100000);
long tickTime = randomLongBetween(10000, 100000);
long statusInterval = randomLongBetween(1, 10);

Driver driver = createDriver(startEpoch, startNanos, driverContext, inPages, outPages, TimeValue.timeValueNanos(statusInterval));

NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);

int iterationsPerTick = randomIntBetween(1, 10);

for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
logger.info("status {} {}", i, driver.status());
assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
assertThat(driver.status().started(), equalTo(startEpoch));
assertThat(driver.status().iterations(), equalTo((long) i));
assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
}

logger.info("status {}", driver.status());
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
assertThat(driver.status().started(), equalTo(startEpoch));
assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));

logger.info("profile {}", driver.profile());
assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
}

private static Driver createDriver(
long startEpoch,
long startNanos,
Expand Down