Skip to content

Commit da8bd52

Browse files
Use a user configurable timeout than a boolean
1 parent d2c727a commit da8bd52

File tree

2 files changed

+35
-36
lines changed

2 files changed

+35
-36
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
168168
private final Client client;
169169
private final UsageService usageService;
170170
private final boolean collectTelemetry;
171-
private final boolean alwaysEstablishConnection;
172-
private static final long FORCE_CONNECT_CLUSTER_DEFAULT_TIMEOUT = 3L;
171+
private final TimeValue forceConnectTimeoutSecs;
173172

174173
@Inject
175174
public TransportSearchAction(
@@ -218,8 +217,8 @@ public TransportSearchAction(
218217
this.searchResponseMetrics = searchResponseMetrics;
219218
this.client = client;
220219
this.usageService = usageService;
221-
alwaysEstablishConnection = settings.getAsBoolean("serverless.force_cluster_reconnect", false);
222-
logger.info("Should force reconnect cluster: {}", alwaysEstablishConnection);
220+
forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null);
221+
logger.info("Should force reconnect cluster: {}", forceConnectTimeoutSecs);
223222
}
224223

225224
private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
@@ -452,7 +451,7 @@ void executeRequest(
452451
searchPhaseProvider.apply(l)
453452
),
454453
transportService,
455-
alwaysEstablishConnection
454+
forceConnectTimeoutSecs
456455
);
457456
} else {
458457
final SearchContextId searchContext = resolvedIndices.getSearchContextId();
@@ -513,7 +512,7 @@ void executeRequest(
513512
searchPhaseProvider.apply(finalDelegate)
514513
);
515514
}),
516-
alwaysEstablishConnection
515+
forceConnectTimeoutSecs
517516
);
518517
}
519518
}
@@ -644,21 +643,20 @@ public static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
644643
/**
645644
* Return a subscribable listener with optional timeout depending on force reconnect setting is registered or
646645
* not.
647-
* @param alwaysEstablishConnection If we're running in a context where we always need to re-connect.
648-
* This value determines if we need to add a short timeout to avoid waiting
649-
* for long durations to reconnect.
646+
* @param forceConnectTimeoutSecs Timeout in seconds that determines how long we'll wait to establish a connection
647+
* to a remote.
650648
* @param threadPool The thread pool that'll be used for the timeout.
651649
* @param timeoutExecutor The executor that should be used for the timeout.
652650
* @return SubscribableListener A listener with optionally added timeout.
653651
*/
654652
private static SubscribableListener<Transport.Connection> getListenerWithOptionalTimeout(
655-
boolean alwaysEstablishConnection,
653+
TimeValue forceConnectTimeoutSecs,
656654
ThreadPool threadPool,
657655
Executor timeoutExecutor
658656
) {
659657
var subscribableListener = new SubscribableListener<Transport.Connection>();
660-
if (alwaysEstablishConnection) {
661-
subscribableListener.addTimeout(TimeValue.timeValueSeconds(FORCE_CONNECT_CLUSTER_DEFAULT_TIMEOUT), threadPool, timeoutExecutor);
658+
if (forceConnectTimeoutSecs != null) {
659+
subscribableListener.addTimeout(forceConnectTimeoutSecs, threadPool, timeoutExecutor);
662660
}
663661

664662
return subscribableListener;
@@ -667,12 +665,13 @@ private static SubscribableListener<Transport.Connection> getListenerWithOptiona
667665
/**
668666
* The default disconnected strategy for Elasticsearch is RECONNECT_UNLESS_SKIP_UNAVAILABLE. So we either force
669667
* connect if required (like in CPS) or when skip unavailable is false for a cluster.
670-
* @param alwaysEstablishConnection If we're running in a context where we always need to re-connect.
668+
* @param forceConnectTimeoutSecs The timeout value from the force connect setting.
669+
* If it is set, use it as it takes precedence.
671670
* @param skipUnavailable The usual skip unavailable setting.
672671
* @return boolean If we should always force reconnect.
673672
*/
674-
private static boolean shouldEstablishConnection(boolean alwaysEstablishConnection, boolean skipUnavailable) {
675-
return alwaysEstablishConnection || skipUnavailable == false;
673+
private static boolean shouldEstablishConnection(TimeValue forceConnectTimeoutSecs, boolean skipUnavailable) {
674+
return forceConnectTimeoutSecs != null || skipUnavailable == false;
676675
}
677676

678677
/**
@@ -691,7 +690,7 @@ static void ccsRemoteReduce(
691690
ActionListener<SearchResponse> listener,
692691
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer,
693692
TransportService transportService,
694-
boolean shouldForceReconnectCluster
693+
TimeValue forceConnectTimeoutSecs
695694
) {
696695
final var remoteClientResponseExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION);
697696
if (resolvedIndices.getLocalIndices() == null && resolvedIndices.getRemoteClusterIndices().size() == 1) {
@@ -710,7 +709,7 @@ static void ccsRemoteReduce(
710709
true
711710
);
712711

713-
var connectionListener = getListenerWithOptionalTimeout(shouldForceReconnectCluster, threadPool, remoteClientResponseExecutor);
712+
var connectionListener = getListenerWithOptionalTimeout(forceConnectTimeoutSecs, threadPool, remoteClientResponseExecutor);
714713
var searchListener = new ActionListener<SearchResponse>() {
715714
@Override
716715
public void onResponse(SearchResponse searchResponse) {
@@ -770,7 +769,7 @@ public void onFailure(Exception e) {
770769

771770
remoteClusterService.maybeEnsureConnectedAndGetConnection(
772771
clusterAlias,
773-
shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable),
772+
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable),
774773
connectionListener
775774
);
776775
} else {
@@ -809,7 +808,7 @@ public void onFailure(Exception e) {
809808
);
810809

811810
SubscribableListener<Transport.Connection> connectionListener = getListenerWithOptionalTimeout(
812-
shouldForceReconnectCluster,
811+
forceConnectTimeoutSecs,
813812
threadPool,
814813
remoteClientResponseExecutor
815814
);
@@ -828,7 +827,7 @@ public void onFailure(Exception e) {
828827

829828
remoteClusterService.maybeEnsureConnectedAndGetConnection(
830829
clusterAlias,
831-
shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable),
830+
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable),
832831
connectionListener
833832
);
834833
}
@@ -897,7 +896,7 @@ static void collectSearchShards(
897896
SearchTimeProvider timeProvider,
898897
TransportService transportService,
899898
ActionListener<Map<String, SearchShardsResponse>> listener,
900-
boolean shouldForceReconnectCluster
899+
TimeValue forceConnectTimeoutSecs
901900
) {
902901
RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
903902
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -929,7 +928,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
929928

930929
var threadPool = transportService.getThreadPool();
931930
var connectionListener = getListenerWithOptionalTimeout(
932-
shouldForceReconnectCluster,
931+
forceConnectTimeoutSecs,
933932
threadPool,
934933
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION)
935934
);
@@ -977,7 +976,7 @@ Map<String, SearchShardsResponse> createFinalResponse() {
977976

978977
remoteClusterService.maybeEnsureConnectedAndGetConnection(
979978
clusterAlias,
980-
shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable),
979+
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable),
981980
connectionListener
982981
);
983982
}

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ public void testCCSRemoteReduceMergeFails() throws Exception {
548548
listener,
549549
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
550550
service,
551-
false
551+
null
552552
);
553553
if (localIndices == null) {
554554
assertNull(setOnce.get());
@@ -625,7 +625,7 @@ public void testCCSRemoteReduce() throws Exception {
625625
listener,
626626
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
627627
service,
628-
false
628+
null
629629
);
630630
if (localIndices == null) {
631631
assertNull(setOnce.get());
@@ -683,7 +683,7 @@ public void testCCSRemoteReduce() throws Exception {
683683
listener,
684684
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
685685
service,
686-
false
686+
null
687687
);
688688
if (localIndices == null) {
689689
assertNull(setOnce.get());
@@ -773,7 +773,7 @@ public void testCCSRemoteReduceWhereRemoteClustersFail() throws Exception {
773773
listener,
774774
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
775775
service,
776-
false
776+
null
777777
);
778778
if (localIndices == null) {
779779
assertNull(setOnce.get());
@@ -874,7 +874,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
874874
listener,
875875
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
876876
service,
877-
false
877+
null
878878
);
879879
if (localIndices == null) {
880880
assertNull(setOnce.get());
@@ -888,7 +888,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
888888
assertNotNull(failure.get());
889889
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
890890
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
891-
// assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
891+
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
892892
}
893893

894894
// setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
@@ -927,7 +927,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
927927
listener,
928928
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
929929
service,
930-
false
930+
null
931931
);
932932
if (localIndices == null) {
933933
assertNull(setOnce.get());
@@ -1002,7 +1002,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
10021002
listener,
10031003
(r, l) -> setOnce.set(Tuple.tuple(r, l)),
10041004
service,
1005-
false
1005+
null
10061006
);
10071007
if (localIndices == null) {
10081008
assertNull(setOnce.get());
@@ -1098,7 +1098,7 @@ public void testCollectSearchShards() throws Exception {
10981098
timeProvider,
10991099
service,
11001100
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
1101-
false
1101+
null
11021102
);
11031103
awaitLatch(latch, 5, TimeUnit.SECONDS);
11041104
assertNotNull(response.get());
@@ -1128,7 +1128,7 @@ public void testCollectSearchShards() throws Exception {
11281128
timeProvider,
11291129
service,
11301130
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
1131-
false
1131+
null
11321132
);
11331133
awaitLatch(latch, 5, TimeUnit.SECONDS);
11341134
assertEquals(numClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
@@ -1177,7 +1177,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
11771177
timeProvider,
11781178
service,
11791179
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch),
1180-
false
1180+
null
11811181
);
11821182
awaitLatch(latch, 5, TimeUnit.SECONDS);
11831183
assertEquals(numDisconnectedClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED));
@@ -1208,7 +1208,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
12081208
timeProvider,
12091209
service,
12101210
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
1211-
false
1211+
null
12121212
);
12131213
awaitLatch(latch, 5, TimeUnit.SECONDS);
12141214
assertNotNull(response.get());
@@ -1255,7 +1255,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce
12551255
timeProvider,
12561256
service,
12571257
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch),
1258-
false
1258+
null
12591259
);
12601260
awaitLatch(latch, 5, TimeUnit.SECONDS);
12611261
assertEquals(0, clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED));

0 commit comments

Comments
 (0)