Skip to content

Commit 6fdf908

Browse files
Force reconnect to remote clusters with a short timeout for CPS
1 parent ff52007 commit 6fdf908

File tree

2 files changed

+177
-69
lines changed

2 files changed

+177
-69
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 145 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.support.ActionFilters;
3131
import org.elasticsearch.action.support.HandledTransportAction;
3232
import org.elasticsearch.action.support.IndicesOptions;
33+
import org.elasticsearch.action.support.SubscribableListener;
3334
import org.elasticsearch.action.support.master.MasterNodeRequest;
3435
import org.elasticsearch.client.internal.Client;
3536
import org.elasticsearch.cluster.ClusterState;
@@ -167,6 +168,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
167168
private final Client client;
168169
private final UsageService usageService;
169170
private final boolean collectTelemetry;
171+
private final boolean alwaysEstablishConnection;
172+
private static final long FORCE_CONNECT_CLUSTER_DEFAULT_TIMEOUT = 3L;
170173

171174
@Inject
172175
public TransportSearchAction(
@@ -215,6 +218,8 @@ public TransportSearchAction(
215218
this.searchResponseMetrics = searchResponseMetrics;
216219
this.client = client;
217220
this.usageService = usageService;
221+
alwaysEstablishConnection = settings.getAsBoolean("serverless.force_cluster_reconnect", false);
222+
logger.info("Should force reconnect cluster: {}", alwaysEstablishConnection);
218223
}
219224

220225
private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
@@ -445,7 +450,9 @@ void executeRequest(
445450
projectState,
446451
clusters,
447452
searchPhaseProvider.apply(l)
448-
)
453+
),
454+
transportService,
455+
alwaysEstablishConnection
449456
);
450457
} else {
451458
final SearchContextId searchContext = resolvedIndices.getSearchContextId();
@@ -505,7 +512,8 @@ void executeRequest(
505512
clusters,
506513
searchPhaseProvider.apply(finalDelegate)
507514
);
508-
})
515+
}),
516+
alwaysEstablishConnection
509517
);
510518
}
511519
}
@@ -633,6 +641,40 @@ public static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
633641
|| source.collapse().getInnerHits().isEmpty();
634642
}
635643

644+
/**
645+
* Return a subscribable listener with optional timeout depending on force reconnect setting is registered or
646+
* not.
647+
* @param alwaysEstablishConnection If we're running in a context where we always need to re-connect.
648+
* This value determines if we need to add a short timeout to avoid waiting
649+
* for long durations to reconnect.
650+
* @param threadPool The thread pool that'll be used for the timeout.
651+
* @param timeoutExecutor The executor that should be used for the timeout.
652+
* @return SubscribableListener A listener with optionally added timeout.
653+
*/
654+
private static SubscribableListener<Transport.Connection> getListenerWithOptionalTimeout(
655+
boolean alwaysEstablishConnection,
656+
ThreadPool threadPool,
657+
Executor timeoutExecutor
658+
) {
659+
var subscribableListener = new SubscribableListener<Transport.Connection>();
660+
if (alwaysEstablishConnection) {
661+
subscribableListener.addTimeout(TimeValue.timeValueSeconds(FORCE_CONNECT_CLUSTER_DEFAULT_TIMEOUT), threadPool, timeoutExecutor);
662+
}
663+
664+
return subscribableListener;
665+
}
666+
667+
/**
668+
* The default disconnected strategy for Elasticsearch is RECONNECT_UNLESS_SKIP_UNAVAILABLE. So we either force
669+
* connect if required (like in CPS) or when skip unavailable is false for a cluster.
670+
* @param alwaysEstablishConnection If we're running in a context where we always need to re-connect.
671+
* @param skipUnavailable The usual skip unavailable setting.
672+
* @return boolean If we should always force reconnect.
673+
*/
674+
private static boolean shouldEstablishConnection(boolean alwaysEstablishConnection, boolean skipUnavailable) {
675+
return alwaysEstablishConnection || skipUnavailable == false;
676+
}
677+
636678
/**
637679
* Handles ccs_minimize_roundtrips=true
638680
*/
@@ -647,7 +689,9 @@ static void ccsRemoteReduce(
647689
RemoteClusterService remoteClusterService,
648690
ThreadPool threadPool,
649691
ActionListener<SearchResponse> listener,
650-
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer
692+
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer,
693+
TransportService transportService,
694+
boolean shouldForceReconnectCluster
651695
) {
652696
final var remoteClientResponseExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
653697
if (resolvedIndices.getLocalIndices() == null && resolvedIndices.getRemoteClusterIndices().size() == 1) {
@@ -665,12 +709,9 @@ static void ccsRemoteReduce(
665709
timeProvider.absoluteStartMillis(),
666710
true
667711
);
668-
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
669-
clusterAlias,
670-
remoteClientResponseExecutor,
671-
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
672-
);
673-
remoteClusterClient.execute(TransportSearchAction.REMOTE_TYPE, ccsSearchRequest, new ActionListener<>() {
712+
713+
var connectionListener = getListenerWithOptionalTimeout(shouldForceReconnectCluster, threadPool, remoteClientResponseExecutor);
714+
var searchListener = new ActionListener<SearchResponse>() {
674715
@Override
675716
public void onResponse(SearchResponse searchResponse) {
676717
// overwrite the existing cluster entry with the updated one
@@ -713,7 +754,25 @@ public void onFailure(Exception e) {
713754
listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e));
714755
}
715756
}
716-
});
757+
};
758+
759+
connectionListener.addListener(
760+
searchListener.delegateFailure(
761+
(responseListener, connection) -> transportService.sendRequest(
762+
connection,
763+
TransportSearchAction.TYPE.name(),
764+
ccsSearchRequest,
765+
TransportRequestOptions.EMPTY,
766+
new ActionListenerResponseHandler<>(responseListener, SearchResponse::new, remoteClientResponseExecutor)
767+
)
768+
)
769+
);
770+
771+
remoteClusterService.maybeEnsureConnectedAndGetConnection(
772+
clusterAlias,
773+
shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable),
774+
connectionListener
775+
);
717776
} else {
718777
SearchResponseMerger searchResponseMerger = createSearchResponseMerger(
719778
searchRequest.source(),
@@ -748,12 +807,30 @@ public void onFailure(Exception e) {
748807
task.getProgressListener(),
749808
listener
750809
);
751-
final var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
810+
811+
SubscribableListener<Transport.Connection> connectionListener = getListenerWithOptionalTimeout(
812+
shouldForceReconnectCluster,
813+
threadPool,
814+
remoteClientResponseExecutor
815+
);
816+
817+
connectionListener.addListener(
818+
ccsListener.delegateFailure(
819+
(responseListener, connection) -> transportService.sendRequest(
820+
connection,
821+
TransportSearchAction.REMOTE_TYPE.name(),
822+
ccsSearchRequest,
823+
TransportRequestOptions.EMPTY,
824+
new ActionListenerResponseHandler<>(responseListener, SearchResponse::new, remoteClientResponseExecutor)
825+
)
826+
)
827+
);
828+
829+
remoteClusterService.maybeEnsureConnectedAndGetConnection(
752830
clusterAlias,
753-
remoteClientResponseExecutor,
754-
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
831+
shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable),
832+
connectionListener
755833
);
756-
remoteClusterClient.execute(TransportSearchAction.REMOTE_TYPE, ccsSearchRequest, ccsListener);
757834
}
758835
if (resolvedIndices.getLocalIndices() != null) {
759836
ActionListener<SearchResponse> ccsListener = createCCSListener(
@@ -819,7 +896,8 @@ static void collectSearchShards(
819896
SearchResponse.Clusters clusters,
820897
SearchTimeProvider timeProvider,
821898
TransportService transportService,
822-
ActionListener<Map<String, SearchShardsResponse>> listener
899+
ActionListener<Map<String, SearchShardsResponse>> listener,
900+
boolean shouldForceReconnectCluster
823901
) {
824902
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
825903
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -848,49 +926,59 @@ Map<String, SearchShardsResponse> createFinalResponse() {
848926
return searchShardsResponses;
849927
}
850928
};
929+
930+
var threadPool = transportService.getThreadPool();
931+
var connectionListener = getListenerWithOptionalTimeout(
932+
shouldForceReconnectCluster,
933+
threadPool,
934+
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION)
935+
);
936+
937+
connectionListener.addListener(singleListener.delegateFailure((responseListener, connection) -> {
938+
final String[] indices = entry.getValue().indices();
939+
final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
940+
// TODO: support point-in-time
941+
if (searchContext == null && connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
942+
SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
943+
indices,
944+
indicesOptions,
945+
query,
946+
routing,
947+
preference,
948+
allowPartialResults,
949+
clusterAlias
950+
);
951+
transportService.sendRequest(
952+
connection,
953+
TransportSearchShardsAction.TYPE.name(),
954+
searchShardsRequest,
955+
TransportRequestOptions.EMPTY,
956+
new ActionListenerResponseHandler<>(responseListener, SearchShardsResponse::new, responseExecutor)
957+
);
958+
} else {
959+
// does not do a can-match
960+
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(
961+
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
962+
indices
963+
).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
964+
transportService.sendRequest(
965+
connection,
966+
TransportClusterSearchShardsAction.TYPE.name(),
967+
searchShardsRequest,
968+
TransportRequestOptions.EMPTY,
969+
new ActionListenerResponseHandler<>(
970+
singleListener.map(SearchShardsResponse::fromLegacyResponse),
971+
ClusterSearchShardsResponse::new,
972+
responseExecutor
973+
)
974+
);
975+
}
976+
}));
977+
851978
remoteClusterService.maybeEnsureConnectedAndGetConnection(
852979
clusterAlias,
853-
skipUnavailable == false,
854-
singleListener.delegateFailureAndWrap((delegate, connection) -> {
855-
final String[] indices = entry.getValue().indices();
856-
final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
857-
// TODO: support point-in-time
858-
if (searchContext == null && connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
859-
SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
860-
indices,
861-
indicesOptions,
862-
query,
863-
routing,
864-
preference,
865-
allowPartialResults,
866-
clusterAlias
867-
);
868-
transportService.sendRequest(
869-
connection,
870-
TransportSearchShardsAction.TYPE.name(),
871-
searchShardsRequest,
872-
TransportRequestOptions.EMPTY,
873-
new ActionListenerResponseHandler<>(delegate, SearchShardsResponse::new, responseExecutor)
874-
);
875-
} else {
876-
// does not do a can-match
877-
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(
878-
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
879-
indices
880-
).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
881-
transportService.sendRequest(
882-
connection,
883-
TransportClusterSearchShardsAction.TYPE.name(),
884-
searchShardsRequest,
885-
TransportRequestOptions.EMPTY,
886-
new ActionListenerResponseHandler<>(
887-
delegate.map(SearchShardsResponse::fromLegacyResponse),
888-
ClusterSearchShardsResponse::new,
889-
responseExecutor
890-
)
891-
);
892-
}
893-
})
980+
shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable),
981+
connectionListener
894982
);
895983
}
896984
}
@@ -1990,6 +2078,7 @@ private void recordTelemetry() {
19902078

19912079
/**
19922080
* Extract telemetry data from the search response.
2081+
*
19932082
* @param searchResponse The final response from the search.
19942083
*/
19952084
private void extractCCSTelemetry(SearchResponse searchResponse) {

0 commit comments

Comments
 (0)