Improve cancellability in TransportTasksAction #96279
Conversation
Each `TransportTasksAction` fans out to multiple nodes, accumulates responses and retains them until all the nodes have responded, and then converts the responses into a final result. Similarly to elastic#92987 and elastic#93484, we should accumulate the responses in a structure that doesn't require so much copying later on, and should drop the received responses if the task is cancelled while some nodes' responses are still pending.
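The accumulate-then-release pattern the description refers to can be sketched roughly as follows. This is a hedged illustration with hypothetical names (`ResponseAccumulator`, `onNodeResponse`), not the actual Elasticsearch types: per-node responses go into a concurrent structure rather than being copied around, and cancellation drains it so partial results are freed before all nodes have responded.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a cancellation-aware response accumulator.
class ResponseAccumulator<T> {
    private final Queue<T> responses = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean cancelled = new AtomicBoolean();

    void onNodeResponse(T response) {
        if (cancelled.get()) {
            return;                        // drop late responses after cancellation
        }
        responses.add(response);
        if (cancelled.get()) {
            responses.clear();             // handle cancellation racing with the add
        }
    }

    void cancel() {
        cancelled.set(true);
        responses.clear();                 // release retained partial results early
    }

    int size() {
        return responses.size();
    }
}
```

The check-after-add in `onNodeResponse` covers the race where cancellation lands between the `cancelled` check and the `add`, so a late response cannot be retained forever.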
Pinging @elastic/es-distributed (Team:Distributed)
Hi @DaveCTurner, I've created a changelog YAML for you.
In a busy cluster the list-tasks API may retain information about a very large number of tasks while waiting for all nodes to respond. This commit makes the API cancellable so that unnecessary partial results can be released earlier. Relates elastic#96279, which implements the early-release functionality.
LGTM! I've left a few cosmetic comments.
```java
reachabilityChecker.ensureUnreachable();
```

```java
while (true) {
```
Would it make sense to use `while (taskResponseListeners.peek() != null)` here instead of manually breaking out of the loop?
That'd work, but I'd rather just do a single read, avoiding having to reason about the relationship between `peek()` and `poll()`.
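The two loop shapes under discussion can be illustrated with a small, self-contained example (hypothetical `drainWithSinglePoll` helper, not the PR's actual code). Polling once per iteration means each element is observed by exactly one read, so there is no need to reason about whether a `peek()` and the following `poll()` see the same element when the queue is drained concurrently.

```java
import java.util.Queue;

// Hypothetical helper showing the single-read drain loop the author prefers.
class DrainExample {
    static int drainWithSinglePoll(Queue<Runnable> queue) {
        int ran = 0;
        while (true) {
            Runnable r = queue.poll();   // single read per iteration
            if (r == null) {
                break;                   // queue is (currently) empty
            }
            r.run();
            ran++;
        }
        return ran;
    }
}
```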
```java
        return;
    }

logger.debug(Strings.format("failed to execute on node [{}]", nodeId), e);
```
I was wondering if we can perform the formatting lazily by using a supplier: `logger.debug(() -> Strings.format(`
Yes; it's not very common to get here, but I'll do that.
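The point of the supplier suggestion can be shown with a toy logger (hypothetical `LazyLogDemo` class, not log4j or the Elasticsearch `Strings` utility): when the message is passed as a `Supplier`, the string is only built if debug logging is actually enabled.

```java
import java.util.function.Supplier;

// Toy logger illustrating lazy message formatting via a Supplier.
class LazyLogDemo {
    boolean debugEnabled = false;
    int formats = 0;                       // counts how often we actually format

    void debug(Supplier<String> message) {
        if (debugEnabled) {
            System.out.println(message.get());  // format only when needed
        }
    }

    String format(String template, Object arg) {
        formats++;
        return template.replace("{}", String.valueOf(arg));
    }
}
```

With an eager call the formatting cost is paid on every invocation even when debug logging is off; the supplier defers it to the rare case where the message is emitted.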
```java
@Override
public void onResponse(NodeTasksResponse nodeResponse) {
    synchronized (taskResponses) {
        taskResponses.addAll(nodeResponse.results);
```
I've run some tests and have seen a lot of cases where `nodeResponse.results` is empty. Would it make sense to perform an `isEmpty` check on `nodeResponse.results` before acquiring the lock on `taskResponses`?
Yes that makes sense.
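The agreed change amounts to a fast path that skips lock acquisition when a node returned no results. A minimal sketch, assuming a hypothetical `Collector` class rather than the PR's actual listener:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested isEmpty fast path before synchronizing.
class Collector<T> {
    private final List<T> taskResponses = new ArrayList<>();

    void addAll(List<T> nodeResults) {
        if (nodeResults.isEmpty()) {
            return;                        // common case: skip the lock entirely
        }
        synchronized (taskResponses) {
            taskResponses.addAll(nodeResults);
        }
    }

    int size() {
        synchronized (taskResponses) {
            return taskResponses.size();
        }
    }
}
```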
```java
private void nodeOperation(CancellableTask task, NodeTaskRequest nodeTaskRequest, ActionListener<NodeTasksResponse> listener) {
    TasksRequest request = nodeTaskRequest.tasksRequest;
    processTasks(request, ActionListener.wrap(tasks -> nodeOperation(task, listener, request, tasks), listener::onFailure));
    final var taskResponses = new ArrayList<TaskResponse>();
```
Does it make sense to pre-size `taskResponses` based on the number of nodes from which we collect responses?
I don't think the number of nodes would be a particularly useful estimate for this - as you say, many nodes return nothing, but sometimes they will return thousands of results. These are pretty low-throughput APIs so I think the default sizing is ok.
@elasticmachine update branch
LGTM.
Slightly complex (but good). I wonder if a comment or two on how the result lists are captured locally, and how cancellation ensures they are freed, could make sense?
We have this somewhat-complex pattern in 3 places already, and elastic#96279 will introduce a couple more, so this commit extracts it as a dedicated utility. Relates elastic#92987 Relates elastic#93484
Yeah, it is a bit convoluted, isn't it? I extracted a utility (and added more commentary) in #96373, and will update this PR to use it once that's merged.