Skip to content

Commit fc0b06a

Browse files
committed
Improve failure handling
1 parent c6e7c04 commit fc0b06a

File tree

7 files changed

+86
-44
lines changed

7 files changed

+86
-44
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
7373
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
7474
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
75-
import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction;
7675
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
7776
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
7877
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
@@ -642,7 +641,6 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
642641
actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class);
643642
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
644643
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
645-
actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class);
646644
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
647645
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
648646
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
142142

143143
if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) {
144144
builder.startObject("ccs");
145-
if (remoteClustersStats != null) {
145+
if (remoteClustersStats != null && remoteClustersStats.isEmpty() == false) {
146146
builder.field("clusters", remoteClustersStats);
147147
}
148148
ccsMetrics.toXContent(builder, params);

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public class RemoteClusterStatsResponse extends BaseNodesResponse<ClusterStatsNo
3131
private final long indicesBytes;
3232
private final long heapBytes;
3333
private final long memBytes;
34-
private String remoteName;
3534

3635
public Set<String> getVersions() {
3736
return versions;
@@ -61,14 +60,6 @@ public long getMemBytes() {
6160
return memBytes;
6261
}
6362

64-
public String getRemoteName() {
65-
return remoteName;
66-
}
67-
68-
public void setRemoteName(String remoteName) {
69-
this.remoteName = remoteName;
70-
}
71-
7263
public RemoteClusterStatsResponse(
7364
ClusterName clusterName,
7465
String clusterUUID,
@@ -135,20 +126,4 @@ protected List<ClusterStatsNodeResponse> readNodesFrom(StreamInput in) throws IO
135126

136127
@Override
137128
protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException {}
138-
139-
/**
140-
* Default empty response, can be used in case the cluster did not respond.
141-
*/
142-
public static final RemoteClusterStatsResponse EMPTY = new RemoteClusterStatsResponse(
143-
ClusterName.DEFAULT,
144-
"",
145-
ClusterHealthStatus.RED,
146-
Set.of(),
147-
0,
148-
0,
149-
0,
150-
0,
151-
0,
152-
0
153-
);
154129
}

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.ActionType;
1717
import org.elasticsearch.action.FailedNodeException;
18+
import org.elasticsearch.action.search.RemoteClusterActionListener;
1819
import org.elasticsearch.action.support.ActionFilters;
19-
import org.elasticsearch.action.support.GroupedActionListener;
2020
import org.elasticsearch.action.support.PlainActionFuture;
2121
import org.elasticsearch.cluster.ClusterSnapshotStats;
2222
import org.elasticsearch.cluster.ClusterState;
@@ -40,15 +40,16 @@
4040
import org.elasticsearch.transport.Transports;
4141
import org.elasticsearch.usage.UsageService;
4242

43-
import java.util.Collection;
4443
import java.util.HashMap;
4544
import java.util.List;
4645
import java.util.Map;
4746
import java.util.concurrent.ExecutionException;
4847
import java.util.function.BiFunction;
4948
import java.util.function.BooleanSupplier;
50-
import java.util.stream.Collectors;
5149

50+
/**
51+
* Transport action implementing _cluster/stats API.
52+
*/
5253
public class TransportClusterStatsAction extends TransportClusterStatsBaseAction<ClusterStatsResponse> {
5354

5455
public static final ActionType<ClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/stats");
@@ -203,32 +204,27 @@ private Map<String, RemoteClusterStatsResponse> getStatsFromRemotes(ClusterStats
203204
}
204205
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();
205206

206-
var remotesListener = new PlainActionFuture<Collection<RemoteClusterStatsResponse>>();
207-
GroupedActionListener<RemoteClusterStatsResponse> groupListener = new GroupedActionListener<>(remotes.size(), remotesListener);
207+
var remotesListener = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();
208+
var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesListener);
208209

209210
for (String clusterAlias : remotes) {
210211
ClusterStatsRequest remoteRequest = request.subRequest();
211212
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
212213
clusterAlias,
213214
remoteClientResponseExecutor,
214-
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
215+
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
215216
);
216-
// TODO: this should collect all successful requests, not fail once one of them fails
217217
remoteClusterClient.execute(
218218
TransportRemoteClusterStatsAction.REMOTE_TYPE,
219219
remoteRequest,
220-
groupListener.delegateFailure((l, r) -> {
221-
r.setRemoteName(clusterAlias);
222-
l.onResponse(r);
223-
})
220+
groupListener.remoteListener(clusterAlias)
224221
);
225222

226223
}
227224

228225
try {
229-
Collection<RemoteClusterStatsResponse> remoteStats = remotesListener.get();
230-
// Convert the list to map
231-
return remoteStats.stream().collect(Collectors.toMap(RemoteClusterStatsResponse::getRemoteName, r -> r));
226+
// TODO: how do we report errors?
227+
return remotesListener.get();
232228
} catch (InterruptedException | ExecutionException e) {
233229
logger.log(Level.ERROR, "Failed to get remote cluster stats: ", ExceptionsHelper.unwrapCause(e));
234230
return Map.of();

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@
2929
import java.util.HashSet;
3030
import java.util.List;
3131

32+
/**
33+
* Transport action for remote cluster stats. It returns a reduced answer since most of the stats from the remote
34+
* cluster are not needed.
35+
*/
3236
public class TransportRemoteClusterStatsAction extends TransportClusterStatsBaseAction<RemoteClusterStatsResponse> {
3337

34-
public static final ActionType<RemoteClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/remote_stats");
38+
public static final ActionType<RemoteClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/stats/remote");
3539
public static final RemoteClusterActionType<RemoteClusterStatsResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
3640
TYPE.name(),
3741
RemoteClusterStatsResponse::new
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.action.search;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.DelegatingActionListener;
12+
import org.elasticsearch.common.util.concurrent.CountDown;
13+
14+
import java.util.Map;
15+
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
/**
19+
* Action listener for operations that are performed on a group of remote clusters.
20+
* It waits for all operations to complete and then passes the collected results to the upstream listener.
21+
* A failure of an individual operation does not fail the group; that cluster is simply missing from the results.
22+
* <br>
23+
* The upstream listener receives a map of the results keyed by the cluster alias given to {@link #remoteListener(String)}.
24+
* The listener returned by that method is the one that should be used to perform the individual operation on the remote cluster.
25+
*
26+
* @param <T> the type of the individual per-cluster result
27+
*/
28+
public class RemoteClusterActionListener<T> extends DelegatingActionListener<T, Map<String, T>> {
29+
private final CountDown countDown;
30+
private final Map<String, T> results;
31+
private final AtomicReference<Exception> failure = new AtomicReference<>();
32+
33+
public RemoteClusterActionListener(int groupSize, ActionListener<Map<String, T>> delegate) {
34+
super(delegate);
35+
if (groupSize <= 0) {
36+
assert false : "illegal group size [" + groupSize + "]";
37+
throw new IllegalArgumentException("groupSize must be greater than 0 but was " + groupSize);
38+
}
39+
results = new ConcurrentHashMap<>(groupSize);
40+
countDown = new CountDown(groupSize);
41+
}
42+
43+
public ActionListener<T> remoteListener(String clusterAlias) {
44+
return delegateFailure((l, r) -> {
45+
results.put(clusterAlias, r);
46+
l.onResponse(r);
47+
});
48+
}
49+
50+
@Override
51+
public void onResponse(T element) {
52+
if (countDown.countDown()) {
53+
delegate.onResponse(results);
54+
}
55+
}
56+
57+
@Override
58+
public void onFailure(Exception e) {
59+
// TODO: how do we report the failures? For now the first one is kept in 'failure' (later ones are added to it as suppressed) and never passed to the delegate.
60+
final var firstException = failure.compareAndExchange(null, e);
61+
if (firstException != null && firstException != e) {
62+
firstException.addSuppressed(e);
63+
}
64+
if (countDown.countDown()) {
65+
delegate.onResponse(results);
66+
}
67+
}
68+
}

x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,8 @@ public void testToXContent() throws IOException {
433433
MappingStats.of(metadata, () -> {}),
434434
AnalysisStats.of(metadata, () -> {}),
435435
VersionStats.of(metadata, singletonList(mockNodeResponse)),
436-
ClusterSnapshotStats.EMPTY
436+
ClusterSnapshotStats.EMPTY,
437+
Map.of()
437438
);
438439

439440
final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);

0 commit comments

Comments
 (0)