Skip to content

Commit 2b0d6e6

Browse files
authored
Introduce CancellableFanOut (#96373)
We have this somewhat-complex pattern in 3 places already, and #96279 will introduce a couple more, so this commit extracts it as a dedicated utility. Relates #92987 Relates #93484
1 parent 9649ee0 commit 2b0d6e6

File tree

4 files changed

+448
-239
lines changed

4 files changed

+448
-239
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.common.util.concurrent.RunOnce;
15+
import org.elasticsearch.core.Nullable;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
19+
import java.util.Iterator;
20+
21+
/**
22+
* Allows an action to fan-out to several sub-actions and accumulate their results, but which reacts to a cancellation by releasing all
23+
* references to itself, and hence the partially-accumulated results, allowing them to be garbage-collected. This is a useful protection for
24+
* cases where the results may consume a lot of heap (e.g. stats) but the final response may be delayed by a single slow node for long
25+
* enough that the client gives up.
26+
* <p>
27+
* Note that it's easy to accidentally capture another reference to this class when implementing it, and this will prevent the early release
28+
* of any accumulated results. Beware of lambdas and method references. You must test your implementation carefully (using e.g.
29+
* {@code ReachabilityChecker}) to make sure it doesn't do this.
30+
*/
31+
public abstract class CancellableFanOut<Item, ItemResponse, FinalResponse> {
32+
33+
private static final Logger logger = LogManager.getLogger(CancellableFanOut.class);
34+
35+
/**
36+
* Run the fan-out action.
37+
*
38+
* @param task The task to watch for cancellations. If {@code null} or not a {@link CancellableTask} then the fan-out still
39+
* works, just without any cancellation handling.
40+
* @param itemsIterator The items over which to fan out. Iterated on the calling thread.
41+
* @param listener A listener for the final response, which is completed after all the fanned-out actions have completed. It is not
42+
* completed promptly on cancellation. Completed on the thread that handles the final per-item response (or
43+
* the calling thread if there are no items).
44+
*/
45+
public final void run(@Nullable Task task, Iterator<Item> itemsIterator, ActionListener<FinalResponse> listener) {
46+
47+
final var cancellableTask = task instanceof CancellableTask ct ? ct : null;
48+
49+
// Captures the final result as soon as it's known (either on completion or on cancellation) without necessarily completing the
50+
// outer listener, because we do not want to complete the outer listener until all sub-tasks are complete
51+
final var resultListener = new SubscribableListener<FinalResponse>();
52+
53+
// Completes resultListener (either on completion or on cancellation). Captures a reference to 'this', but within a 'RunOnce' so it
54+
// is released promptly when executed.
55+
final var resultListenerCompleter = new RunOnce(() -> {
56+
if (cancellableTask != null && cancellableTask.notifyIfCancelled(resultListener)) {
57+
return;
58+
}
59+
// It's important that we complete resultListener before returning, because otherwise there's a risk that a cancellation arrives
60+
// later which might unexpectedly complete the final listener on a transport thread.
61+
ActionListener.completeWith(resultListener, this::onCompletion);
62+
});
63+
64+
// Collects the per-item listeners up so they can all be completed exceptionally on cancellation. Never completed successfully.
65+
final var itemCancellationListener = new SubscribableListener<ItemResponse>();
66+
if (cancellableTask != null) {
67+
cancellableTask.addListener(() -> {
68+
assert cancellableTask.isCancelled();
69+
resultListenerCompleter.run();
70+
cancellableTask.notifyIfCancelled(itemCancellationListener);
71+
});
72+
}
73+
74+
try (var refs = new RefCountingRunnable(() -> {
75+
// When all sub-tasks are complete, pass the result from resultListener to the outer listener.
76+
resultListenerCompleter.run();
77+
// resultListener is always complete by this point, so the outer listener is completed on this thread
78+
resultListener.addListener(listener);
79+
})) {
80+
while (itemsIterator.hasNext()) {
81+
final var item = itemsIterator.next();
82+
83+
// Captures a reference to 'this', but within a 'notifyOnce' so it is released promptly when completed.
84+
final ActionListener<ItemResponse> itemResponseListener = ActionListener.notifyOnce(new ActionListener<>() {
85+
@Override
86+
public void onResponse(ItemResponse itemResponse) {
87+
onItemResponse(item, itemResponse);
88+
}
89+
90+
@Override
91+
public void onFailure(Exception e) {
92+
if (cancellableTask != null && cancellableTask.isCancelled()) {
93+
// Completed on cancellation so it is released promptly, but there's no need to handle the exception.
94+
return;
95+
}
96+
onItemFailure(item, e);
97+
}
98+
99+
@Override
100+
public String toString() {
101+
return "[" + CancellableFanOut.this + "][" + item + "]";
102+
}
103+
});
104+
105+
if (cancellableTask != null) {
106+
if (cancellableTask.isCancelled()) {
107+
return;
108+
}
109+
110+
// Register this item's listener for prompt cancellation notification.
111+
itemCancellationListener.addListener(itemResponseListener);
112+
}
113+
114+
// Process the item, capturing a ref to make sure the outer listener is completed after this item is processed.
115+
sendItemRequest(item, ActionListener.releaseAfter(itemResponseListener, refs.acquire()));
116+
}
117+
} catch (Exception e) {
118+
// NB the listener may have been completed already (by exiting this try block) so this exception may not be sent to the caller,
119+
// but we cannot do anything else with it; an exception here is a bug anyway.
120+
logger.error("unexpected failure in [" + this + "]", e);
121+
assert false : e;
122+
throw e;
123+
}
124+
}
125+
126+
/**
127+
* Run the action (typically by sending a transport request) for an individual item. Called in sequence on the thread that invoked
128+
* {@link #run}. May not be called for every item if the task is cancelled during the iteration.
129+
* <p>
130+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
131+
* early release of any accumulated results. Beware of lambdas, and test carefully.
132+
*/
133+
protected abstract void sendItemRequest(Item item, ActionListener<ItemResponse> listener);
134+
135+
/**
136+
* Handle a successful response for an item. May be called concurrently for multiple items. Not called if the task is cancelled.
137+
* <p>
138+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
139+
* early release of any accumulated results. Beware of lambdas, and test carefully.
140+
*/
141+
protected abstract void onItemResponse(Item item, ItemResponse itemResponse);
142+
143+
/**
144+
* Handle a failure for an item. May be called concurrently for multiple items. Not called if the task is cancelled.
145+
* <p>
146+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
147+
* early release of any accumulated results. Beware of lambdas, and test carefully.
148+
*/
149+
protected abstract void onItemFailure(Item item, Exception e);
150+
151+
/**
152+
* Called when responses for all items have been processed, on the thread that processed the last per-item response. Not called if the
153+
* task is cancelled.
154+
* <p>
155+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
156+
* early release of any accumulated results. Beware of lambdas, and test carefully.
157+
*/
158+
protected abstract FinalResponse onCompletion() throws Exception;
159+
}

0 commit comments

Comments
 (0)