Skip to content

Commit c28df8e

Browse files
authored
Parallelize master stats in TransportNodeStatsAction (#113236)
Today we reach out to the master for its stats after collecting all the node-level stats. With #113140 we can do these things in parallel very simply. This commit does so.
1 parent 8d223cb commit c28df8e

File tree

1 file changed

+41
-21
lines changed

1 file changed

+41
-21
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
1717
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1818
import org.elasticsearch.action.support.ActionFilters;
19+
import org.elasticsearch.action.support.SubscribableListener;
1920
import org.elasticsearch.action.support.nodes.TransportNodesAction;
21+
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
2022
import org.elasticsearch.client.internal.node.NodeClient;
2123
import org.elasticsearch.cluster.node.DiscoveryNode;
2224
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@@ -47,7 +49,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<
4749
NodesStatsResponse,
4850
TransportNodesStatsAction.NodeStatsRequest,
4951
NodeStats,
50-
Void> {
52+
SubscribableListener<TransportGetAllocationStatsAction.Response>> {
5153

5254
public static final ActionType<NodesStatsResponse> TYPE = new ActionType<>("cluster:monitor/nodes/stats");
5355

@@ -77,37 +79,55 @@ public TransportNodesStatsAction(
7779

7880
@Override
7981
protected NodesStatsResponse newResponse(NodesStatsRequest request, List<NodeStats> responses, List<FailedNodeException> failures) {
80-
return new NodesStatsResponse(clusterService.getClusterName(), responses, failures);
82+
assert false;
83+
throw new UnsupportedOperationException("use newResponseAsync instead");
84+
}
85+
86+
@Override
87+
protected SubscribableListener<TransportGetAllocationStatsAction.Response> createActionContext(Task task, NodesStatsRequest request) {
88+
return SubscribableListener.newForked(l -> {
89+
var metrics = request.getNodesStatsRequestParameters().requestedMetrics();
90+
if (metrics.contains(Metric.FS) || metrics.contains(Metric.ALLOCATIONS)) {
91+
new ParentTaskAssigningClient(client, clusterService.localNode(), task).execute(
92+
TransportGetAllocationStatsAction.TYPE,
93+
new TransportGetAllocationStatsAction.Request(
94+
Objects.requireNonNullElse(request.timeout(), RestUtils.REST_MASTER_TIMEOUT_DEFAULT),
95+
new TaskId(clusterService.localNode().getId(), task.getId()),
96+
metrics
97+
),
98+
l
99+
);
100+
} else {
101+
l.onResponse(null);
102+
}
103+
});
81104
}
82105

83106
@Override
84107
protected void newResponseAsync(
85108
Task task,
86109
NodesStatsRequest request,
87-
Void actionContext,
110+
SubscribableListener<TransportGetAllocationStatsAction.Response> actionContext,
88111
List<NodeStats> responses,
89112
List<FailedNodeException> failures,
90113
ActionListener<NodesStatsResponse> listener
91114
) {
92-
var metrics = request.getNodesStatsRequestParameters().requestedMetrics();
93-
if (metrics.contains(Metric.FS) || metrics.contains(Metric.ALLOCATIONS)) {
94-
client.execute(
95-
TransportGetAllocationStatsAction.TYPE,
96-
new TransportGetAllocationStatsAction.Request(
97-
Objects.requireNonNullElse(request.timeout(), RestUtils.REST_MASTER_TIMEOUT_DEFAULT),
98-
new TaskId(clusterService.localNode().getId(), task.getId()),
99-
metrics
100-
),
101-
listener.delegateFailure(
102-
(l, r) -> ActionListener.respondAndRelease(
103-
l,
104-
newResponse(request, merge(responses, r.getNodeAllocationStats(), r.getDiskThresholdSettings()), failures)
105-
)
115+
actionContext
116+
// merge in the stats from the master, if available
117+
.andThenApply(
118+
getAllocationStatsResponse -> new NodesStatsResponse(
119+
clusterService.getClusterName(),
120+
getAllocationStatsResponse == null
121+
? responses
122+
: merge(
123+
responses,
124+
getAllocationStatsResponse.getNodeAllocationStats(),
125+
getAllocationStatsResponse.getDiskThresholdSettings()
126+
),
127+
failures
106128
)
107-
);
108-
} else {
109-
ActionListener.run(listener, l -> ActionListener.respondAndRelease(l, newResponse(request, responses, failures)));
110-
}
129+
)
130+
.addListener(listener);
111131
}
112132

113133
private static List<NodeStats> merge(

0 commit comments

Comments
 (0)