Skip to content

Commit 19bb31a

Browse files
committed
Introduce CancellableFanOut
We have this somewhat-complex pattern in 3 places already, and elastic#96279 will introduce a couple more, so this commit extracts it as a dedicated utility. Relates elastic#92987 Relates elastic#93484
1 parent fdabe7d commit 19bb31a

File tree

4 files changed

+433
-238
lines changed

4 files changed

+433
-238
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.common.util.concurrent.RunOnce;
15+
import org.elasticsearch.core.Nullable;
16+
import org.elasticsearch.tasks.CancellableTask;
17+
import org.elasticsearch.tasks.Task;
18+
19+
import java.util.Iterator;
20+
21+
/**
22+
* Allows an action to fan-out to several sub-actions and accumulate their results, but which reacts to a cancellation by releasing all
23+
* references to itself, and hence the partially-accumulated results, allowing them to be garbage-collected. This is a useful protection for
24+
* cases where the results may consume a lot of heap (e.g. stats) but the final response may be delayed by a single slow node for long
25+
* enough that the client gives up.
26+
* <p>
27+
* Note that it's easy to accidentally capture another reference to this class when implementing it, and this will prevent the early release
28+
* of any accumulated results. Beware of lambdas and method references. You must test your implementation carefully (using e.g.
29+
* {@code ReachabilityChecker}) to make sure it doesn't do this.
30+
*/
31+
public abstract class CancellableFanOut<Item, ItemResponse, FinalResponse> {
32+
33+
private static final Logger logger = LogManager.getLogger(CancellableFanOut.class);
34+
35+
/**
36+
* Run the fan-out action.
37+
*
38+
* @param task The task to watch for cancellations. If {@code null} or not a {@link CancellableTask} then the fan-out still
39+
* works, just without any cancellation handling.
40+
* @param itemsIterator The items over which to fan out. Iterated on the calling thread.
41+
* @param listener A listener for the final response, which is completed after all the fanned-out actions have completed. It is not
42+
* completed promptly on cancellation. Completed on the thread that handles the final per-item response (or
43+
* the calling thread if there are no items).
44+
*/
45+
public final void run(@Nullable Task task, Iterator<Item> itemsIterator, ActionListener<FinalResponse> listener) {
46+
47+
final var cancellableTask = task instanceof CancellableTask ct ? ct : null;
48+
49+
// Captures the final result as soon as it's known (either on completion or on cancellation) without necessarily completing the
50+
// outer listener, because we do not want to complete the outer listener until all sub-tasks are complete
51+
final var resultListener = new SubscribableListener<FinalResponse>();
52+
53+
// Completes resultListener (either on completion or on cancellation). Captures a reference to 'this', but within a 'RunOnce' so it
54+
// is released promptly when executed.
55+
final var resultListenerCompleter = new RunOnce(() -> {
56+
if (cancellableTask != null && cancellableTask.notifyIfCancelled(resultListener)) {
57+
return;
58+
}
59+
onCompletion(resultListener);
60+
61+
// It's important that onCompletion() completes resultListener before returning, because otherwise there's a risk that
62+
// a cancellation arrives later which might unexpectedly complete the final listener on a transport thread.
63+
assert resultListener.isDone() : "onCompletion did not complete its listener";
64+
});
65+
66+
// Collects the per-item listeners up so they can all be completed exceptionally on cancellation. Never completed successfully.
67+
final var itemCancellationListener = new SubscribableListener<ItemResponse>();
68+
if (cancellableTask != null) {
69+
cancellableTask.addListener(() -> {
70+
assert cancellableTask.isCancelled();
71+
resultListenerCompleter.run();
72+
cancellableTask.notifyIfCancelled(itemCancellationListener);
73+
});
74+
}
75+
76+
try (var refs = new RefCountingRunnable(() -> {
77+
// When all sub-tasks are complete, pass the result from resultListener to the outer listener.
78+
resultListenerCompleter.run();
79+
// resultListener is always complete by this point, so the outer listener is completed on this thread
80+
resultListener.addListener(listener);
81+
})) {
82+
while (itemsIterator.hasNext()) {
83+
final var item = itemsIterator.next();
84+
85+
// Captures a reference to 'this', but within a 'notifyOnce' so it is released promptly when completed.
86+
final ActionListener<ItemResponse> itemResponseListener = ActionListener.notifyOnce(new ActionListener<>() {
87+
@Override
88+
public void onResponse(ItemResponse itemResponse) {
89+
onItemResponse(item, itemResponse);
90+
}
91+
92+
@Override
93+
public void onFailure(Exception e) {
94+
if (cancellableTask != null && cancellableTask.isCancelled()) {
95+
// Completed on cancellation so it is released promptly, but there's no need to handle the exception.
96+
return;
97+
}
98+
onItemFailure(item, e);
99+
}
100+
101+
@Override
102+
public String toString() {
103+
return "[" + CancellableFanOut.this + "][" + item + "]";
104+
}
105+
});
106+
107+
if (cancellableTask != null) {
108+
if (cancellableTask.isCancelled()) {
109+
return;
110+
}
111+
112+
// Register this item's listener for prompt cancellation notification.
113+
itemCancellationListener.addListener(itemResponseListener);
114+
}
115+
116+
// Process the item, capturing a ref to make sure the outer listener is completed after this item is processed.
117+
sendItemRequest(item, ActionListener.releaseAfter(itemResponseListener, refs.acquire()));
118+
}
119+
} catch (Exception e) {
120+
// NB the listener may have been completed already (by exiting this try block) so this exception may not be sent to the caller,
121+
// but we cannot do anything else with it; an exception here is a bug anyway.
122+
logger.error("unexpected failure in [" + this + "]", e);
123+
assert false : e;
124+
throw e;
125+
}
126+
}
127+
128+
/**
129+
* Run the action (typically by sending a transport request) for an individual item. Called in sequence on the thread that invoked
130+
* {@link #run}. May not be called for every item if the task is cancelled during the iteration.
131+
* <p>
132+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
133+
* early release of any accumulated results. Beware of lambdas, and test carefully.
134+
*/
135+
protected abstract void sendItemRequest(Item item, ActionListener<ItemResponse> listener);
136+
137+
/**
138+
* Handle a successful response for an item. May be called concurrently for multiple items. Not called if the task is cancelled.
139+
* <p>
140+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
141+
* early release of any accumulated results. Beware of lambdas, and test carefully.
142+
*/
143+
protected abstract void onItemResponse(Item item, ItemResponse itemResponse);
144+
145+
/**
146+
* Handle a failure for an item. May be called concurrently for multiple items. Not called if the task is cancelled.
147+
* <p>
148+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
149+
* early release of any accumulated results. Beware of lambdas, and test carefully.
150+
*/
151+
protected abstract void onItemFailure(Item item, Exception e);
152+
153+
/**
154+
* Called when responses for all items have been processed, on the thread that processed the last per-item response. Not called if the
155+
* task is cancelled. Must complete the listener before returning.
156+
* <p>
157+
* Note that it's easy to accidentally capture another reference to this class when implementing this method, and that will prevent the
158+
* early release of any accumulated results. Beware of lambdas, and test carefully.
159+
*/
160+
protected abstract void onCompletion(ActionListener<FinalResponse> listener);
161+
}

0 commit comments

Comments
 (0)