Skip to content

Commit 976b651

Browse files
authored
Simplify TransportNodesAction (#92987)
Recent improvements to the primitives for writing async code (particularly #92452 and #92620) mean that we can enormously simplify `TransportNodesAction`. In particular, we can avoid accumulating an intermediate array of responses for later processing in favour of just accumulating the successes and failures into their final separate lists. We also no longer need to use a separate `NodeResponseTracker` to discard responses on cancellation.
1 parent fcdcba8 commit 976b651

File tree

1 file changed

+84
-164
lines changed

1 file changed

+84
-164
lines changed

server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 84 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,26 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.ActionListenerResponseHandler;
15+
import org.elasticsearch.action.ActionRunnable;
1416
import org.elasticsearch.action.FailedNodeException;
1517
import org.elasticsearch.action.support.ActionFilters;
1618
import org.elasticsearch.action.support.HandledTransportAction;
17-
import org.elasticsearch.action.support.NodeResponseTracker;
19+
import org.elasticsearch.action.support.RefCountingRunnable;
1820
import org.elasticsearch.cluster.ClusterState;
1921
import org.elasticsearch.cluster.node.DiscoveryNode;
2022
import org.elasticsearch.cluster.service.ClusterService;
2123
import org.elasticsearch.common.io.stream.StreamInput;
2224
import org.elasticsearch.common.io.stream.Writeable;
25+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
26+
import org.elasticsearch.common.util.concurrent.RunOnce;
2327
import org.elasticsearch.tasks.CancellableTask;
2428
import org.elasticsearch.tasks.Task;
25-
import org.elasticsearch.tasks.TaskCancelledException;
2629
import org.elasticsearch.threadpool.ThreadPool;
2730
import org.elasticsearch.transport.TransportChannel;
28-
import org.elasticsearch.transport.TransportException;
2931
import org.elasticsearch.transport.TransportRequest;
3032
import org.elasticsearch.transport.TransportRequestHandler;
3133
import org.elasticsearch.transport.TransportRequestOptions;
32-
import org.elasticsearch.transport.TransportResponseHandler;
3334
import org.elasticsearch.transport.TransportService;
3435

3536
import java.io.IOException;
@@ -38,6 +39,8 @@
3839
import java.util.List;
3940
import java.util.Objects;
4041

42+
import static org.elasticsearch.core.Strings.format;
43+
4144
public abstract class TransportNodesAction<
4245
NodesRequest extends BaseNodesRequest<NodesRequest>,
4346
NodesResponse extends BaseNodesResponse<?>,
@@ -85,7 +88,7 @@ protected TransportNodesAction(
8588
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);
8689

8790
this.transportNodeAction = actionName + "[n]";
88-
this.finalExecutor = finalExecutor;
91+
this.finalExecutor = finalExecutor.equals(ThreadPool.Names.SAME) ? ThreadPool.Names.GENERIC : finalExecutor;
8992
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
9093
}
9194

@@ -123,40 +126,89 @@ protected TransportNodesAction(
123126

124127
@Override
125128
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
126-
new AsyncAction(task, request, listener).start();
127-
}
129+
if (request.concreteNodes() == null) {
130+
resolveRequest(request, clusterService.state());
131+
assert request.concreteNodes() != null;
132+
}
128133

129-
/**
130-
* Map the responses into {@code nodeResponseClass} responses and {@link FailedNodeException}s, convert to a {@link NodesResponse} and
131-
* pass it to the listener. Fails the listener with a {@link NullPointerException} if {@code nodesResponses} is null.
132-
*
133-
* @param request The associated request.
134-
* @param nodeResponseTracker All node-level responses collected so far
135-
* @throws NodeResponseTracker.DiscardedResponsesException if {@code nodeResponseTracker} has already discarded the intermediate results
136-
* @see #newResponseAsync(Task, BaseNodesRequest, List, List, ActionListener)
137-
*/
138-
// exposed for tests
139-
void newResponse(Task task, NodesRequest request, NodeResponseTracker nodeResponseTracker, ActionListener<NodesResponse> listener)
140-
throws NodeResponseTracker.DiscardedResponsesException {
134+
final var responses = new ArrayList<NodeResponse>(request.concreteNodes().length);
135+
final var exceptions = new ArrayList<FailedNodeException>(0);
141136

142-
if (nodeResponseTracker == null) {
143-
listener.onFailure(new NullPointerException("nodesResponses"));
144-
return;
137+
final var resultListener = new ListenableFuture<NodesResponse>();
138+
final var resultListenerCompleter = new RunOnce(() -> {
139+
if (task instanceof CancellableTask cancellableTask) {
140+
if (cancellableTask.notifyIfCancelled(resultListener)) {
141+
return;
142+
}
143+
}
144+
// ref releases all happen-before here so no need to be synchronized
145+
threadPool.executor(finalExecutor)
146+
.execute(ActionRunnable.wrap(resultListener, l -> newResponseAsync(task, request, responses, exceptions, l)));
147+
});
148+
149+
final var nodeCancellationListener = new ListenableFuture<NodeResponse>(); // collects node listeners & completes them if cancelled
150+
if (task instanceof CancellableTask cancellableTask) {
151+
cancellableTask.addListener(() -> {
152+
assert cancellableTask.isCancelled();
153+
resultListenerCompleter.run();
154+
cancellableTask.notifyIfCancelled(nodeCancellationListener);
155+
});
145156
}
146157

147-
final List<NodeResponse> responses = new ArrayList<>();
148-
final List<FailedNodeException> failures = new ArrayList<>();
158+
final var transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
159+
160+
try (var refs = new RefCountingRunnable(() -> {
161+
resultListener.addListener(listener);
162+
resultListenerCompleter.run();
163+
})) {
164+
for (final var node : request.concreteNodes()) {
165+
final ActionListener<NodeResponse> nodeResponseListener = ActionListener.notifyOnce(new ActionListener<>() {
166+
@Override
167+
public void onResponse(NodeResponse nodeResponse) {
168+
synchronized (responses) {
169+
responses.add(nodeResponse);
170+
}
171+
}
172+
173+
@Override
174+
public void onFailure(Exception e) {
175+
if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) {
176+
return;
177+
}
178+
179+
logger.debug(() -> format("failed to execute [%s] on node [%s]", actionName, node), e);
180+
synchronized (exceptions) {
181+
exceptions.add(new FailedNodeException(node.getId(), "Failed node [" + node.getId() + "]", e));
182+
}
183+
}
149184

150-
for (int i = 0; i < nodeResponseTracker.getExpectedResponseCount(); ++i) {
151-
Object response = nodeResponseTracker.getResponse(i);
152-
if (nodeResponseTracker.getResponse(i)instanceof FailedNodeException failedNodeException) {
153-
failures.add(failedNodeException);
154-
} else {
155-
responses.add(nodeResponseClass.cast(response));
185+
@Override
186+
public String toString() {
187+
return "[" + actionName + "][" + node.descriptionWithoutAttributes() + "]";
188+
}
189+
});
190+
191+
if (task instanceof CancellableTask) {
192+
nodeCancellationListener.addListener(nodeResponseListener);
193+
}
194+
195+
final var nodeRequest = newNodeRequest(request);
196+
if (task != null) {
197+
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
198+
}
199+
200+
transportService.sendRequest(
201+
node,
202+
transportNodeAction,
203+
nodeRequest,
204+
transportRequestOptions,
205+
new ActionListenerResponseHandler<>(
206+
ActionListener.releaseAfter(nodeResponseListener, refs.acquire()),
207+
in -> newNodeResponse(in, node)
208+
)
209+
);
156210
}
157211
}
158-
159-
newResponseAsync(task, request, responses, failures, listener);
160212
}
161213

162214
/**
@@ -199,141 +251,9 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
199251
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
200252
}
201253

202-
/**
203-
* Get a backwards compatible transport action name
204-
*/
205-
protected String getTransportNodeAction(DiscoveryNode node) {
206-
return transportNodeAction;
207-
}
208-
209-
class AsyncAction implements CancellableTask.CancellationListener {
210-
211-
private final NodesRequest request;
212-
private final ActionListener<NodesResponse> listener;
213-
private final NodeResponseTracker nodeResponseTracker;
214-
private final Task task;
215-
216-
AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
217-
this.task = task;
218-
this.request = request;
219-
this.listener = listener;
220-
if (request.concreteNodes() == null) {
221-
resolveRequest(request, clusterService.state());
222-
assert request.concreteNodes() != null;
223-
}
224-
this.nodeResponseTracker = new NodeResponseTracker(request.concreteNodes().length);
225-
}
226-
227-
void start() {
228-
if (task instanceof CancellableTask cancellableTask) {
229-
cancellableTask.addListener(this);
230-
}
231-
final DiscoveryNode[] nodes = request.concreteNodes();
232-
if (nodes.length == 0) {
233-
finishHim();
234-
return;
235-
}
236-
final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout());
237-
for (int i = 0; i < nodes.length; i++) {
238-
final int idx = i;
239-
final DiscoveryNode node = nodes[i];
240-
final String nodeId = node.getId();
241-
try {
242-
TransportRequest nodeRequest = newNodeRequest(request);
243-
if (task != null) {
244-
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
245-
}
246-
247-
transportService.sendRequest(
248-
node,
249-
getTransportNodeAction(node),
250-
nodeRequest,
251-
transportRequestOptions,
252-
new TransportResponseHandler<NodeResponse>() {
253-
@Override
254-
public NodeResponse read(StreamInput in) throws IOException {
255-
return newNodeResponse(in, node);
256-
}
257-
258-
@Override
259-
public void handleResponse(NodeResponse response) {
260-
onOperation(idx, response);
261-
}
262-
263-
@Override
264-
public void handleException(TransportException exp) {
265-
onFailure(idx, node.getId(), exp);
266-
}
267-
268-
@Override
269-
public String toString() {
270-
return "AsyncActionNodeResponseHandler{node=" + node + ", action=" + AsyncAction.this + '}';
271-
}
272-
}
273-
);
274-
} catch (Exception e) {
275-
onFailure(idx, nodeId, e);
276-
}
277-
}
278-
}
279-
280-
// For testing purposes
281-
NodeResponseTracker getNodeResponseTracker() {
282-
return nodeResponseTracker;
283-
}
284-
285-
private void onOperation(int idx, NodeResponse nodeResponse) {
286-
if (nodeResponseTracker.trackResponseAndCheckIfLast(idx, nodeResponse)) {
287-
finishHim();
288-
}
289-
}
290-
291-
private void onFailure(int idx, String nodeId, Throwable t) {
292-
logger.debug(() -> "failed to execute on node [" + nodeId + "]", t);
293-
if (nodeResponseTracker.trackResponseAndCheckIfLast(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t))) {
294-
finishHim();
295-
}
296-
}
297-
298-
private void finishHim() {
299-
if ((task instanceof CancellableTask t) && t.notifyIfCancelled(listener)) {
300-
return;
301-
}
302-
303-
final String executor = finalExecutor.equals(ThreadPool.Names.SAME) ? ThreadPool.Names.GENERIC : finalExecutor;
304-
threadPool.executor(executor).execute(() -> {
305-
try {
306-
newResponse(task, request, nodeResponseTracker, listener);
307-
} catch (NodeResponseTracker.DiscardedResponsesException e) {
308-
// We propagate the reason that the results, in this case the task cancellation, in case the listener needs to take
309-
// follow-up actions
310-
listener.onFailure((Exception) e.getCause());
311-
}
312-
});
313-
}
314-
315-
@Override
316-
public void onCancelled() {
317-
assert task instanceof CancellableTask : "task must be cancellable";
318-
try {
319-
((CancellableTask) task).ensureNotCancelled();
320-
} catch (TaskCancelledException e) {
321-
nodeResponseTracker.discardIntermediateResponses(e);
322-
}
323-
}
324-
325-
@Override
326-
public String toString() {
327-
return "AsyncAction{request=" + request + ", listener=" + listener + '}';
328-
}
329-
}
330-
331254
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
332255
@Override
333256
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
334-
if (task instanceof CancellableTask) {
335-
((CancellableTask) task).ensureNotCancelled();
336-
}
337257
channel.sendResponse(nodeOperation(request, task));
338258
}
339259
}

0 commit comments

Comments
 (0)