Skip to content

Commit c71a30e

Browse files
Move more work that blocks on PeerFinder mutex to coordinating thread (#95368)
follow up to #95341 Moving a couple of things that eventually use the peer finder and involve cluster coordination work to the coordinating thread. This avoids blocking on its mutex on the scheduler and transport threads and causing needless contention by running work on the generic pool that can very well wait for its turn on the cluster coordinating pool. Also this takes some more load off of the transport threads by moving response serializations to the coordinating pool.
1 parent 6802f91 commit c71a30e

File tree

3 files changed

+9
-20
lines changed

3 files changed

+9
-20
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,7 +1111,8 @@ private <R extends ActionResponse, T> Scheduler.Cancellable sendTransportRequest
11111111
TransportRequestOptions.timeout(transportTimeout),
11121112
new ActionListenerResponseHandler<>(
11131113
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
1114-
transportActionType.getResponseReader()
1114+
transportActionType.getResponseReader(),
1115+
ThreadPool.Names.CLUSTER_COORDINATION
11151116
)
11161117
);
11171118
}
@@ -1165,7 +1166,7 @@ public void run() {
11651166
public String toString() {
11661167
return "delayed retrieval of coordination diagnostics info from " + masterEligibleNode;
11671168
}
1168-
}, remoteRequestInitialDelay, ThreadPool.Names.SAME);
1169+
}, remoteRequestInitialDelay, ThreadPool.Names.CLUSTER_COORDINATION);
11691170
}
11701171

11711172
void cancelPollingRemoteMasterStabilityDiagnostic() {

server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.settings.Setting;
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.transport.TransportAddress;
22-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2322
import org.elasticsearch.common.util.set.Sets;
2423
import org.elasticsearch.core.Nullable;
2524
import org.elasticsearch.core.Releasable;
@@ -107,7 +106,7 @@ public PeerFinder(
107106

108107
transportService.registerRequestHandler(
109108
REQUEST_PEERS_ACTION_NAME,
110-
Names.GENERIC,
109+
Names.CLUSTER_COORDINATION,
111110
false,
112111
false,
113112
PeersRequest::new,
@@ -287,20 +286,9 @@ private boolean handleWakeUp() {
287286
}
288287
});
289288

290-
transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
289+
transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.CLUSTER_COORDINATION, new Runnable() {
291290
@Override
292-
public boolean isForceExecution() {
293-
return true;
294-
}
295-
296-
@Override
297-
public void onFailure(Exception e) {
298-
assert false : e;
299-
logger.debug("unexpected exception in wakeup", e);
300-
}
301-
302-
@Override
303-
protected void doRun() {
291+
public void run() {
304292
synchronized (mutex) {
305293
if (handleWakeUp() == false) {
306294
return;
@@ -469,7 +457,7 @@ private void requestPeers() {
469457

470458
final List<DiscoveryNode> knownNodes = List.copyOf(getFoundPeersUnderLock());
471459

472-
final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<PeersResponse>() {
460+
final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<>() {
473461

474462
@Override
475463
public PeersResponse read(StreamInput in) throws IOException {
@@ -507,7 +495,7 @@ public void handleException(TransportException exp) {
507495

508496
@Override
509497
public String executor() {
510-
return Names.GENERIC;
498+
return Names.CLUSTER_COORDINATION;
511499
}
512500
};
513501
transportService.sendRequest(

server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
179179
}
180180

181181
if (resolveInProgress.compareAndSet(false, true)) {
182-
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
182+
transportService.getThreadPool().executor(ThreadPool.Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() {
183183
@Override
184184
public void onFailure(Exception e) {
185185
logger.debug("failure when resolving unicast hosts list", e);

0 commit comments

Comments
 (0)