Skip to content

Commit 7a5dca0

Browse files
authored
Introduce RefCountingRunnable (#92620)
Today a `CountDownActionListener` which wraps a bare `Runnable` collects all the exceptions it receives only to drop them when finally completing the delegate action. Moreover callers must declare up-front the number of times the listener will be completed, which means they must put extra effort into computing this number ahead of time and/or supply an overestimate and then make up the difference with additional artificial completions. This commit introduces `RefCountingRunnable` which allows callers to acquire and release references as needed, executing the delegate `Runnable` once all references are released. It also refactors all the relevant call sites to use this new utility.
1 parent 04f88e7 commit 7a5dca0

File tree

16 files changed

+799
-390
lines changed

16 files changed

+799
-390
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.core.CheckedConsumer;
1414
import org.elasticsearch.core.CheckedFunction;
1515
import org.elasticsearch.core.CheckedRunnable;
16+
import org.elasticsearch.core.Releasable;
17+
import org.elasticsearch.core.Releasables;
1618

1719
import java.util.ArrayList;
1820
import java.util.List;
@@ -252,6 +254,13 @@ public String toString() {
252254
}
253255
}
254256

257+
/**
258+
* Creates a listener which releases the given resource on completion (whether success or failure)
259+
*/
260+
static <Response> ActionListener<Response> releasing(Releasable releasable) {
261+
return wrap(runnableFromReleasable(releasable));
262+
}
263+
255264
/**
256265
* Creates a listener that listens for a response (or failure) and executes the
257266
* corresponding runnable when the response (or failure) is received.
@@ -362,6 +371,14 @@ static <Response> ActionListener<Response> runAfter(ActionListener<Response> del
362371
return new RunAfterActionListener<>(delegate, runAfter);
363372
}
364373

374+
/**
375+
* Wraps a given listener and returns a new listener which releases the provided {@code releaseAfter}
376+
* resource when the listener is notified via either {@code #onResponse} or {@code #onFailure}.
377+
*/
378+
static <Response> ActionListener<Response> releaseAfter(ActionListener<Response> delegate, Releasable releaseAfter) {
379+
return new RunAfterActionListener<>(delegate, runnableFromReleasable(releaseAfter));
380+
}
381+
365382
final class RunAfterActionListener<T> extends Delegating<T, T> {
366383

367384
private final Runnable runAfter;
@@ -498,4 +515,18 @@ static <Response> void completeWith(ActionListener<Response> listener, CheckedSu
498515
throw ex;
499516
}
500517
}
518+
519+
private static Runnable runnableFromReleasable(Releasable releasable) {
520+
return new Runnable() {
521+
@Override
522+
public void run() {
523+
Releasables.closeExpectNoException(releasable);
524+
}
525+
526+
@Override
527+
public String toString() {
528+
return "release[" + releasable + "]";
529+
}
530+
};
531+
}
501532
}

server/src/main/java/org/elasticsearch/action/support/CountDownActionListener.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,6 @@ public CountDownActionListener(int groupSize, ActionListener<Void> delegate) {
3737
countDown = new AtomicInteger(groupSize);
3838
}
3939

40-
/**
41-
* Creates a new listener
42-
* @param groupSize the group size
43-
* @param runnable the runnable
44-
*/
45-
public CountDownActionListener(int groupSize, Runnable runnable) {
46-
this(groupSize, ActionListener.wrap(Objects.requireNonNull(runnable)));
47-
}
48-
4940
private boolean countDown() {
5041
final var result = countDown.getAndUpdate(current -> Math.max(0, current - 1));
5142
assert result > 0;
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.common.util.concurrent.CountDown;
15+
import org.elasticsearch.core.AbstractRefCounted;
16+
import org.elasticsearch.core.RefCounted;
17+
import org.elasticsearch.core.Releasable;
18+
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
21+
/**
22+
* A mechanism to trigger an action on the completion of some (dynamic) collection of other actions. Basic usage is as follows:
23+
*
24+
* <pre>
25+
* try (var refs = new RefCountingRunnable(finalRunnable)) {
26+
* for (var item : collection) {
27+
* runAsyncAction(item, refs.acquire()); // releases the acquired ref on completion
28+
* }
29+
* }
30+
* </pre>
31+
*
32+
* The delegate action is completed when execution leaves the try-with-resources block and every acquired reference is released. Unlike a
33+
* {@link CountDown} there is no need to declare the number of subsidiary actions up front (refs can be acquired dynamically as needed) nor
34+
* does the caller need to check for completion each time a reference is released. Moreover even outside the try-with-resources block you
35+
* can continue to acquire additional listeners, even in a separate thread, as long as there's at least one listener outstanding:
36+
*
37+
* <pre>
38+
* try (var refs = new RefCountingRunnable(finalRunnable)) {
39+
* for (var item : collection) {
40+
* if (condition(item)) {
41+
* runAsyncAction(item, refs.acquire());
42+
* }
43+
* }
44+
* if (flag) {
45+
* runOneOffAsyncAction(refs.acquire());
46+
* return;
47+
* }
48+
* for (var item : otherCollection) {
49+
* var itemRef = refs.acquire(); // delays completion while the background action is pending
50+
* executorService.execute(() -> {
51+
* try (var ignored = itemRef) {
52+
* if (condition(item)) {
53+
* runOtherAsyncAction(item, refs.acquire());
54+
* }
55+
* }
56+
* });
57+
* }
58+
* }
59+
* </pre>
60+
*
61+
* In particular (and also unlike a {@link CountDown}) this works even if you don't acquire any extra refs at all: in that case, the
62+
* delegate action executes at the end of the try-with-resources block.
63+
*/
64+
public final class RefCountingRunnable implements Releasable {
65+
66+
private static final Logger logger = LogManager.getLogger(RefCountingRunnable.class);
67+
static final String ALREADY_CLOSED_MESSAGE = "already closed, cannot acquire or release any further refs";
68+
69+
private final RefCounted refCounted;
70+
private final AtomicBoolean originalRefReleased = new AtomicBoolean();
71+
72+
private class AcquiredRef implements Releasable {
73+
private final AtomicBoolean released = new AtomicBoolean();
74+
75+
@Override
76+
public void close() {
77+
releaseRef(released);
78+
}
79+
80+
@Override
81+
public String toString() {
82+
return RefCountingRunnable.this.toString();
83+
}
84+
}
85+
86+
/**
87+
* Construct a {@link RefCountingRunnable} which executes {@code delegate} when all refs are released.
88+
* @param delegate The action to execute when all refs are released. This action must not throw any exception.
89+
*/
90+
public RefCountingRunnable(Runnable delegate) {
91+
this.refCounted = AbstractRefCounted.of(delegate);
92+
}
93+
94+
/**
95+
* Acquire a reference to this object and return an action which releases it. The delegate {@link Runnable} is called when all its
96+
* references have been released.
97+
*
98+
* Callers must take care to close the returned resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
99+
*/
100+
public Releasable acquire() {
101+
if (refCounted.tryIncRef()) {
102+
return new AcquiredRef();
103+
}
104+
assert false : ALREADY_CLOSED_MESSAGE;
105+
throw new IllegalStateException(ALREADY_CLOSED_MESSAGE);
106+
}
107+
108+
/**
109+
* Acquire a reference to this object and return a listener which releases it when notified. The delegate {@link Runnable} is called
110+
* when all its references have been released.
111+
*/
112+
public ActionListener<Void> acquireListener() {
113+
return ActionListener.releasing(acquire());
114+
}
115+
116+
/**
117+
* Release the original reference to this object, which executes the delegate {@link Runnable} if there are no other references.
118+
*
119+
* Callers must take care to close this resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
120+
*/
121+
@Override
122+
public void close() {
123+
releaseRef(originalRefReleased);
124+
}
125+
126+
private void releaseRef(AtomicBoolean released) {
127+
if (released.compareAndSet(false, true)) {
128+
try {
129+
refCounted.decRef();
130+
} catch (Exception e) {
131+
logger.error("exception in delegate", e);
132+
assert false : e;
133+
}
134+
} else {
135+
assert false : "already closed";
136+
}
137+
}
138+
139+
@Override
140+
public String toString() {
141+
return refCounted.toString();
142+
}
143+
144+
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2121
import org.elasticsearch.action.support.IndicesOptions;
2222
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.action.support.RefCountingRunnable;
2324
import org.elasticsearch.action.support.ThreadedActionListener;
2425
import org.elasticsearch.client.internal.Client;
2526
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -32,7 +33,6 @@
3233
import org.elasticsearch.common.settings.Setting;
3334
import org.elasticsearch.common.settings.Setting.Property;
3435
import org.elasticsearch.common.settings.Settings;
35-
import org.elasticsearch.common.util.concurrent.CountDown;
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.index.shard.ShardId;
3838
import org.elasticsearch.index.store.StoreStats;
@@ -161,7 +161,7 @@ public void clusterChanged(ClusterChangedEvent event) {
161161
private class AsyncRefresh {
162162

163163
private final List<ActionListener<ClusterInfo>> thisRefreshListeners;
164-
private final CountDown countDown = new CountDown(2);
164+
private final RefCountingRunnable fetchRefs = new RefCountingRunnable(this::callListeners);
165165

166166
AsyncRefresh(List<ActionListener<ClusterInfo>> thisRefreshListeners) {
167167
this.thisRefreshListeners = thisRefreshListeners;
@@ -177,15 +177,15 @@ void execute() {
177177
return;
178178
}
179179

180-
assert countDown.isCountedDown() == false;
181180
logger.trace("starting async refresh");
182181

183-
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
184-
fetchNodeStats();
185-
}
186-
187-
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
188-
fetchIndicesStats();
182+
try (var ignoredRefs = fetchRefs) {
183+
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
184+
fetchNodeStats();
185+
}
186+
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
187+
fetchIndicesStats();
188+
}
189189
}
190190
}
191191

@@ -203,7 +203,7 @@ private void fetchIndicesStats() {
203203
logger,
204204
threadPool,
205205
ThreadPool.Names.MANAGEMENT,
206-
ActionListener.runAfter(new ActionListener<>() {
206+
ActionListener.releaseAfter(new ActionListener<>() {
207207
@Override
208208
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
209209
logger.trace("received indices stats response");
@@ -277,7 +277,7 @@ public void onFailure(Exception e) {
277277
}
278278
indicesStatsSummary = IndicesStatsSummary.EMPTY;
279279
}
280-
}, this::onStatsProcessed),
280+
}, fetchRefs.acquire()),
281281
false
282282
)
283283
);
@@ -288,7 +288,7 @@ private void fetchNodeStats() {
288288
nodesStatsRequest.clear();
289289
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
290290
nodesStatsRequest.timeout(fetchTimeout);
291-
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() {
291+
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.releaseAfter(new ActionListener<>() {
292292
@Override
293293
public void onResponse(NodesStatsResponse nodesStatsResponse) {
294294
logger.trace("received node stats response");
@@ -318,18 +318,12 @@ public void onFailure(Exception e) {
318318
leastAvailableSpaceUsages = Map.of();
319319
mostAvailableSpaceUsages = Map.of();
320320
}
321-
}, this::onStatsProcessed));
322-
}
323-
324-
private void onStatsProcessed() {
325-
if (countDown.countDown()) {
326-
logger.trace("stats all received, computing cluster info and notifying listeners");
327-
callListeners();
328-
}
321+
}, fetchRefs.acquire()));
329322
}
330323

331324
private void callListeners() {
332325
try {
326+
logger.trace("stats all received, computing cluster info and notifying listeners");
333327
final ClusterInfo clusterInfo = getClusterInfo();
334328
boolean anyListeners = false;
335329
for (final Consumer<ClusterInfo> listener : listeners) {

0 commit comments

Comments
 (0)