Skip to content

Commit f71368a

Browse files
committed
Introduce RefCountingListener
Similar to the `RefCountingRunnable` introduced in elastic#92620, this commit introduces `RefCountingListener` which wraps an `ActionListener<Void>` and allows callers to acquire a dynamic collection of subsidiary listeners. The `RefCountingListener` counts responses and collects exceptions passed to these subsidiary listeners, and completes the wrapped listener once all acquired listeners are complete. Unlike a `CountDownActionListener` or a `GroupedActionListener`, this mechanism avoids the need for callers to declare up-front the number of times the listener will be completed, saving effort in computing this number ahead of time and avoiding the need to sometimes supply an overestimate and then make up the difference with additional artificial completions.
1 parent 592bf66 commit f71368a

File tree

9 files changed

+671
-252
lines changed

9 files changed

+671
-252
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -419,22 +419,61 @@ public String toString() {
419419
* not be executed.
420420
*/
421421
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
422-
return new RunBeforeActionListener<>(delegate, runBefore);
422+
return runBefore(delegate, new CheckedConsumer<>() {
423+
@Override
424+
public void accept(Response ignored) throws Exception {
425+
runBefore.run();
426+
}
427+
428+
@Override
429+
public String toString() {
430+
return runBefore.toString();
431+
}
432+
}, new CheckedConsumer<>() {
433+
@Override
434+
public void accept(Exception ignored) throws Exception {
435+
runBefore.run();
436+
}
437+
438+
@Override
439+
public String toString() {
440+
return "ibid";
441+
}
442+
});
443+
}
444+
445+
/**
446+
* Wraps a given listener and returns a new listener which executes the relevant callback before the listener is notified via either
447+
* {@code #onResponse} or {@code #onFailure}. If the callback throws an exception then it will be passed to the listener's
448+
* {@code #onFailure} and its {@code #onResponse} will not be executed.
449+
*/
450+
static <Response> ActionListener<Response> runBefore(
451+
ActionListener<Response> delegate,
452+
CheckedConsumer<Response, ?> runBeforeOnResponse,
453+
CheckedConsumer<Exception, ?> runBeforeOnException
454+
) {
455+
return new RunBeforeActionListener<>(delegate, runBeforeOnResponse, runBeforeOnException);
423456
}
424457

425458
final class RunBeforeActionListener<T> extends Delegating<T, T> {
426459

427-
private final CheckedRunnable<?> runBefore;
460+
private final CheckedConsumer<T, ?> runBeforeOnResponse;
461+
private final CheckedConsumer<Exception, ?> runBeforeOnException;
428462

429-
protected RunBeforeActionListener(ActionListener<T> delegate, CheckedRunnable<?> runBefore) {
463+
protected RunBeforeActionListener(
464+
ActionListener<T> delegate,
465+
CheckedConsumer<T, ?> runBeforeOnResponse,
466+
CheckedConsumer<Exception, ?> runBeforeOnException
467+
) {
430468
super(delegate);
431-
this.runBefore = runBefore;
469+
this.runBeforeOnResponse = runBeforeOnResponse;
470+
this.runBeforeOnException = runBeforeOnException;
432471
}
433472

434473
@Override
435474
public void onResponse(T response) {
436475
try {
437-
runBefore.run();
476+
runBeforeOnResponse.accept(response);
438477
} catch (Exception ex) {
439478
super.onFailure(ex);
440479
return;
@@ -445,7 +484,7 @@ public void onResponse(T response) {
445484
@Override
446485
public void onFailure(Exception e) {
447486
try {
448-
runBefore.run();
487+
runBeforeOnException.accept(e);
449488
} catch (Exception ex) {
450489
e.addSuppressed(ex);
451490
}
@@ -454,7 +493,7 @@ public void onFailure(Exception e) {
454493

455494
@Override
456495
public String toString() {
457-
return super.toString() + "/" + runBefore;
496+
return super.toString() + "/" + runBeforeOnResponse + "/" + runBeforeOnException;
458497
}
459498
}
460499

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.support;
10+
11+
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.core.Releasable;
14+
15+
import java.util.Objects;
16+
import java.util.concurrent.Semaphore;
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
/**
21+
* A mechanism to complete a listener on the completion of some (dynamic) collection of other actions. Basic usage is as follows:
22+
*
23+
* <pre>
24+
* try (var refs = new RefCountingListener(10, finalListener)) {
25+
* for (var item : collection) {
26+
* runAsyncAction(item, refs.acquire()); // completes the acquired listener on completion
27+
* }
28+
* }
29+
* </pre>
30+
*
31+
* The delegate listener is completed when execution leaves the try-with-resources block and every acquired reference is released. Unlike a
32+
* {@link GroupedActionListener} there is no need to declare the number of subsidiary listeners up front: listeners can be acquired
33+
* dynamically as needed. Moreover even outside the try-with-resources block you can continue to acquire additional listeners, even in a
34+
* separate thread, as long as there's at least one listener outstanding:
35+
*
36+
* <pre>
37+
* try (var refs = new RefCountingListener(10, finalListener)) {
38+
* for (var item : collection) {
39+
* if (condition(item)) {
40+
* runAsyncAction(item, refs.acquire());
41+
* }
42+
* }
43+
* if (flag) {
44+
* runOneOffAsyncAction(refs.acquire());
45+
* return;
46+
* }
47+
* for (var item : otherCollection) {
48+
* var itemRef = refs.acquire(); // delays completion while the background action is pending
49+
* executorService.execute(() -> {
50+
* try (var ignored = itemRef) {
51+
* if (condition(item)) {
52+
* runOtherAsyncAction(item, refs.acquire());
53+
* }
54+
* }
55+
* });
56+
* }
57+
* }
58+
* </pre>
59+
*
60+
* In particular (and also unlike a {@link GroupedActionListener}) this works even if you don't acquire any extra refs at all: in that case,
61+
* the delegate listener is completed at the end of the try-with-resources block.
62+
*/
63+
public final class RefCountingListener implements Releasable {
64+
65+
private final ActionListener<Void> delegate;
66+
private final RefCountingRunnable refs = new RefCountingRunnable(this::finish);
67+
68+
private final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
69+
private final Semaphore exceptionPermits;
70+
private final AtomicInteger droppedExceptionsRef = new AtomicInteger();
71+
72+
/**
73+
* Construct a {@link RefCountingListener} which completes {@code delegate} when all refs are released.
74+
* @param delegate The listener to complete when all refs are released. This listener must not throw any exception on completion. If all
75+
* the acquired listeners completed successfully then so is the delegate. If any of the acquired listeners completed
76+
* with failure then the delegate is completed with the first exception received, with other exceptions added to its
77+
* collection of suppressed exceptions.
78+
* @deprecated This imposes no limit on the number of exceptions accumulated, which could cause substantial memory use. Prefer to limit
79+
* the number of accumulated exceptions with {@link #RefCountingListener(int, ActionListener)} instead.
80+
*/
81+
@Deprecated
82+
public RefCountingListener(ActionListener<Void> delegate) {
83+
this(Integer.MAX_VALUE, delegate);
84+
}
85+
86+
/**
87+
* Construct a {@link RefCountingListener} which completes {@code delegate} when all refs are released.
88+
* @param delegate The listener to complete when all refs are released. This listener must not throw any exception on completion. If all
89+
* the acquired listeners completed successfully then so is the delegate. If any of the acquired listeners completed
90+
* with failure then the delegate is completed with the first exception received, with other exceptions added to its
91+
* collection of suppressed exceptions.
92+
* @param maxExceptions The maximum number of exceptions to accumulate on failure.
93+
*/
94+
public RefCountingListener(int maxExceptions, ActionListener<Void> delegate) {
95+
if (maxExceptions <= 0) {
96+
assert false : maxExceptions;
97+
throw new IllegalArgumentException("maxExceptions must be positive");
98+
}
99+
this.delegate = Objects.requireNonNull(delegate);
100+
this.exceptionPermits = new Semaphore(maxExceptions);
101+
}
102+
103+
@Override
104+
public void close() {
105+
refs.close();
106+
}
107+
108+
private void finish() {
109+
var exception = exceptionRef.get();
110+
if (exception == null) {
111+
delegate.onResponse(null);
112+
} else {
113+
final var droppedExceptions = droppedExceptionsRef.getAndSet(0);
114+
if (droppedExceptions > 0) {
115+
exception.addSuppressed(new ElasticsearchException(droppedExceptions + " further exceptions were dropped"));
116+
}
117+
delegate.onFailure(exception);
118+
}
119+
}
120+
121+
public ActionListener<Void> acquire() {
122+
return new ActionListener<>() {
123+
private final Releasable ref = refs.acquire();
124+
125+
@Override
126+
public void onResponse(Void unused) {
127+
ref.close();
128+
}
129+
130+
@Override
131+
public void onFailure(Exception e) {
132+
if (exceptionPermits.tryAcquire()) {
133+
final var firstException = exceptionRef.compareAndExchange(null, e);
134+
if (firstException != null && firstException != e) {
135+
firstException.addSuppressed(e);
136+
}
137+
} else {
138+
droppedExceptionsRef.incrementAndGet();
139+
}
140+
ref.close();
141+
}
142+
143+
@Override
144+
public String toString() {
145+
return "refCounted[" + delegate + "]";
146+
}
147+
};
148+
}
149+
150+
@Override
151+
public String toString() {
152+
return "refCounting[" + delegate + "]";
153+
}
154+
}

server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* The delegate action is completed when execution leaves the try-with-resources block and every acquired reference is released. Unlike a
3333
* {@link CountDown} there is no need to declare the number of subsidiary actions up front (refs can be acquired dynamically as needed) nor
3434
* does the caller need to check for completion each time a reference is released. Moreover even outside the try-with-resources block you
35-
* can continue to acquire additional listeners, even in a separate thread, as long as there's at least one listener outstanding:
35+
* can continue to acquire additional references, even in a separate thread, as long as there's at least one reference outstanding:
3636
*
3737
* <pre>
3838
* try (var refs = new RefCountingRunnable(finalRunnable)) {

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import org.elasticsearch.action.ActionRunnable;
2828
import org.elasticsearch.action.SingleResultDeduplicator;
2929
import org.elasticsearch.action.StepListener;
30-
import org.elasticsearch.action.support.CountDownActionListener;
3130
import org.elasticsearch.action.support.GroupedActionListener;
3231
import org.elasticsearch.action.support.ListenableActionFuture;
3332
import org.elasticsearch.action.support.PlainActionFuture;
33+
import org.elasticsearch.action.support.RefCountingListener;
3434
import org.elasticsearch.action.support.RefCountingRunnable;
3535
import org.elasticsearch.action.support.ThreadedActionListener;
3636
import org.elasticsearch.cluster.ClusterState;
@@ -1421,7 +1421,7 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
14211421
indexMetaIdentifiers = null;
14221422
}
14231423

1424-
final ActionListener<Void> allMetaListener = new CountDownActionListener(2 + indices.size(), ActionListener.wrap(v -> {
1424+
try (var allMetaListeners = new RefCountingListener(10, ActionListener.wrap(v -> {
14251425
final String slmPolicy = slmPolicy(snapshotInfo);
14261426
final SnapshotDetails snapshotDetails = new SnapshotDetails(
14271427
snapshotInfo.state(),
@@ -1444,52 +1444,53 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
14441444
}
14451445
}, onUpdateFailure)
14461446
);
1447-
}, onUpdateFailure));
1448-
1449-
// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
1450-
// mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
1451-
// index or global metadata will be compatible with the segments written in this snapshot as well.
1452-
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
1453-
// that decrements the generation it points at
1454-
final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
1455-
// Write Global MetaData
1456-
executor.execute(
1457-
ActionRunnable.run(
1458-
allMetaListener,
1459-
() -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
1460-
)
1461-
);
1447+
}, onUpdateFailure))) {
1448+
1449+
// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method
1450+
// will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of
1451+
// the index or global metadata will be compatible with the segments written in this snapshot as well.
1452+
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
1453+
// that decrements the generation it points at
1454+
final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
1455+
// Write Global MetaData
1456+
executor.execute(
1457+
ActionRunnable.run(
1458+
allMetaListeners.acquire(),
1459+
() -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
1460+
)
1461+
);
14621462

1463-
// write the index metadata for each index in the snapshot
1464-
for (IndexId index : indices) {
1465-
executor.execute(ActionRunnable.run(allMetaListener, () -> {
1466-
final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
1467-
if (writeIndexGens) {
1468-
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
1469-
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
1470-
if (metaUUID == null) {
1471-
// We don't yet have this version of the metadata so we write it
1472-
metaUUID = UUIDs.base64UUID();
1473-
INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
1474-
indexMetaIdentifiers.put(identifiers, metaUUID);
1463+
// write the index metadata for each index in the snapshot
1464+
for (IndexId index : indices) {
1465+
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
1466+
final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
1467+
if (writeIndexGens) {
1468+
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
1469+
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
1470+
if (metaUUID == null) {
1471+
// We don't yet have this version of the metadata so we write it
1472+
metaUUID = UUIDs.base64UUID();
1473+
INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
1474+
indexMetaIdentifiers.put(identifiers, metaUUID);
1475+
}
1476+
indexMetas.put(index, identifiers);
1477+
} else {
1478+
INDEX_METADATA_FORMAT.write(
1479+
clusterMetadata.index(index.getName()),
1480+
indexContainer(index),
1481+
snapshotId.getUUID(),
1482+
compress
1483+
);
14751484
}
1476-
indexMetas.put(index, identifiers);
1477-
} else {
1478-
INDEX_METADATA_FORMAT.write(
1479-
clusterMetadata.index(index.getName()),
1480-
indexContainer(index),
1481-
snapshotId.getUUID(),
1482-
compress
1483-
);
1484-
}
1485-
}));
1485+
}));
1486+
}
1487+
executor.execute(
1488+
ActionRunnable.run(
1489+
allMetaListeners.acquire(),
1490+
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
1491+
)
1492+
);
14861493
}
1487-
executor.execute(
1488-
ActionRunnable.run(
1489-
allMetaListener,
1490-
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
1491-
)
1492-
);
14931494
}, onUpdateFailure);
14941495
}
14951496

0 commit comments

Comments
 (0)