Skip to content

Commit 16d0526

Browse files
committed
Parallelize remotes fetching
1 parent fc0b06a commit 16d0526

File tree

3 files changed

+33
-22
lines changed

3 files changed

+33
-22
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
7373
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
7474
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
75+
import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction;
7576
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
7677
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
7778
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
@@ -641,6 +642,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
641642
actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class);
642643
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
643644
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
645+
actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class);
644646
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
645647
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
646648
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);

server/src/main/java/org/elasticsearch/action/search/RemoteClusterActionListener.java renamed to server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterActionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* in compliance with, at your election, the Elastic License 2.0 or the Server
66
* Side Public License, v 1.
77
*/
8-
package org.elasticsearch.action.search;
8+
package org.elasticsearch.action.admin.cluster.stats;
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.DelegatingActionListener;

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88

99
package org.elasticsearch.action.admin.cluster.stats;
1010

11-
import org.apache.logging.log4j.Level;
1211
import org.apache.logging.log4j.LogManager;
1312
import org.apache.logging.log4j.Logger;
14-
import org.elasticsearch.ExceptionsHelper;
13+
import org.elasticsearch.ElasticsearchException;
14+
import org.elasticsearch.action.ActionFuture;
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.ActionType;
1717
import org.elasticsearch.action.FailedNodeException;
18-
import org.elasticsearch.action.search.RemoteClusterActionListener;
1918
import org.elasticsearch.action.support.ActionFilters;
2019
import org.elasticsearch.action.support.PlainActionFuture;
2120
import org.elasticsearch.cluster.ClusterSnapshotStats;
@@ -43,7 +42,6 @@
4342
import java.util.HashMap;
4443
import java.util.List;
4544
import java.util.Map;
46-
import java.util.concurrent.ExecutionException;
4745
import java.util.function.BiFunction;
4846
import java.util.function.BooleanSupplier;
4947

@@ -89,6 +87,14 @@ public TransportClusterStatsAction(
8987
this.settings = settings;
9088
}
9189

90+
private ActionFuture<Map<String, RemoteClusterStatsResponse>> remoteFuture;
91+
92+
@Override
93+
protected void doExecute(Task task, ClusterStatsRequest request, ActionListener<ClusterStatsResponse> listener) {
94+
remoteFuture = getStatsFromRemotes(request);
95+
super.doExecute(task, request, listener);
96+
}
97+
9298
@Override
9399
protected void newResponseAsync(
94100
final Task task,
@@ -111,8 +117,7 @@ protected void newResponseAsync(
111117
clusterService.threadPool().absoluteTimeInMillis()
112118
);
113119

114-
// TODO: this should not be happening here but leaving it here for now until we figure out proper
115-
// threading/async model for this
120+
// This will wait until remotes are done if it didn't happen yet
116121
var remoteClusterStats = getRemoteClusterStats(request);
117122

118123
final ListenableFuture<MappingStats> mappingStatsStep = new ListenableFuture<>();
@@ -178,7 +183,7 @@ private Map<String, ClusterStatsResponse.RemoteClusterStats> getRemoteClusterSta
178183
return null;
179184
}
180185
Map<String, ClusterStatsResponse.RemoteClusterStats> remoteClustersStats = new HashMap<>();
181-
Map<String, RemoteClusterStatsResponse> remoteData = getStatsFromRemotes(request);
186+
Map<String, RemoteClusterStatsResponse> remoteData = resolveRemoteClusterStats();
182187

183188
for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
184189
RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
@@ -196,16 +201,27 @@ private Map<String, ClusterStatsResponse.RemoteClusterStats> getRemoteClusterSta
196201
return remoteClustersStats;
197202
}
198203

199-
private Map<String, RemoteClusterStatsResponse> getStatsFromRemotes(ClusterStatsRequest request) {
200-
// TODO: make correct pool
201-
final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
202-
if (request.doRemotes() == false) {
204+
private Map<String, RemoteClusterStatsResponse> resolveRemoteClusterStats() {
205+
try {
206+
return remoteFuture.actionGet();
207+
} catch (ElasticsearchException e) {
208+
logger.warn("Failed to get remote cluster stats", e);
203209
return Map.of();
204210
}
211+
}
212+
213+
private ActionFuture<Map<String, RemoteClusterStatsResponse>> getStatsFromRemotes(ClusterStatsRequest request) {
214+
if (request.doRemotes() == false) {
215+
// this will never be used since getRemoteClusterStats has the same check
216+
return null;
217+
}
218+
219+
// TODO: make correct pool
220+
final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
205221
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();
206222

207-
var remotesListener = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();
208-
var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesListener);
223+
var remotesFuture = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();
224+
var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesFuture);
209225

210226
for (String clusterAlias : remotes) {
211227
ClusterStatsRequest remoteRequest = request.subRequest();
@@ -222,13 +238,6 @@ private Map<String, RemoteClusterStatsResponse> getStatsFromRemotes(ClusterStats
222238

223239
}
224240

225-
try {
226-
// TODO: how do we report errors?
227-
return remotesListener.get();
228-
} catch (InterruptedException | ExecutionException e) {
229-
logger.log(Level.ERROR, "Failed to get remote cluster stats: ", ExceptionsHelper.unwrapCause(e));
230-
return Map.of();
231-
}
241+
return remotesFuture;
232242
}
233-
234243
}

0 commit comments

Comments
 (0)