|
11 | 11 | import org.apache.logging.log4j.LogManager;
|
12 | 12 | import org.apache.logging.log4j.Logger;
|
13 | 13 | import org.elasticsearch.action.ActionListener;
|
| 14 | +import org.elasticsearch.action.ActionListenerResponseHandler; |
| 15 | +import org.elasticsearch.action.ActionRunnable; |
14 | 16 | import org.elasticsearch.action.FailedNodeException;
|
15 | 17 | import org.elasticsearch.action.support.ActionFilters;
|
16 | 18 | import org.elasticsearch.action.support.HandledTransportAction;
|
17 |
| -import org.elasticsearch.action.support.NodeResponseTracker; |
| 19 | +import org.elasticsearch.action.support.RefCountingRunnable; |
18 | 20 | import org.elasticsearch.cluster.ClusterState;
|
19 | 21 | import org.elasticsearch.cluster.node.DiscoveryNode;
|
20 | 22 | import org.elasticsearch.cluster.service.ClusterService;
|
21 | 23 | import org.elasticsearch.common.io.stream.StreamInput;
|
22 | 24 | import org.elasticsearch.common.io.stream.Writeable;
|
| 25 | +import org.elasticsearch.common.util.concurrent.ListenableFuture; |
| 26 | +import org.elasticsearch.common.util.concurrent.RunOnce; |
23 | 27 | import org.elasticsearch.tasks.CancellableTask;
|
24 | 28 | import org.elasticsearch.tasks.Task;
|
25 |
| -import org.elasticsearch.tasks.TaskCancelledException; |
26 | 29 | import org.elasticsearch.threadpool.ThreadPool;
|
27 | 30 | import org.elasticsearch.transport.TransportChannel;
|
28 |
| -import org.elasticsearch.transport.TransportException; |
29 | 31 | import org.elasticsearch.transport.TransportRequest;
|
30 | 32 | import org.elasticsearch.transport.TransportRequestHandler;
|
31 | 33 | import org.elasticsearch.transport.TransportRequestOptions;
|
32 |
| -import org.elasticsearch.transport.TransportResponseHandler; |
33 | 34 | import org.elasticsearch.transport.TransportService;
|
34 | 35 |
|
35 | 36 | import java.io.IOException;
|
|
38 | 39 | import java.util.List;
|
39 | 40 | import java.util.Objects;
|
40 | 41 |
|
| 42 | +import static org.elasticsearch.core.Strings.format; |
| 43 | + |
41 | 44 | public abstract class TransportNodesAction<
|
42 | 45 | NodesRequest extends BaseNodesRequest<NodesRequest>,
|
43 | 46 | NodesResponse extends BaseNodesResponse<?>,
|
@@ -85,7 +88,7 @@ protected TransportNodesAction(
|
85 | 88 | this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);
|
86 | 89 |
|
87 | 90 | this.transportNodeAction = actionName + "[n]";
|
88 |
| - this.finalExecutor = finalExecutor; |
| 91 | + this.finalExecutor = finalExecutor.equals(ThreadPool.Names.SAME) ? ThreadPool.Names.GENERIC : finalExecutor; |
89 | 92 | transportService.registerRequestHandler(transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
|
90 | 93 | }
|
91 | 94 |
|
@@ -123,40 +126,90 @@ protected TransportNodesAction(
|
123 | 126 |
|
124 | 127 | @Override
|
125 | 128 | protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
|
126 |
| - new AsyncAction(task, request, listener).start(); |
127 |
| - } |
| 129 | + if (request.concreteNodes() == null) { |
| 130 | + resolveRequest(request, clusterService.state()); |
| 131 | + assert request.concreteNodes() != null; |
| 132 | + } |
128 | 133 |
|
129 |
| - /** |
130 |
| - * Map the responses into {@code nodeResponseClass} responses and {@link FailedNodeException}s, convert to a {@link NodesResponse} and |
131 |
| - * pass it to the listener. Fails the listener with a {@link NullPointerException} if {@code nodesResponses} is null. |
132 |
| - * |
133 |
| - * @param request The associated request. |
134 |
| - * @param nodeResponseTracker All node-level responses collected so far |
135 |
| - * @throws NodeResponseTracker.DiscardedResponsesException if {@code nodeResponseTracker} has already discarded the intermediate results |
136 |
| - * @see #newResponseAsync(Task, BaseNodesRequest, List, List, ActionListener) |
137 |
| - */ |
138 |
| - // exposed for tests |
139 |
| - void newResponse(Task task, NodesRequest request, NodeResponseTracker nodeResponseTracker, ActionListener<NodesResponse> listener) |
140 |
| - throws NodeResponseTracker.DiscardedResponsesException { |
| 134 | + final var mutex = new Object(); |
| 135 | + final var responses = new ArrayList<NodeResponse>(request.concreteNodes().length); |
| 136 | + final var exceptions = new ArrayList<FailedNodeException>(0); |
141 | 137 |
|
142 |
| - if (nodeResponseTracker == null) { |
143 |
| - listener.onFailure(new NullPointerException("nodesResponses")); |
144 |
| - return; |
| 138 | + final var resultListener = new ListenableFuture<NodesResponse>(); |
| 139 | + final var resultListenerCompleter = new RunOnce(() -> { |
| 140 | + if (task instanceof CancellableTask cancellableTask) { |
| 141 | + if (cancellableTask.notifyIfCancelled(resultListener)) { |
| 142 | + return; |
| 143 | + } |
| 144 | + } |
| 145 | + // ref releases all happen-before here so no need to be synchronized |
| 146 | + threadPool.executor(finalExecutor) |
| 147 | + .execute(ActionRunnable.wrap(resultListener, l -> newResponseAsync(task, request, responses, exceptions, l))); |
| 148 | + }); |
| 149 | + |
| 150 | + final var nodeFailureListeners = new ListenableFuture<NodeResponse>(); |
| 151 | + if (task instanceof CancellableTask cancellableTask) { |
| 152 | + cancellableTask.addListener(() -> { |
| 153 | + assert cancellableTask.isCancelled(); |
| 154 | + resultListenerCompleter.run(); |
| 155 | + cancellableTask.notifyIfCancelled(nodeFailureListeners); |
| 156 | + }); |
145 | 157 | }
|
146 | 158 |
|
147 |
| - final List<NodeResponse> responses = new ArrayList<>(); |
148 |
| - final List<FailedNodeException> failures = new ArrayList<>(); |
| 159 | + final var transportRequestOptions = TransportRequestOptions.timeout(request.timeout()); |
| 160 | + |
| 161 | + try (var refs = new RefCountingRunnable(() -> { |
| 162 | + resultListener.addListener(listener); |
| 163 | + resultListenerCompleter.run(); |
| 164 | + })) { |
| 165 | + for (final var node : request.concreteNodes()) { |
| 166 | + final ActionListener<NodeResponse> nodeResponseListener = ActionListener.notifyOnce(new ActionListener<>() { |
| 167 | + @Override |
| 168 | + public void onResponse(NodeResponse nodeResponse) { |
| 169 | + synchronized (mutex) { |
| 170 | + responses.add(nodeResponse); |
| 171 | + } |
| 172 | + } |
| 173 | + |
| 174 | + @Override |
| 175 | + public void onFailure(Exception e) { |
| 176 | + if (task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) { |
| 177 | + return; |
| 178 | + } |
| 179 | + |
| 180 | + logger.debug(() -> format("failed to execute [%s] on node [%s]", actionName, node), e); |
| 181 | + synchronized (mutex) { |
| 182 | + exceptions.add(new FailedNodeException(node.getId(), "Failed node [" + node.getId() + "]", e)); |
| 183 | + } |
| 184 | + } |
149 | 185 |
|
150 |
| - for (int i = 0; i < nodeResponseTracker.getExpectedResponseCount(); ++i) { |
151 |
| - Object response = nodeResponseTracker.getResponse(i); |
152 |
| - if (nodeResponseTracker.getResponse(i)instanceof FailedNodeException failedNodeException) { |
153 |
| - failures.add(failedNodeException); |
154 |
| - } else { |
155 |
| - responses.add(nodeResponseClass.cast(response)); |
| 186 | + @Override |
| 187 | + public String toString() { |
| 188 | + return "[" + actionName + "][" + node.descriptionWithoutAttributes() + "]"; |
| 189 | + } |
| 190 | + }); |
| 191 | + |
| 192 | + if (task instanceof CancellableTask) { |
| 193 | + nodeFailureListeners.addListener(nodeResponseListener); |
| 194 | + } |
| 195 | + |
| 196 | + final var nodeRequest = newNodeRequest(request); |
| 197 | + if (task != null) { |
| 198 | + nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); |
| 199 | + } |
| 200 | + |
| 201 | + transportService.sendRequest( |
| 202 | + node, |
| 203 | + transportNodeAction, |
| 204 | + nodeRequest, |
| 205 | + transportRequestOptions, |
| 206 | + new ActionListenerResponseHandler<>( |
| 207 | + ActionListener.releaseAfter(nodeResponseListener, refs.acquire()), |
| 208 | + in -> newNodeResponse(in, node) |
| 209 | + ) |
| 210 | + ); |
156 | 211 | }
|
157 | 212 | }
|
158 |
| - |
159 |
| - newResponseAsync(task, request, responses, failures, listener); |
160 | 213 | }
|
161 | 214 |
|
162 | 215 | /**
|
@@ -199,141 +252,9 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
|
199 | 252 | request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
|
200 | 253 | }
|
201 | 254 |
|
202 |
| - /** |
203 |
| - * Get a backwards compatible transport action name |
204 |
| - */ |
205 |
| - protected String getTransportNodeAction(DiscoveryNode node) { |
206 |
| - return transportNodeAction; |
207 |
| - } |
208 |
| - |
209 |
| - class AsyncAction implements CancellableTask.CancellationListener { |
210 |
| - |
211 |
| - private final NodesRequest request; |
212 |
| - private final ActionListener<NodesResponse> listener; |
213 |
| - private final NodeResponseTracker nodeResponseTracker; |
214 |
| - private final Task task; |
215 |
| - |
216 |
| - AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) { |
217 |
| - this.task = task; |
218 |
| - this.request = request; |
219 |
| - this.listener = listener; |
220 |
| - if (request.concreteNodes() == null) { |
221 |
| - resolveRequest(request, clusterService.state()); |
222 |
| - assert request.concreteNodes() != null; |
223 |
| - } |
224 |
| - this.nodeResponseTracker = new NodeResponseTracker(request.concreteNodes().length); |
225 |
| - } |
226 |
| - |
227 |
| - void start() { |
228 |
| - if (task instanceof CancellableTask cancellableTask) { |
229 |
| - cancellableTask.addListener(this); |
230 |
| - } |
231 |
| - final DiscoveryNode[] nodes = request.concreteNodes(); |
232 |
| - if (nodes.length == 0) { |
233 |
| - finishHim(); |
234 |
| - return; |
235 |
| - } |
236 |
| - final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout()); |
237 |
| - for (int i = 0; i < nodes.length; i++) { |
238 |
| - final int idx = i; |
239 |
| - final DiscoveryNode node = nodes[i]; |
240 |
| - final String nodeId = node.getId(); |
241 |
| - try { |
242 |
| - TransportRequest nodeRequest = newNodeRequest(request); |
243 |
| - if (task != null) { |
244 |
| - nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); |
245 |
| - } |
246 |
| - |
247 |
| - transportService.sendRequest( |
248 |
| - node, |
249 |
| - getTransportNodeAction(node), |
250 |
| - nodeRequest, |
251 |
| - transportRequestOptions, |
252 |
| - new TransportResponseHandler<NodeResponse>() { |
253 |
| - @Override |
254 |
| - public NodeResponse read(StreamInput in) throws IOException { |
255 |
| - return newNodeResponse(in, node); |
256 |
| - } |
257 |
| - |
258 |
| - @Override |
259 |
| - public void handleResponse(NodeResponse response) { |
260 |
| - onOperation(idx, response); |
261 |
| - } |
262 |
| - |
263 |
| - @Override |
264 |
| - public void handleException(TransportException exp) { |
265 |
| - onFailure(idx, node.getId(), exp); |
266 |
| - } |
267 |
| - |
268 |
| - @Override |
269 |
| - public String toString() { |
270 |
| - return "AsyncActionNodeResponseHandler{node=" + node + ", action=" + AsyncAction.this + '}'; |
271 |
| - } |
272 |
| - } |
273 |
| - ); |
274 |
| - } catch (Exception e) { |
275 |
| - onFailure(idx, nodeId, e); |
276 |
| - } |
277 |
| - } |
278 |
| - } |
279 |
| - |
280 |
| - // For testing purposes |
281 |
| - NodeResponseTracker getNodeResponseTracker() { |
282 |
| - return nodeResponseTracker; |
283 |
| - } |
284 |
| - |
285 |
| - private void onOperation(int idx, NodeResponse nodeResponse) { |
286 |
| - if (nodeResponseTracker.trackResponseAndCheckIfLast(idx, nodeResponse)) { |
287 |
| - finishHim(); |
288 |
| - } |
289 |
| - } |
290 |
| - |
291 |
| - private void onFailure(int idx, String nodeId, Throwable t) { |
292 |
| - logger.debug(() -> "failed to execute on node [" + nodeId + "]", t); |
293 |
| - if (nodeResponseTracker.trackResponseAndCheckIfLast(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t))) { |
294 |
| - finishHim(); |
295 |
| - } |
296 |
| - } |
297 |
| - |
298 |
| - private void finishHim() { |
299 |
| - if ((task instanceof CancellableTask t) && t.notifyIfCancelled(listener)) { |
300 |
| - return; |
301 |
| - } |
302 |
| - |
303 |
| - final String executor = finalExecutor.equals(ThreadPool.Names.SAME) ? ThreadPool.Names.GENERIC : finalExecutor; |
304 |
| - threadPool.executor(executor).execute(() -> { |
305 |
| - try { |
306 |
| - newResponse(task, request, nodeResponseTracker, listener); |
307 |
| - } catch (NodeResponseTracker.DiscardedResponsesException e) { |
308 |
| - // We propagate the reason that the results, in this case the task cancellation, in case the listener needs to take |
309 |
| - // follow-up actions |
310 |
| - listener.onFailure((Exception) e.getCause()); |
311 |
| - } |
312 |
| - }); |
313 |
| - } |
314 |
| - |
315 |
| - @Override |
316 |
| - public void onCancelled() { |
317 |
| - assert task instanceof CancellableTask : "task must be cancellable"; |
318 |
| - try { |
319 |
| - ((CancellableTask) task).ensureNotCancelled(); |
320 |
| - } catch (TaskCancelledException e) { |
321 |
| - nodeResponseTracker.discardIntermediateResponses(e); |
322 |
| - } |
323 |
| - } |
324 |
| - |
325 |
| - @Override |
326 |
| - public String toString() { |
327 |
| - return "AsyncAction{request=" + request + ", listener=" + listener + '}'; |
328 |
| - } |
329 |
| - } |
330 |
| - |
331 | 255 | class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
|
332 | 256 | @Override
|
333 | 257 | public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
|
334 |
| - if (task instanceof CancellableTask) { |
335 |
| - ((CancellableTask) task).ensureNotCancelled(); |
336 |
| - } |
337 | 258 | channel.sendResponse(nodeOperation(request, task));
|
338 | 259 | }
|
339 | 260 | }
|
|
0 commit comments