Skip to content
39 changes: 39 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@
package org.elasticsearch.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LeakTracker;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -496,4 +502,37 @@ static <Response> ActionListener<Response> withRef(ActionListener<Response> list
return releaseAfter(listener, ref::decRef);
}

/**
* Wrap the {@code delegate} listener so that it completes with a {@link ElasticsearchTimeoutException} if it has not been completed
* by the time the {@code timeout} elapses, and will also invoke the {@code cleanupOnTimeout} callback in this case. Completing the
* returned listener before the timeout has elapsed will cancel the timeout.
*/
static <T> ActionListener<T> addTimeout(
@Nullable TimeValue timeout,
ThreadPool threadPool,
Executor executor,
ActionListener<T> delegate,
Runnable cleanupOnTimeout
) {
if (timeout == null) {
return delegate;
} else {
var result = new SubscribableListener<T>();
result.addListener(delegate);
result.addListener(new ActionListener<T>() {
@Override
public void onResponse(T t) {}

@Override
public void onFailure(Exception e) {
if (e instanceof ElasticsearchTimeoutException) {
cleanupOnTimeout.run();
}
}
});
result.addTimeout(timeout, threadPool, executor);
return result;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,18 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request,
node,
TYPE.name(),
nodeRequest,
TransportRequestOptions.timeout(request.getTimeout()),
new ActionListenerResponseHandler<>(listener, GetTaskResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
TransportRequestOptions.EMPTY,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this parameter ultimately be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also used to select between the different channel types (see org.elasticsearch.transport.TransportRequestOptions.Type) so no we still need it. But the goal is to remove org.elasticsearch.transport.TransportRequestOptions#timeout once it's unused.

new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
request.getTimeout(),
threadPool,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
listener,
() -> { /* TODO cancel the remote tasks? */}
),
GetTaskResponse::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,35 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
} else {
DiscoveryNode ingestNode = getRandomIngestNode(ingestNodes.values());
logger.trace("forwarding request [{}] to ingest node [{}]", actionName, ingestNode);
ActionListenerResponseHandler<SimulatePipelineResponse> handler = new ActionListenerResponseHandler<>(
listener,
SimulatePipelineResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
);
if (task == null) {
transportService.sendRequest(ingestNode, actionName, request, handler);
transportService.sendRequest(
ingestNode,
actionName,
request,
new ActionListenerResponseHandler<>(
listener,
SimulatePipelineResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
} else {
transportService.sendChildRequest(
ingestNode,
actionName,
request,
task,
TransportRequestOptions.timeout(ingestNodeTransportActionTimeout),
handler
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
ingestNodeTransportActionTimeout,
transportService.getThreadPool(),
TransportResponseHandler.TRANSPORT_WORKER,
listener,
() -> { /* TODO cancel the remote task? */}
),
SimulatePipelineResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,18 @@ private void sendUnpromotableRequests(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
TransportRequestOptions.timeout(postWriteRefreshTimeout),
new ActionListenerResponseHandler<>(listener.safeMap(r -> wasForced), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
postWriteRefreshTimeout,
transportService.getThreadPool(),
refreshExecutor,
listener.safeMap(r -> wasForced),
() -> { /* TODO cancel the remote task? */}
),
in -> ActionResponse.Empty.INSTANCE,
refreshExecutor
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,9 +1112,15 @@ private <R extends ActionResponse, T> Scheduler.Cancellable sendTransportRequest
masterEligibleNode,
actionName,
transportActionRequest,
TransportRequestOptions.timeout(transportTimeout),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
ActionListener.addTimeout(
transportTimeout,
transportService.getThreadPool(),
clusterCoordinationExecutor,
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
() -> { /* TODO cancel the remote task? */}
),
responseReader,
clusterCoordinationExecutor
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,31 @@ public void onResponse(Releasable releasable) {
node,
MasterHistoryAction.NAME,
new MasterHistoryAction.Request(),
TransportRequestOptions.timeout(remoteMasterHistoryTimeout),
new ActionListenerResponseHandler<>(ActionListener.runBefore(new ActionListener<>() {

@Override
public void onResponse(MasterHistoryAction.Response response) {
long endTime = System.nanoTime();
logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
remoteHistoryOrException = new RemoteHistoryOrException(
response.getMasterHistory(),
currentTimeMillisSupplier.getAsLong()
);
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception in master history request to master node", e);
remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong());
}
}, () -> Releasables.close(releasable)),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
remoteMasterHistoryTimeout,
transportService.getThreadPool(),
TransportResponseHandler.TRANSPORT_WORKER,
ActionListener.runBefore(new ActionListener<>() {
@Override
public void onResponse(MasterHistoryAction.Response response) {
long endTime = System.nanoTime();
logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
remoteHistoryOrException = new RemoteHistoryOrException(
response.getMasterHistory(),
currentTimeMillisSupplier.getAsLong()
);
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception in master history request to master node", e);
remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong());
}
}, () -> Releasables.close(releasable)),
() -> { /* TODO cancel the remote task? */}
),
MasterHistoryAction.Response::new,
TransportResponseHandler.TRANSPORT_WORKER
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -120,31 +119,42 @@ protected void doExecute(Task task, final Request request, ActionListener<Respon
});
} else {
logger.trace("forwarding request [{}] to health node [{}]", actionName, healthNode);
ActionListenerResponseHandler<Response> handler = new ActionListenerResponseHandler<>(
listener,
responseReader,
TransportResponseHandler.TRANSPORT_WORKER
) {
@Override
public void handleException(final TransportException exception) {
logger.trace(
() -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode),
exception
);
listener.onFailure(exception);
}
};
final ActionListener<Response> listenerWithExceptionLogging = logger.isTraceEnabled()
? listener.delegateResponse((delegate, e) -> {
logger.trace(() -> format("failure when forwarding request [%s] to health node [%s]", actionName, healthNode), e);
delegate.onFailure(e);
})
: listener;
if (task != null) {
transportService.sendChildRequest(
healthNode,
actionName,
request,
task,
TransportRequestOptions.timeout(healthNodeTransportActionTimeout),
handler
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
healthNodeTransportActionTimeout,
threadPool,
TransportResponseHandler.TRANSPORT_WORKER,
listenerWithExceptionLogging,
() -> { /* TODO cancel the remote task? */}
),
responseReader,
TransportResponseHandler.TRANSPORT_WORKER
)
);
} else {
transportService.sendRequest(healthNode, actionName, request, handler);
transportService.sendRequest(
healthNode,
actionName,
request,
new ActionListenerResponseHandler<>(
listenerWithExceptionLogging,
responseReader,
TransportResponseHandler.TRANSPORT_WORKER
)
);
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,35 +615,39 @@ public void handshake(
connection,
HANDSHAKE_ACTION_NAME,
HandshakeRequest.INSTANCE,
TransportRequestOptions.timeout(handshakeTimeout),
new ActionListenerResponseHandler<>(listener.delegateFailure((l, response) -> {
if (clusterNamePredicate.test(response.clusterName) == false) {
l.onFailure(
new IllegalStateException(
"handshake with ["
+ node
+ "] failed: remote cluster name ["
+ response.clusterName.value()
+ "] does not match "
+ clusterNamePredicate
)
);
} else if (response.version.isCompatible(localNode.getVersion()) == false) {
l.onFailure(
new IllegalStateException(
"handshake with ["
+ node
+ "] failed: remote node version ["
+ response.version
+ "] is incompatible with local node version ["
+ localNode.getVersion()
+ "]"
)
);
} else {
l.onResponse(response);
}
}), HandshakeResponse::new, threadPool.generic())
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(handshakeTimeout, threadPool, threadPool.generic(), listener.delegateFailure((l, response) -> {
if (clusterNamePredicate.test(response.clusterName) == false) {
l.onFailure(
new IllegalStateException(
"handshake with ["
+ node
+ "] failed: remote cluster name ["
+ response.clusterName.value()
+ "] does not match "
+ clusterNamePredicate
)
);
} else if (response.version.isCompatible(localNode.getVersion()) == false) {
l.onFailure(
new IllegalStateException(
"handshake with ["
+ node
+ "] failed: remote node version ["
+ response.version
+ "] is incompatible with local node version ["
+ localNode.getVersion()
+ "]"
)
);
} else {
l.onResponse(response);
}
}), () -> {/* cannot cancel handshake, no cleanup to do */}),
HandshakeResponse::new,
threadPool.generic()
)
);
}

Expand Down
Loading