From fc781a67991e97d0a127231c82b98c374cbe0e95 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 12 Aug 2025 09:43:40 +0100 Subject: [PATCH 1/3] Reduce usage of transport-layer timeouts Transport-layer timeouts are kinda trappy, particularly noting that they do not (reliably) cancel the remote task or perform other necessary cleanup. Really such behaviour should be the responsibility of the caller rather than the transport layer itself. This commit introduces an `ActionListener#addTimeout` utility to allow adding timeout wrappers to arbitrary listeners, and uses it to replace several transport-layer timeouts for requests that have no cancellation functionality anyway. Relates #123568 --- .../elasticsearch/action/ActionListener.java | 39 +++++++++ .../tasks/get/TransportGetTaskAction.java | 14 ++- .../SimulatePipelineTransportAction.java | 30 +++++-- .../support/replication/PostWriteRefresh.java | 14 ++- .../CoordinationDiagnosticsService.java | 10 ++- .../coordination/MasterHistoryService.java | 44 ++++++---- .../elasticsearch/discovery/PeerFinder.java | 79 ++++++++--------- .../action/TransportHealthNodeAction.java | 46 ++++++---- .../transport/TransportService.java | 62 ++++++------- .../action/ActionListenerTests.java | 87 +++++++++++++++++++ 10 files changed, 304 insertions(+), 121 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index b7e8de1f4fa36..5a936941255c5 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -10,19 +10,25 @@ package org.elasticsearch.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.LeakTracker; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -496,4 +502,37 @@ static ActionListener withRef(ActionListener list return releaseAfter(listener, ref::decRef); } + /** + * Wrap the {@code delegate} listener so that it completes with a {@link ElasticsearchTimeoutException} if it has not been completed + * by the time the {@code timeout} elapses, and will also invoke the {@code cleanupOnTimeout} callback in this case. Completing the + * returned listener before the timeout has elapsed will cancel the timeout. + */ + static ActionListener addTimeout( + @Nullable TimeValue timeout, + ThreadPool threadPool, + Executor executor, + ActionListener delegate, + Runnable cleanupOnTimeout + ) { + if (timeout == null) { + return delegate; + } else { + var result = new SubscribableListener(); + result.addListener(delegate); + result.addListener(new ActionListener() { + @Override + public void onResponse(T t) {} + + @Override + public void onFailure(Exception e) { + if (e instanceof ElasticsearchTimeoutException) { + cleanupOnTimeout.run(); + } + } + }); + result.addTimeout(timeout, threadPool, executor); + return result; + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index ce55b787c7b0c..58e94faac2744 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -129,8 +129,18 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request, node, TYPE.name(), nodeRequest, - TransportRequestOptions.timeout(request.getTimeout()), - new ActionListenerResponseHandler<>(listener, GetTaskResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + request.getTimeout(), + threadPool, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + listener, + () -> { /* TODO cancel the remote tasks? */} + ), + GetTaskResponse::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ) ); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index b9586571fa9c9..34e57054b2c49 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -133,21 +133,35 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe } else { DiscoveryNode ingestNode = getRandomIngestNode(ingestNodes.values()); logger.trace("forwarding request [{}] to ingest node [{}]", actionName, ingestNode); - ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>( - listener, - SimulatePipelineResponse::new, - TransportResponseHandler.TRANSPORT_WORKER - ); if (task == null) { - transportService.sendRequest(ingestNode, actionName, request, handler); + transportService.sendRequest( + ingestNode, + actionName, + request, + new ActionListenerResponseHandler<>( + listener, + SimulatePipelineResponse::new, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); } else { transportService.sendChildRequest( ingestNode, actionName, request, task, - TransportRequestOptions.timeout(ingestNodeTransportActionTimeout), - handler + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + ingestNodeTransportActionTimeout, + transportService.getThreadPool(), + TransportResponseHandler.TRANSPORT_WORKER, + listener, + () -> { /* TODO cancel the remote task? */} + ), + SimulatePipelineResponse::new, + TransportResponseHandler.TRANSPORT_WORKER + ) ); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java index cefb27376f9ea..ac92a0264b9a1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java @@ -143,8 +143,18 @@ private void sendUnpromotableRequests( transportService.getLocalNode(), TransportUnpromotableShardRefreshAction.NAME, unpromotableReplicaRequest, - TransportRequestOptions.timeout(postWriteRefreshTimeout), - new ActionListenerResponseHandler<>(listener.safeMap(r -> wasForced), in -> ActionResponse.Empty.INSTANCE, refreshExecutor) + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + postWriteRefreshTimeout, + transportService.getThreadPool(), + refreshExecutor, + listener.safeMap(r -> wasForced), + () -> { /* TODO cancel the remote task? */} + ), + in -> ActionResponse.Empty.INSTANCE, + refreshExecutor + ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java index 84f9c42d27ece..32cb506504826 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java @@ -1112,9 +1112,15 @@ private Scheduler.Cancellable sendTransportRequest masterEligibleNode, actionName, transportActionRequest, - TransportRequestOptions.timeout(transportTimeout), + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)), + ActionListener.addTimeout( + transportTimeout, + transportService.getThreadPool(), + clusterCoordinationExecutor, + ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)), + () -> { /* TODO cancel the remote task? */} + ), responseReader, clusterCoordinationExecutor ) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java index bf450a5f76de8..73eb23a7ff571 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java @@ -137,25 +137,31 @@ public void onResponse(Releasable releasable) { node, MasterHistoryAction.NAME, new MasterHistoryAction.Request(), - TransportRequestOptions.timeout(remoteMasterHistoryTimeout), - new ActionListenerResponseHandler<>(ActionListener.runBefore(new ActionListener<>() { - - @Override - public void onResponse(MasterHistoryAction.Response response) { - long endTime = System.nanoTime(); - logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime)); - remoteHistoryOrException = new RemoteHistoryOrException( - response.getMasterHistory(), - currentTimeMillisSupplier.getAsLong() - ); - } - - @Override - public void onFailure(Exception e) { - logger.warn("Exception in master history request to master node", e); - remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong()); - } - }, () -> Releasables.close(releasable)), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + remoteMasterHistoryTimeout, + transportService.getThreadPool(), + TransportResponseHandler.TRANSPORT_WORKER, + ActionListener.runBefore(new ActionListener<>() { + @Override + public void onResponse(MasterHistoryAction.Response response) { + long endTime = System.nanoTime(); + logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime)); + remoteHistoryOrException = new RemoteHistoryOrException( + response.getMasterHistory(), + currentTimeMillisSupplier.getAsLong() + ); + } + + @Override + public void onFailure(Exception e) { + logger.warn("Exception in master history request to master node", e); + remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong()); + } + }, () -> { /* TODO cancel the remote task? */}), + () -> Releasables.close(releasable) + ), MasterHistoryAction.Response::new, TransportResponseHandler.TRANSPORT_WORKER ) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index d73e873853a2d..f5d4665fecf9a 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -13,13 +13,13 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper; import org.elasticsearch.cluster.coordination.PeersResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.ReferenceDocs; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -29,12 +29,10 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool.Names; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -524,54 +522,53 @@ private void requestPeers() { final List knownNodes = List.copyOf(getFoundPeersUnderLock()); - final TransportResponseHandler peersResponseHandler = new TransportResponseHandler<>() { + final TransportResponseHandler peersResponseHandler = new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + requestPeersTimeout, + transportService.getThreadPool(), + clusterCoordinationExecutor, + new ActionListener<>() { + @Override + public void onResponse(PeersResponse response) { + logger.trace("{} received {}", Peer.this, response); + synchronized (mutex) { + peersRequestInFlight = false; - @Override - public PeersResponse read(StreamInput in) throws IOException { - return new PeersResponse(in); - } + if (isActive() == false) { + logger.trace("Peer#requestPeers inactive: {}", Peer.this); + return; + } - @Override - public void handleResponse(PeersResponse response) { - logger.trace("{} received {}", Peer.this, response); - synchronized (mutex) { - peersRequestInFlight = false; + lastKnownMasterNode = response.getMasterNode(); + response.getMasterNode().ifPresent(node -> startProbe(node.getAddress())); + for (DiscoveryNode node : response.getKnownPeers()) { + startProbe(node.getAddress()); + } + } - if (isActive() == false) { - logger.trace("Peer#requestPeers inactive: {}", Peer.this); - return; + if (response.getMasterNode().equals(Optional.of(discoveryNode))) { + // Must not hold lock here to avoid deadlock + assert holdsLock() == false : "PeerFinder mutex is held in error"; + onActiveMasterFound(discoveryNode, response.getTerm()); + } } - lastKnownMasterNode = response.getMasterNode(); - response.getMasterNode().ifPresent(node -> startProbe(node.getAddress())); - for (DiscoveryNode node : response.getKnownPeers()) { - startProbe(node.getAddress()); + @Override + public void onFailure(Exception e) { + peersRequestInFlight = false; + logger.warn(() -> format("%s peers request failed", Peer.this), e); } - } - - if (response.getMasterNode().equals(Optional.of(discoveryNode))) { - // Must not hold lock here to avoid deadlock - assert holdsLock() == false : "PeerFinder mutex is held in error"; - onActiveMasterFound(discoveryNode, response.getTerm()); - } - } - - @Override - public void handleException(TransportException exp) { - peersRequestInFlight = false; - logger.warn(() -> format("%s peers request failed", Peer.this), exp); - } - - @Override - public Executor executor() { - return clusterCoordinationExecutor; - } - }; + }, + () -> { /* TODO cancel the remote task? */} + ), + PeersResponse::new, + clusterCoordinationExecutor + ); transportService.sendRequest( discoveryNode, REQUEST_PEERS_ACTION_NAME, new PeersRequest(getLocalNode(), knownNodes), - TransportRequestOptions.timeout(requestPeersTimeout), + TransportRequestOptions.EMPTY, peersResponseHandler ); } diff --git a/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java b/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java index b4784e3b8a22a..36d40d1c282f7 100644 --- a/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java +++ b/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -120,31 +119,42 @@ protected void doExecute(Task task, final Request request, ActionListener handler = new ActionListenerResponseHandler<>( - listener, - responseReader, - TransportResponseHandler.TRANSPORT_WORKER - ) { - @Override - public void handleException(final TransportException exception) { - logger.trace( - () -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode), - exception - ); - listener.onFailure(exception); - } - }; + final ActionListener listenerWithExceptionLogging = logger.isTraceEnabled() + ? listener.delegateResponse((delegate, e) -> { + logger.trace(() -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode), e); + delegate.onFailure(e); + }) + : listener; if (task != null) { transportService.sendChildRequest( healthNode, actionName, request, task, - TransportRequestOptions.timeout(healthNodeTransportActionTimeout), - handler + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + healthNodeTransportActionTimeout, + threadPool, + TransportResponseHandler.TRANSPORT_WORKER, + listenerWithExceptionLogging, + () -> { /* TODO cancel the remote task? */} + ), + responseReader, + TransportResponseHandler.TRANSPORT_WORKER + ) ); } else { - transportService.sendRequest(healthNode, actionName, request, handler); + transportService.sendRequest( + healthNode, + actionName, + request, + new ActionListenerResponseHandler<>( + listenerWithExceptionLogging, + responseReader, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); } } } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index e44291eacbc06..be30100121a9c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -585,35 +585,39 @@ public void handshake( connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, - TransportRequestOptions.timeout(handshakeTimeout), - new ActionListenerResponseHandler<>(listener.delegateFailure((l, response) -> { - if (clusterNamePredicate.test(response.clusterName) == false) { - l.onFailure( - new IllegalStateException( - "handshake with [" - + node - + "] failed: remote cluster name [" - + response.clusterName.value() - + "] does not match " - + clusterNamePredicate - ) - ); - } else if (response.version.isCompatible(localNode.getVersion()) == false) { - l.onFailure( - new IllegalStateException( - "handshake with [" - + node - + "] failed: remote node version [" - + response.version - + "] is incompatible with local node version [" - + localNode.getVersion() - + "]" - ) - ); - } else { - l.onResponse(response); - } - }), HandshakeResponse::new, threadPool.generic()) + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout(handshakeTimeout, threadPool, threadPool.generic(), listener.delegateFailure((l, response) -> { + if (clusterNamePredicate.test(response.clusterName) == false) { + l.onFailure( + new IllegalStateException( + "handshake with [" + + node + + "] failed: remote cluster name [" + + response.clusterName.value() + + "] does not match " + + clusterNamePredicate + ) + ); + } else if (response.version.isCompatible(localNode.getVersion()) == false) { + l.onFailure( + new IllegalStateException( + "handshake with [" + + node + + "] failed: remote node version [" + + response.version + + "] is incompatible with local node version [" + + localNode.getVersion() + + "]" + ) + ); + } else { + l.onResponse(response); + } + }), () -> {/* cannot cancel handshake, no cleanup to do */}), + HandshakeResponse::new, + threadPool.generic() + ) ); } diff --git a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java index 50b8c860109d6..d18ba54087509 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java @@ -10,13 +10,16 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ReachabilityChecker; import org.hamcrest.Matcher; @@ -35,6 +38,7 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; public class ActionListenerTests extends ESTestCase { @@ -707,4 +711,87 @@ public String toString() { public static Matcher isMappedActionListener() { return instanceOf(ActionListenerImplementations.MappedActionListener.class); } + + public void testAddTimeout() { + final var deterministicTaskQueue = new DeterministicTaskQueue(); + final var threadpool = deterministicTaskQueue.getThreadPool(); + final var expectException = randomBoolean(); + final var expectedOutcome = new Exception("simulated"); + final var timeout = TimeValue.timeValueMillis(randomFrom(0, 1, between(0, 100_000))); + + final var listenerDoesNotTimeOutComplete = new AtomicBoolean(); + final var listenerDoesNotTimeOut = ActionListener.addTimeout( + timeout.millis() == 0 ? null : timeout, + threadpool, + deterministicTaskQueue::scheduleNow, + new ActionListener() { + @Override + public void onResponse(Exception e) { + assertFalse(expectException); + onComplete(e); + } + + @Override + public void onFailure(Exception e) { + assertTrue(expectException); + onComplete(e); + } + + private void onComplete(Object result) { + assertSame(result, expectedOutcome); + assertTrue(listenerDoesNotTimeOutComplete.compareAndSet(false, true)); + if (timeout.millis() != 0) { + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), lessThan(timeout.millis())); + } + } + }, + () -> fail("should not clean up") + ); + final Runnable completer = expectException + ? () -> listenerDoesNotTimeOut.onFailure(expectedOutcome) + : () -> listenerDoesNotTimeOut.onResponse(expectedOutcome); + if (timeout.millis() == 0) { + // special case null timeout + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + between(0, 100_000), completer); + } else { + deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0L, timeout.millis() - 1), + completer + ); + } + + final var listenerTimesOutComplete = new AtomicBoolean(); + final var listenerTimesOutCleansUp = new AtomicBoolean(); + final var listenerTimesOut = ActionListener.addTimeout( + timeout, + threadpool, + deterministicTaskQueue::scheduleNow, + new ActionListener<>() { + @Override + public void onResponse(Object o) { + fail("should not complete successfully"); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, instanceOf(ElasticsearchTimeoutException.class)); + assertTrue(listenerTimesOutComplete.compareAndSet(false, true)); + assertEquals(timeout.millis(), deterministicTaskQueue.getCurrentTimeMillis()); + } + }, + () -> assertTrue(listenerTimesOutCleansUp.compareAndSet(false, true)) + ); + + if (randomBoolean()) { + deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.getCurrentTimeMillis() + timeout.millis() + between(1, 200_000), + () -> listenerTimesOut.onResponse(new Object()) + ); + } + + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertTrue(listenerDoesNotTimeOutComplete.get()); + assertTrue(listenerTimesOutComplete.get()); + assertTrue(listenerTimesOutCleansUp.get()); + } } From 64679089b2603210e97fee2c0fdd14ec8c60177e Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 12 Aug 2025 11:32:49 +0100 Subject: [PATCH 2/3] Oops --- .../cluster/coordination/MasterHistoryService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java index 73eb23a7ff571..6d6a2cc88d1ae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java @@ -159,8 +159,8 @@ public void onFailure(Exception e) { logger.warn("Exception in master history request to master node", e); remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong()); } - }, () -> { /* TODO cancel the remote task? */}), - () -> Releasables.close(releasable) + }, () -> Releasables.close(releasable)), + () -> { /* TODO cancel the remote task? */} ), MasterHistoryAction.Response::new, TransportResponseHandler.TRANSPORT_WORKER From aaef3be0b9a2d78c4b7a1fae20e8f9e9d28357ee Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 30 Oct 2025 09:15:23 +0000 Subject: [PATCH 3/3] PeerFinder has no timeout any more --- .../elasticsearch/discovery/PeerFinder.java | 77 ++++++++++--------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 4ebd215020c0d..57921795a0edb 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -13,13 +13,13 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper; import org.elasticsearch.cluster.coordination.PeersResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -29,10 +29,12 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -521,48 +523,49 @@ private void requestPeers() { final List knownNodes = List.copyOf(getFoundPeersUnderLock()); - final TransportResponseHandler peersResponseHandler = new ActionListenerResponseHandler<>( - ActionListener.addTimeout( - requestPeersTimeout, - transportService.getThreadPool(), - clusterCoordinationExecutor, - new ActionListener<>() { - @Override - public void onResponse(PeersResponse response) { - logger.trace("{} received {}", Peer.this, response); - synchronized (mutex) { - peersRequestInFlight = false; + final TransportResponseHandler peersResponseHandler = new TransportResponseHandler<>() { - if (isActive() == false) { - logger.trace("Peer#requestPeers inactive: {}", Peer.this); - return; - } + @Override + public PeersResponse read(StreamInput in) throws IOException { + return new PeersResponse(in); + } - lastKnownMasterNode = response.getMasterNode(); - response.getMasterNode().ifPresent(node -> startProbe(node.getAddress())); - for (DiscoveryNode node : response.getKnownPeers()) { - startProbe(node.getAddress()); - } - } + @Override + public void handleResponse(PeersResponse response) { + logger.trace("{} received {}", Peer.this, response); + synchronized (mutex) { + peersRequestInFlight = false; - if (response.getMasterNode().equals(Optional.of(discoveryNode))) { - // Must not hold lock here to avoid deadlock - assert holdsLock() == false : "PeerFinder mutex is held in error"; - onActiveMasterFound(discoveryNode, response.getTerm()); - } + if (isActive() == false) { + logger.trace("Peer#requestPeers inactive: {}", Peer.this); + return; } - @Override - public void onFailure(Exception e) { - peersRequestInFlight = false; - logger.warn(() -> format("%s peers request failed", Peer.this), e); + lastKnownMasterNode = response.getMasterNode(); + response.getMasterNode().ifPresent(node -> startProbe(node.getAddress())); + for (DiscoveryNode node : response.getKnownPeers()) { + startProbe(node.getAddress()); } - }, - () -> { /* TODO cancel the remote task? */} - ), - PeersResponse::new, - clusterCoordinationExecutor - ); + } + + if (response.getMasterNode().equals(Optional.of(discoveryNode))) { + // Must not hold lock here to avoid deadlock + assert holdsLock() == false : "PeerFinder mutex is held in error"; + onActiveMasterFound(discoveryNode, response.getTerm()); + } + } + + @Override + public void handleException(TransportException exp) { + peersRequestInFlight = false; + logger.warn(() -> format("%s peers request failed", Peer.this), exp); + } + + @Override + public Executor executor() { + return clusterCoordinationExecutor; + } + }; transportService.sendRequest( discoveryNode, REQUEST_PEERS_ACTION_NAME,