Skip to content
39 changes: 39 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@
package org.elasticsearch.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LeakTracker;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -496,4 +502,37 @@ static <Response> ActionListener<Response> withRef(ActionListener<Response> list
return releaseAfter(listener, ref::decRef);
}

/**
 * Decorates {@code delegate} with a timeout. If the returned listener has not been completed by the time {@code timeout} elapses, it
 * is completed with an {@link ElasticsearchTimeoutException} and the {@code cleanupOnTimeout} callback is invoked. Completing the
 * returned listener before the timeout has elapsed cancels the timeout.
 *
 * @param timeout          how long to wait before timing out; if {@code null}, no timeout is applied and {@code delegate} itself is
 *                         returned
 * @param threadPool       passed through to {@link SubscribableListener#addTimeout}
 * @param executor         passed through to {@link SubscribableListener#addTimeout}
 * @param delegate         the listener to notify of the eventual outcome
 * @param cleanupOnTimeout run when the returned listener fails with an {@link ElasticsearchTimeoutException}
 * @return {@code delegate} if {@code timeout} is {@code null}, otherwise a new listener wrapping {@code delegate}
 */
static <T> ActionListener<T> addTimeout(
    @Nullable TimeValue timeout,
    ThreadPool threadPool,
    Executor executor,
    ActionListener<T> delegate,
    Runnable cleanupOnTimeout
) {
    // No timeout requested: nothing to wrap.
    if (timeout == null) {
        return delegate;
    }

    // Only reacts to failures, and only to timeout failures; successful responses need no cleanup.
    final ActionListener<T> timeoutCleanupListener = new ActionListener<T>() {
        @Override
        public void onResponse(T ignored) {
            // nothing to clean up on success
        }

        @Override
        public void onFailure(Exception failure) {
            if (failure instanceof ElasticsearchTimeoutException == false) {
                return;
            }
            cleanupOnTimeout.run();
        }
    };

    final SubscribableListener<T> timeoutAwareListener = new SubscribableListener<>();
    // The delegate is subscribed first, followed by the cleanup hook, and only then is the timeout armed.
    timeoutAwareListener.addListener(delegate);
    timeoutAwareListener.addListener(timeoutCleanupListener);
    timeoutAwareListener.addTimeout(timeout, threadPool, executor);
    return timeoutAwareListener;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,18 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request,
node,
TYPE.name(),
nodeRequest,
TransportRequestOptions.timeout(request.getTimeout()),
new ActionListenerResponseHandler<>(listener, GetTaskResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
TransportRequestOptions.EMPTY,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this parameter ultimately be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also used to select between the different channel types (see org.elasticsearch.transport.TransportRequestOptions.Type), so no, we still need it. But the goal is to remove org.elasticsearch.transport.TransportRequestOptions#timeout once it's unused.

new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
request.getTimeout(),
threadPool,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
listener,
() -> { /* TODO cancel the remote tasks? */}
),
GetTaskResponse::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,35 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
} else {
DiscoveryNode ingestNode = getRandomIngestNode(ingestNodes.values());
logger.trace("forwarding request [{}] to ingest node [{}]", actionName, ingestNode);
ActionListenerResponseHandler<SimulatePipelineResponse> handler = new ActionListenerResponseHandler<>(
listener,
SimulatePipelineResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
);
if (task == null) {
transportService.sendRequest(ingestNode, actionName, request, handler);
transportService.sendRequest(
ingestNode,
actionName,
request,
new ActionListenerResponseHandler<>(
listener,
SimulatePipelineResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
} else {
transportService.sendChildRequest(
ingestNode,
actionName,
request,
task,
TransportRequestOptions.timeout(ingestNodeTransportActionTimeout),
handler
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
ingestNodeTransportActionTimeout,
transportService.getThreadPool(),
TransportResponseHandler.TRANSPORT_WORKER,
listener,
() -> { /* TODO cancel the remote task? */}
),
SimulatePipelineResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,18 @@ private void sendUnpromotableRequests(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
TransportRequestOptions.timeout(postWriteRefreshTimeout),
new ActionListenerResponseHandler<>(listener.safeMap(r -> wasForced), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
postWriteRefreshTimeout,
transportService.getThreadPool(),
refreshExecutor,
listener.safeMap(r -> wasForced),
() -> { /* TODO cancel the remote task? */}
),
in -> ActionResponse.Empty.INSTANCE,
refreshExecutor
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,9 +1112,15 @@ private <R extends ActionResponse, T> Scheduler.Cancellable sendTransportRequest
masterEligibleNode,
actionName,
transportActionRequest,
TransportRequestOptions.timeout(transportTimeout),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
ActionListener.addTimeout(
transportTimeout,
transportService.getThreadPool(),
clusterCoordinationExecutor,
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
() -> { /* TODO cancel the remote task? */}
),
responseReader,
clusterCoordinationExecutor
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,31 @@ public void onResponse(Releasable releasable) {
node,
MasterHistoryAction.NAME,
new MasterHistoryAction.Request(),
TransportRequestOptions.timeout(remoteMasterHistoryTimeout),
new ActionListenerResponseHandler<>(ActionListener.runBefore(new ActionListener<>() {

@Override
public void onResponse(MasterHistoryAction.Response response) {
long endTime = System.nanoTime();
logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
remoteHistoryOrException = new RemoteHistoryOrException(
response.getMasterHistory(),
currentTimeMillisSupplier.getAsLong()
);
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception in master history request to master node", e);
remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong());
}
}, () -> Releasables.close(releasable)),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
    remoteMasterHistoryTimeout,
    transportService.getThreadPool(),
    TransportResponseHandler.TRANSPORT_WORKER,
    // The releasable must be closed on every completion path (response, failure or timeout), not only when the timeout
    // fires, so it is closed via runBefore on the wrapped listener rather than via the timeout-only cleanup callback.
    ActionListener.runBefore(new ActionListener<>() {
        @Override
        public void onResponse(MasterHistoryAction.Response response) {
            long endTime = System.nanoTime();
            logger.trace("Received history from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
            // Record the remote history together with the time at which it was received.
            remoteHistoryOrException = new RemoteHistoryOrException(
                response.getMasterHistory(),
                currentTimeMillisSupplier.getAsLong()
            );
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("Exception in master history request to master node", e);
            // Record the failure (including a timeout) together with the time at which it was observed.
            remoteHistoryOrException = new RemoteHistoryOrException(e, currentTimeMillisSupplier.getAsLong());
        }
    }, () -> Releasables.close(releasable)),
    // Timeout-only cleanup hook: nothing to do yet.
    () -> { /* TODO cancel the remote task? */}
),
MasterHistoryAction.Response::new,
TransportResponseHandler.TRANSPORT_WORKER
)
Expand Down
79 changes: 38 additions & 41 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -29,12 +29,10 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -524,54 +522,53 @@ private void requestPeers() {

final List<DiscoveryNode> knownNodes = List.copyOf(getFoundPeersUnderLock());

final TransportResponseHandler<PeersResponse> peersResponseHandler = new TransportResponseHandler<>() {
final TransportResponseHandler<PeersResponse> peersResponseHandler = new ActionListenerResponseHandler<>(
ActionListener.addTimeout(
requestPeersTimeout,
transportService.getThreadPool(),
clusterCoordinationExecutor,
new ActionListener<>() {
@Override
public void onResponse(PeersResponse response) {
logger.trace("{} received {}", Peer.this, response);
synchronized (mutex) {
peersRequestInFlight = false;

@Override
public PeersResponse read(StreamInput in) throws IOException {
return new PeersResponse(in);
}
if (isActive() == false) {
logger.trace("Peer#requestPeers inactive: {}", Peer.this);
return;
}

@Override
public void handleResponse(PeersResponse response) {
logger.trace("{} received {}", Peer.this, response);
synchronized (mutex) {
peersRequestInFlight = false;
lastKnownMasterNode = response.getMasterNode();
response.getMasterNode().ifPresent(node -> startProbe(node.getAddress()));
for (DiscoveryNode node : response.getKnownPeers()) {
startProbe(node.getAddress());
}
}

if (isActive() == false) {
logger.trace("Peer#requestPeers inactive: {}", Peer.this);
return;
if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
// Must not hold lock here to avoid deadlock
assert holdsLock() == false : "PeerFinder mutex is held in error";
onActiveMasterFound(discoveryNode, response.getTerm());
}
}

lastKnownMasterNode = response.getMasterNode();
response.getMasterNode().ifPresent(node -> startProbe(node.getAddress()));
for (DiscoveryNode node : response.getKnownPeers()) {
startProbe(node.getAddress());
@Override
public void onFailure(Exception e) {
peersRequestInFlight = false;
logger.warn(() -> format("%s peers request failed", Peer.this), e);
}
}

if (response.getMasterNode().equals(Optional.of(discoveryNode))) {
// Must not hold lock here to avoid deadlock
assert holdsLock() == false : "PeerFinder mutex is held in error";
onActiveMasterFound(discoveryNode, response.getTerm());
}
}

@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.warn(() -> format("%s peers request failed", Peer.this), exp);
}

@Override
public Executor executor() {
return clusterCoordinationExecutor;
}
};
},
() -> { /* TODO cancel the remote task? */}
),
PeersResponse::new,
clusterCoordinationExecutor
);
transportService.sendRequest(
discoveryNode,
REQUEST_PEERS_ACTION_NAME,
new PeersRequest(getLocalNode(), knownNodes),
TransportRequestOptions.timeout(requestPeersTimeout),
TransportRequestOptions.EMPTY,
peersResponseHandler
);
}
Expand Down
Loading