-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Fix Driver status iterations and cpuTime #123290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
faef410
457dcc8
8e17d9e
8dd5e0f
acf5f5a
3b5672c
7b19599
5c661b3
6088342
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 123290 | ||
| summary: Fix Driver status iterations and `cpuTime` | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: | ||
| - 122967 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,9 +198,16 @@ public DriverContext driverContext() { | |
| SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) { | ||
| updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running"); | ||
| long maxTimeNanos = maxTime.nanos(); | ||
| // Start time, used to stop the calculations after maxTime has passed. | ||
| long startTime = nowSupplier.getAsLong(); | ||
| // The time of the next forced status update. | ||
| long nextStatus = startTime + statusNanos; | ||
| int iter = 0; | ||
| // Current executed iteration, used to stop the calculations after maxIterations have passed. | ||
| int currentIteration = 0; | ||
| // The iterations to be reported on the next status update. | ||
| int unreportedIterations = 0; | ||
| // The time passed since the last status update. | ||
| long lastStatusUpdateTime = startTime; | ||
| while (true) { | ||
| IsBlockedResult isBlocked = Operator.NOT_BLOCKED; | ||
| try { | ||
|
|
@@ -209,29 +216,33 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie | |
| closeEarlyFinishedOperators(); | ||
| assert isFinished() : "not finished after early termination"; | ||
| } | ||
| iter++; | ||
| currentIteration++; | ||
| unreportedIterations++; | ||
|
|
||
| long now = nowSupplier.getAsLong(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optional: you could extract an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One branch doesn't use that calculation, and it may be unused if no |
||
| if (isBlocked.listener().isDone() == false) { | ||
| updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason()); | ||
| updateStatus(now - lastStatusUpdateTime, unreportedIterations, DriverStatus.Status.ASYNC, isBlocked.reason()); | ||
| return isBlocked.listener(); | ||
| } | ||
| if (isFinished()) { | ||
| finishNanos = nowSupplier.getAsLong(); | ||
| updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done"); | ||
| finishNanos = now; | ||
| updateStatus(finishNanos - lastStatusUpdateTime, unreportedIterations, DriverStatus.Status.DONE, "driver done"); | ||
| driverContext.finish(); | ||
| Releasables.close(releasable, driverContext.getSnapshot()); | ||
| return Operator.NOT_BLOCKED.listener(); | ||
| } | ||
| long now = nowSupplier.getAsLong(); | ||
| if (iter >= maxIterations) { | ||
| updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations"); | ||
| if (currentIteration >= maxIterations) { | ||
| updateStatus(now - lastStatusUpdateTime, unreportedIterations, DriverStatus.Status.WAITING, "driver iterations"); | ||
| return Operator.NOT_BLOCKED.listener(); | ||
| } | ||
| if (now - startTime >= maxTimeNanos) { | ||
| updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time"); | ||
| updateStatus(now - lastStatusUpdateTime, unreportedIterations, DriverStatus.Status.WAITING, "driver time"); | ||
| return Operator.NOT_BLOCKED.listener(); | ||
| } | ||
| if (now > nextStatus) { | ||
| updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running"); | ||
| updateStatus(now - lastStatusUpdateTime, unreportedIterations, DriverStatus.Status.RUNNING, "driver running"); | ||
| unreportedIterations = 0; | ||
| lastStatusUpdateTime = now; | ||
| nextStatus = now + statusNanos; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: you could call them
totalIterations(andcurrentIterations, respectively).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just made the variable names more explicit 👀