diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index b7e8de1f4fa36..5a936941255c5 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -10,19 +10,25 @@ package org.elasticsearch.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.LeakTracker; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -496,4 +502,37 @@ static ActionListener withRef(ActionListener list return releaseAfter(listener, ref::decRef); } + /** + * Wrap the {@code delegate} listener so that it completes with a {@link ElasticsearchTimeoutException} if it has not been completed + * by the time the {@code timeout} elapses, and will also invoke the {@code cleanupOnTimeout} callback in this case. Completing the + * returned listener before the timeout has elapsed will cancel the timeout. + */ + static ActionListener addTimeout( + @Nullable TimeValue timeout, + ThreadPool threadPool, + Executor executor, + ActionListener delegate, + Runnable cleanupOnTimeout + ) { + if (timeout == null) { + return delegate; + } else { + var result = new SubscribableListener(); + result.addListener(delegate); + result.addListener(new ActionListener() { + @Override + public void onResponse(T t) {} + + @Override + public void onFailure(Exception e) { + if (e instanceof ElasticsearchTimeoutException) { + cleanupOnTimeout.run(); + } + } + }); + result.addTimeout(timeout, threadPool, executor); + return result; + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index ce55b787c7b0c..58e94faac2744 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -129,8 +129,18 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request, node, TYPE.name(), nodeRequest, - TransportRequestOptions.timeout(request.getTimeout()), - new ActionListenerResponseHandler<>(listener, GetTaskResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + request.getTimeout(), + threadPool, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + listener, + () -> { /* TODO cancel the remote tasks? */} + ), + GetTaskResponse::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ) ); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index b9586571fa9c9..34e57054b2c49 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -133,21 +133,35 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe } else { DiscoveryNode ingestNode = getRandomIngestNode(ingestNodes.values()); logger.trace("forwarding request [{}] to ingest node [{}]", actionName, ingestNode); - ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>( - listener, - SimulatePipelineResponse::new, - TransportResponseHandler.TRANSPORT_WORKER - ); if (task == null) { - transportService.sendRequest(ingestNode, actionName, request, handler); + transportService.sendRequest( + ingestNode, + actionName, + request, + new ActionListenerResponseHandler<>( + listener, + SimulatePipelineResponse::new, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); } else { transportService.sendChildRequest( ingestNode, actionName, request, task, - TransportRequestOptions.timeout(ingestNodeTransportActionTimeout), - handler + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + ingestNodeTransportActionTimeout, + transportService.getThreadPool(), + TransportResponseHandler.TRANSPORT_WORKER, + listener, + () -> { /* TODO cancel the remote task? */} + ), + SimulatePipelineResponse::new, + TransportResponseHandler.TRANSPORT_WORKER + ) ); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java index cefb27376f9ea..ac92a0264b9a1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java @@ -143,8 +143,18 @@ private void sendUnpromotableRequests( transportService.getLocalNode(), TransportUnpromotableShardRefreshAction.NAME, unpromotableReplicaRequest, - TransportRequestOptions.timeout(postWriteRefreshTimeout), - new ActionListenerResponseHandler<>(listener.safeMap(r -> wasForced), in -> ActionResponse.Empty.INSTANCE, refreshExecutor) + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + postWriteRefreshTimeout, + transportService.getThreadPool(), + refreshExecutor, + listener.safeMap(r -> wasForced), + () -> { /* TODO cancel the remote task? */} + ), + in -> ActionResponse.Empty.INSTANCE, + refreshExecutor + ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java index 84f9c42d27ece..32cb506504826 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java @@ -1112,9 +1112,15 @@ private Scheduler.Cancellable sendTransportRequest masterEligibleNode, actionName, transportActionRequest, - TransportRequestOptions.timeout(transportTimeout), + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)), + ActionListener.addTimeout( + transportTimeout, + transportService.getThreadPool(), + clusterCoordinationExecutor, + ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)), + () -> { /* TODO cancel the remote task? */} + ), responseReader, clusterCoordinationExecutor ) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java index bf450a5f76de8..6d6a2cc88d1ae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/MasterHistoryService.java @@ -137,25 +137,31 @@ public void onResponse(Releasable releasable) { node, MasterHistoryAction.NAME, new MasterHistoryAction.Request(), - TransportRequestOptions.timeout(remoteMasterHistoryTimeout), - new ActionListenerResponseHandler<>(ActionListener.runBefore(new ActionListener<>() { - - @Override - public void onResponse(MasterHistoryAction.Response response) { - long endTime = System.nanoTime(); - logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime)); - remoteHistoryOrException = new RemoteHistoryOrException( - response.getMasterHistory(), - currentTimeMillisSupplier.getAsLong() - ); - } - - @Override - public void onFailure(Exception e) { - logger.warn("Exception in master history request to master node", e); - remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong()); - } - }, () -> Releasables.close(releasable)), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + remoteMasterHistoryTimeout, + transportService.getThreadPool(), + TransportResponseHandler.TRANSPORT_WORKER, + ActionListener.runBefore(new ActionListener<>() { + @Override + public void onResponse(MasterHistoryAction.Response response) { + long endTime = System.nanoTime(); + logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime)); + remoteHistoryOrException = new RemoteHistoryOrException( + response.getMasterHistory(), + currentTimeMillisSupplier.getAsLong() + ); + } + + @Override + public void onFailure(Exception e) { + logger.warn("Exception in master history request to master node", e); + remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong()); + } + }, () -> Releasables.close(releasable)), + () -> { /* TODO cancel the remote task? */} + ), MasterHistoryAction.Response::new, TransportResponseHandler.TRANSPORT_WORKER ) diff --git a/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java b/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java index b4784e3b8a22a..36d40d1c282f7 100644 --- a/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java +++ b/server/src/main/java/org/elasticsearch/health/node/action/TransportHealthNodeAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -120,31 +119,42 @@ protected void doExecute(Task task, final Request request, ActionListener handler = new ActionListenerResponseHandler<>( - listener, - responseReader, - TransportResponseHandler.TRANSPORT_WORKER - ) { - @Override - public void handleException(final TransportException exception) { - logger.trace( - () -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode), - exception - ); - listener.onFailure(exception); - } - }; + final ActionListener listenerWithExceptionLogging = logger.isTraceEnabled() + ? listener.delegateResponse((delegate, e) -> { + logger.trace(() -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode), e); + delegate.onFailure(e); + }) + : listener; if (task != null) { transportService.sendChildRequest( healthNode, actionName, request, task, - TransportRequestOptions.timeout(healthNodeTransportActionTimeout), - handler + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout( + healthNodeTransportActionTimeout, + threadPool, + TransportResponseHandler.TRANSPORT_WORKER, + listenerWithExceptionLogging, + () -> { /* TODO cancel the remote task? */} + ), + responseReader, + TransportResponseHandler.TRANSPORT_WORKER + ) ); } else { - transportService.sendRequest(healthNode, actionName, request, handler); + transportService.sendRequest( + healthNode, + actionName, + request, + new ActionListenerResponseHandler<>( + listenerWithExceptionLogging, + responseReader, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); } } } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 785a56a41ddbe..9656bdc38b440 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -615,35 +615,39 @@ public void handshake( connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, - TransportRequestOptions.timeout(handshakeTimeout), - new ActionListenerResponseHandler<>(listener.delegateFailure((l, response) -> { - if (clusterNamePredicate.test(response.clusterName) == false) { - l.onFailure( - new IllegalStateException( - "handshake with [" - + node - + "] failed: remote cluster name [" - + response.clusterName.value() - + "] does not match " - + clusterNamePredicate - ) - ); - } else if (response.version.isCompatible(localNode.getVersion()) == false) { - l.onFailure( - new IllegalStateException( - "handshake with [" - + node - + "] failed: remote node version [" - + response.version - + "] is incompatible with local node version [" - + localNode.getVersion() - + "]" - ) - ); - } else { - l.onResponse(response); - } - }), HandshakeResponse::new, threadPool.generic()) + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + ActionListener.addTimeout(handshakeTimeout, threadPool, threadPool.generic(), listener.delegateFailure((l, response) -> { + if (clusterNamePredicate.test(response.clusterName) == false) { + l.onFailure( + new IllegalStateException( + "handshake with [" + + node + + "] failed: remote cluster name [" + + response.clusterName.value() + + "] does not match " + + clusterNamePredicate + ) + ); + } else if (response.version.isCompatible(localNode.getVersion()) == false) { + l.onFailure( + new IllegalStateException( + "handshake with [" + + node + + "] failed: remote node version [" + + response.version + + "] is incompatible with local node version [" + + localNode.getVersion() + + "]" + ) + ); + } else { + l.onResponse(response); + } + }), () -> {/* cannot cancel handshake, no cleanup to do */}), + HandshakeResponse::new, + threadPool.generic() + ) ); } diff --git a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java index 50b8c860109d6..d18ba54087509 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java @@ -10,13 +10,16 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ReachabilityChecker; import org.hamcrest.Matcher; @@ -35,6 +38,7 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; public class ActionListenerTests extends ESTestCase { @@ -707,4 +711,87 @@ public String toString() { public static Matcher isMappedActionListener() { return instanceOf(ActionListenerImplementations.MappedActionListener.class); } + + public void testAddTimeout() { + final var deterministicTaskQueue = new DeterministicTaskQueue(); + final var threadpool = deterministicTaskQueue.getThreadPool(); + final var expectException = randomBoolean(); + final var expectedOutcome = new Exception("simulated"); + final var timeout = TimeValue.timeValueMillis(randomFrom(0, 1, between(0, 100_000))); + + final var listenerDoesNotTimeOutComplete = new AtomicBoolean(); + final var listenerDoesNotTimeOut = ActionListener.addTimeout( + timeout.millis() == 0 ? null : timeout, + threadpool, + deterministicTaskQueue::scheduleNow, + new ActionListener() { + @Override + public void onResponse(Exception e) { + assertFalse(expectException); + onComplete(e); + } + + @Override + public void onFailure(Exception e) { + assertTrue(expectException); + onComplete(e); + } + + private void onComplete(Object result) { + assertSame(result, expectedOutcome); + assertTrue(listenerDoesNotTimeOutComplete.compareAndSet(false, true)); + if (timeout.millis() != 0) { + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), lessThan(timeout.millis())); + } + } + }, + () -> fail("should not clean up") + ); + final Runnable completer = expectException + ? () -> listenerDoesNotTimeOut.onFailure(expectedOutcome) + : () -> listenerDoesNotTimeOut.onResponse(expectedOutcome); + if (timeout.millis() == 0) { + // special case null timeout + deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + between(0, 100_000), completer); + } else { + deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.getCurrentTimeMillis() + randomLongBetween(0L, timeout.millis() - 1), + completer + ); + } + + final var listenerTimesOutComplete = new AtomicBoolean(); + final var listenerTimesOutCleansUp = new AtomicBoolean(); + final var listenerTimesOut = ActionListener.addTimeout( + timeout, + threadpool, + deterministicTaskQueue::scheduleNow, + new ActionListener<>() { + @Override + public void onResponse(Object o) { + fail("should not complete successfully"); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, instanceOf(ElasticsearchTimeoutException.class)); + assertTrue(listenerTimesOutComplete.compareAndSet(false, true)); + assertEquals(timeout.millis(), deterministicTaskQueue.getCurrentTimeMillis()); + } + }, + () -> assertTrue(listenerTimesOutCleansUp.compareAndSet(false, true)) + ); + + if (randomBoolean()) { + deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.getCurrentTimeMillis() + timeout.millis() + between(1, 200_000), + () -> listenerTimesOut.onResponse(new Object()) + ); + } + + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertTrue(listenerDoesNotTimeOutComplete.get()); + assertTrue(listenerTimesOutComplete.get()); + assertTrue(listenerTimesOutCleansUp.get()); + } }