Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/123290.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 123290
summary: Fix Driver status iterations and `cpuTime`
area: ES|QL
type: enhancement
issues:
- 122967
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,16 @@ public DriverContext driverContext() {
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
long maxTimeNanos = maxTime.nanos();
// Start time, used to stop the calculations after maxTime has passed.
long startTime = nowSupplier.getAsLong();
// The time of the next forced status update.
long nextStatus = startTime + statusNanos;
int iter = 0;
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
int totalIterationsThisRun = 0;
// The iterations to be reported on the next status update.
int iterationsSinceLastStatusUpdate = 0;
// The time passed since the last status update.
long lastStatusUpdateTime = startTime;
while (true) {
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
try {
Expand All @@ -209,29 +216,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
closeEarlyFinishedOperators();
assert isFinished() : "not finished after early termination";
}
iter++;
totalIterationsThisRun++;
iterationsSinceLastStatusUpdate++;

long now = nowSupplier.getAsLong();
if (isBlocked.listener().isDone() == false) {
updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
return isBlocked.listener();
}
if (isFinished()) {
finishNanos = nowSupplier.getAsLong();
updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
finishNanos = now;
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
driverContext.finish();
Releasables.close(releasable, driverContext.getSnapshot());
return Operator.NOT_BLOCKED.listener();
}
long now = nowSupplier.getAsLong();
if (iter >= maxIterations) {
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
if (totalIterationsThisRun >= maxIterations) {
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
return Operator.NOT_BLOCKED.listener();
}
if (now - startTime >= maxTimeNanos) {
updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
return Operator.NOT_BLOCKED.listener();
}
if (now > nextStatus) {
updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
iterationsSinceLastStatusUpdate = 0;
lastStatusUpdateTime = now;
nextStatus = now + statusNanos;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,56 @@ public void testProfileAndStatusTimeout() {
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
}

public void testProfileAndStatusInterval() {
DriverContext driverContext = driverContext();
List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
List<Page> outPages = new ArrayList<>();

long startEpoch = randomNonNegativeLong();
long startNanos = randomLong();
long waitTime = randomLongBetween(10000, 100000);
long tickTime = randomLongBetween(10000, 100000);
long statusInterval = randomLongBetween(1, 10);

Driver driver = new Driver(
"unset",
"test",
startEpoch,
startNanos,
driverContext,
() -> "unset",
new CannedSourceOperator(inPages.iterator()),
List.of(),
new TestResultPageSinkOperator(outPages::add),
TimeValue.timeValueNanos(statusInterval),
() -> {}
);

NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);

int iterationsPerTick = randomIntBetween(1, 10);

for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
logger.info("status {} {}", i, driver.status());
assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
assertThat(driver.status().started(), equalTo(startEpoch));
assertThat(driver.status().iterations(), equalTo((long) i));
assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
}

logger.info("status {}", driver.status());
assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
assertThat(driver.status().started(), equalTo(startEpoch));
assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));

logger.info("profile {}", driver.profile());
assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
}

class NowSupplier implements LongSupplier {
private final long startNanos;
private final long waitTime;
Expand Down