Skip to content

Commit 416bfcb

Browse files
authored
Simplify ThreadedActionListener construction (#93184)
- There's no real need for the caller's `Logger` just in case an impossible situation happens - we will get enough info from the log message. - Almost nobody uses the `forceExecution` feature, default it to `false`. - Accept an `ExecutorService` rather than requiring the whole `ThreadPool` only to look up the executor later on.
1 parent 3027d1b commit 416bfcb

File tree

22 files changed

+81
-99
lines changed

22 files changed

+81
-99
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,9 @@ protected void masterOperation(
142142
TransportNodesSnapshotsStatus.TYPE,
143143
new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots)
144144
.timeout(request.masterNodeTimeout()),
145+
// fork to snapshot meta since building the response is expensive for large snapshots
145146
new ThreadedActionListener<>(
146-
logger,
147-
threadPool,
148-
ThreadPool.Names.SNAPSHOT_META, // fork to snapshot meta since building the response is expensive for large snapshots
147+
threadPool.executor(ThreadPool.Names.SNAPSHOT_META),
149148
ActionListener.wrap(
150149
nodeSnapshotStatuses -> buildResponse(
151150
snapshotsInProgress,
@@ -156,8 +155,7 @@ protected void masterOperation(
156155
listener
157156
),
158157
listener::onFailure
159-
),
160-
false
158+
)
161159
)
162160
);
163161
} else {

server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,37 @@
88

99
package org.elasticsearch.action.support;
1010

11+
import org.apache.logging.log4j.LogManager;
1112
import org.apache.logging.log4j.Logger;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionRunnable;
1415
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
15-
import org.elasticsearch.threadpool.ThreadPool;
16+
17+
import java.util.concurrent.ExecutorService;
1618

1719
/**
18-
* An action listener that wraps another action listener and threading its execution.
20+
* An action listener that wraps another action listener and dispatches its completion to an executor.
1921
*/
2022
public final class ThreadedActionListener<Response> extends ActionListener.Delegating<Response, Response> {
2123

22-
private final Logger logger;
23-
private final ThreadPool threadPool;
24-
private final String executor;
24+
private static final Logger logger = LogManager.getLogger(ThreadedActionListener.class);
25+
26+
private final ExecutorService executor;
2527
private final boolean forceExecution;
2628

27-
public ThreadedActionListener(
28-
Logger logger,
29-
ThreadPool threadPool,
30-
String executor,
31-
ActionListener<Response> listener,
32-
boolean forceExecution
33-
) {
29+
public ThreadedActionListener(ExecutorService executor, ActionListener<Response> listener) {
30+
this(executor, false, listener);
31+
}
32+
33+
public ThreadedActionListener(ExecutorService executor, boolean forceExecution, ActionListener<Response> listener) {
3434
super(listener);
35-
this.logger = logger;
36-
this.threadPool = threadPool;
37-
this.executor = executor;
3835
this.forceExecution = forceExecution;
36+
this.executor = executor;
3937
}
4038

4139
@Override
4240
public void onResponse(final Response response) {
43-
threadPool.executor(executor).execute(new ActionRunnable<>(delegate) {
41+
executor.execute(new ActionRunnable<>(delegate) {
4442
@Override
4543
public boolean isForceExecution() {
4644
return forceExecution;
@@ -60,7 +58,7 @@ public String toString() {
6058

6159
@Override
6260
public void onFailure(final Exception e) {
63-
threadPool.executor(executor).execute(new AbstractRunnable() {
61+
executor.execute(new AbstractRunnable() {
6462
@Override
6563
public boolean isForceExecution() {
6664
return forceExecution;
@@ -84,8 +82,8 @@ public void onRejection(Exception e2) {
8482

8583
@Override
8684
public void onFailure(Exception e) {
85+
logger.error(() -> "failed to execute failure callback on [" + ThreadedActionListener.this + "]", e);
8786
assert false : e;
88-
logger.error(() -> "failed to execute failure callback on [" + delegate + "]", e);
8987
}
9088

9189
@Override

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,7 @@ private void fetchIndicesStats() {
200200
.stats(
201201
indicesStatsRequest,
202202
new ThreadedActionListener<>(
203-
logger,
204-
threadPool,
205-
ThreadPool.Names.MANAGEMENT,
203+
threadPool.executor(ThreadPool.Names.MANAGEMENT),
206204
ActionListener.releaseAfter(new ActionListener<>() {
207205
@Override
208206
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
@@ -277,8 +275,7 @@ public void onFailure(Exception e) {
277275
}
278276
indicesStatsSummary = IndicesStatsSummary.EMPTY;
279277
}
280-
}, fetchRefs.acquire()),
281-
false
278+
}, fetchRefs.acquire())
282279
)
283280
);
284281
}

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -543,11 +543,8 @@ private void restore(
543543
// BwC path, running against an old version master that did not add the IndexId to the recovery source
544544
repository.getRepositoryData(
545545
new ThreadedActionListener<>(
546-
logger,
547-
indexShard.getThreadPool(),
548-
ThreadPool.Names.GENERIC,
549-
indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName())),
550-
false
546+
indexShard.getThreadPool().generic(),
547+
indexIdListener.map(repositoryData -> repositoryData.resolveIndexId(indexId.getName()))
551548
)
552549
);
553550
} else {

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,7 @@ && isTargetSameHistory()
286286
// new one later on in the recovery.
287287
shard.removePeerRecoveryRetentionLease(
288288
request.targetNode().getId(),
289-
new ThreadedActionListener<>(
290-
logger,
291-
shard.getThreadPool(),
292-
ThreadPool.Names.GENERIC,
293-
deleteRetentionLeaseStep,
294-
false
295-
)
289+
new ThreadedActionListener<>(shard.getThreadPool().generic(), deleteRetentionLeaseStep)
296290
);
297291
} catch (RetentionLeaseNotFoundException e) {
298292
logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());
@@ -981,7 +975,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
981975
final StepListener<ReplicationResponse> cloneRetentionLeaseStep = new StepListener<>();
982976
final RetentionLease clonedLease = shard.cloneLocalPeerRecoveryRetentionLease(
983977
request.targetNode().getId(),
984-
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, cloneRetentionLeaseStep, false)
978+
new ThreadedActionListener<>(shard.getThreadPool().generic(), cloneRetentionLeaseStep)
985979
);
986980
logger.trace("cloned primary's retention lease as [{}]", clonedLease);
987981
cloneRetentionLeaseStep.addListener(listener.map(rr -> clonedLease));
@@ -996,7 +990,7 @@ void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLeas
996990
final RetentionLease newLease = shard.addPeerRecoveryRetentionLease(
997991
request.targetNode().getId(),
998992
estimatedGlobalCheckpoint,
999-
new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false)
993+
new ThreadedActionListener<>(shard.getThreadPool().generic(), addRetentionLeaseStep)
1000994
);
1001995
addRetentionLeaseStep.addListener(listener.map(rr -> newLease));
1002996
logger.trace("created retention lease with estimated checkpoint of [{}]", estimatedGlobalCheckpoint);

server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener<Optiona
104104
client.execute(
105105
GetShardSnapshotAction.INSTANCE,
106106
request,
107-
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(this::fetchSnapshotFiles), false)
107+
new ThreadedActionListener<>(threadPool.generic(), listener.map(this::fetchSnapshotFiles))
108108
);
109109
}
110110

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1982,7 +1982,7 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
19821982
clusterService,
19831983
metadata.name(),
19841984
loaded,
1985-
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(v -> loaded), false)
1985+
new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded))
19861986
);
19871987
}
19881988
}

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ private void initiateConnection(DiscoveryNode node, ConnectionProfile connection
423423
node,
424424
connectionProfile,
425425
channels,
426-
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener, false)
426+
new ThreadedActionListener<>(threadPool.generic(), listener)
427427
);
428428

429429
for (TcpChannel channel : channels) {

server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1415
import org.elasticsearch.core.TimeValue;
1516
import org.elasticsearch.test.ESTestCase;
1617
import org.elasticsearch.threadpool.FixedExecutorBuilder;
@@ -47,11 +48,9 @@ public void testRejectionHandling() throws InterruptedException {
4748
for (int i = 0; i < listenerCount; i++) {
4849
final var pool = randomFrom(pools);
4950
final var listener = new ThreadedActionListener<Void>(
50-
logger,
51-
threadPool,
52-
pool,
53-
ActionListener.wrap(countdownLatch::countDown),
54-
(pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely()
51+
threadPool.executor(pool),
52+
(pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely(),
53+
ActionListener.wrap(countdownLatch::countDown)
5554
);
5655
synchronized (closeFlag) {
5756
if (closeFlag.get() && shutdownUnsafePools.contains(pool)) {
@@ -76,4 +75,30 @@ public void testRejectionHandling() throws InterruptedException {
7675
assertTrue(countdownLatch.await(10, TimeUnit.SECONDS));
7776
}
7877

78+
public void testToString() {
79+
var deterministicTaskQueue = new DeterministicTaskQueue();
80+
81+
assertEquals(
82+
"ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]",
83+
new ThreadedActionListener<Void>(deterministicTaskQueue.getThreadPool().generic(), randomBoolean(), ActionListener.noop())
84+
.toString()
85+
);
86+
87+
assertEquals(
88+
"ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]/onResponse",
89+
PlainActionFuture.get(future -> new ThreadedActionListener<Void>(deterministicTaskQueue.getThreadPool(s -> {
90+
future.onResponse(s.toString());
91+
return s;
92+
}).generic(), randomBoolean(), ActionListener.noop()).onResponse(null))
93+
);
94+
95+
assertEquals(
96+
"ThreadedActionListener[DeterministicTaskQueue/forkingExecutor/NoopActionListener]/onFailure",
97+
PlainActionFuture.get(future -> new ThreadedActionListener<Void>(deterministicTaskQueue.getThreadPool(s -> {
98+
future.onResponse(s.toString());
99+
return s;
100+
}).generic(), randomBoolean(), ActionListener.noop()).onFailure(new ElasticsearchException("test")))
101+
);
102+
}
103+
79104
}

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.action.support.ActionTestUtils;
2020
import org.elasticsearch.action.support.IndicesOptions;
2121
import org.elasticsearch.action.support.PlainActionFuture;
22-
import org.elasticsearch.action.support.ThreadedActionListener;
2322
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
2423
import org.elasticsearch.cluster.ClusterName;
2524
import org.elasticsearch.cluster.ClusterState;
@@ -243,12 +242,6 @@ class Action extends TransportMasterNodeAction<Request, Response> {
243242
);
244243
}
245244

246-
@Override
247-
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
248-
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
249-
super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false));
250-
}
251-
252245
@Override
253246
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
254247
listener.onResponse(new Response()); // default implementation, overridden in specific tests

0 commit comments

Comments
 (0)