Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/136890.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136890
summary: "Cat API: added endpoint for Circuit Breakers"
area: Infra/REST API
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@
import org.elasticsearch.rest.action.cat.RestAliasAction;
import org.elasticsearch.rest.action.cat.RestAllocationAction;
import org.elasticsearch.rest.action.cat.RestCatAction;
import org.elasticsearch.rest.action.cat.RestCatCircuitBreakerAction;
import org.elasticsearch.rest.action.cat.RestCatComponentTemplateAction;
import org.elasticsearch.rest.action.cat.RestCatRecoveryAction;
import org.elasticsearch.rest.action.cat.RestFielddataAction;
Expand Down Expand Up @@ -1020,6 +1021,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction());
registerHandler.accept(new RestAliasAction());
registerHandler.accept(new RestThreadPoolAction());
registerHandler.accept(new RestCatCircuitBreakerAction());
registerHandler.accept(new RestPluginsAction());
registerHandler.accept(new RestFielddataAction());
registerHandler.accept(new RestNodeAttrsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.action.cat;

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestActionListener;
import org.elasticsearch.rest.action.RestResponseListener;

import java.util.List;
import java.util.Set;

import static org.elasticsearch.common.util.set.Sets.addToCopy;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;

@ServerlessScope(Scope.INTERNAL)
public class RestCatCircuitBreakerAction extends AbstractCatAction {

private static final Set<String> RESPONSE_PARAMS = addToCopy(AbstractCatAction.RESPONSE_PARAMS, "circuit_breaker_patterns");

@Override
public String getName() {
return "cat_circuitbreaker_action";
}

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_cat/circuit_breaker"), new Route(GET, "/_cat/circuit_breaker/{circuit_breaker_patterns}"));
}

@Override
protected void documentation(StringBuilder sb) {
sb.append("/_cat/circuit_breaker\n");
sb.append("/_cat/circuit_breaker/{circuit_breaker_patterns}\n");
}

@Override
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

@Override
protected RestChannelConsumer doCatRequest(RestRequest request, NodeClient client) {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
clusterStateRequest.clear().nodes(true);

return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(
clusterStateResponse.getState().nodes().stream().map(DiscoveryNode::getId).toArray(String[]::new)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kind of a pity to have DiscoveryNode here, get the ID, and then have BaseNodeRequest#resolveNodes find the DiscoveryNode again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little subtle, but this should be unnecessary: AFAICT we're always getting these stats from all nodes, but that's the default behaviour, we can just pass the empty array. See org.elasticsearch.action.support.nodes.BaseNodesRequest#resolveNodes which calls org.elasticsearch.cluster.node.DiscoveryNodes#resolveNodes which does this:

if (nodes == null || nodes.length == 0) {
return stream().map(DiscoveryNode::getId).toArray(String[]::new);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! After looking through the code I saw what you were pointing at, it is indeed redundant to get the nodes from the ClusterState with a separate request. I have removed it

);
nodesStatsRequest.clear().addMetric(NodesStatsRequestParameters.Metric.BREAKER);
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<>(channel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to avoid this kind of nesting, and possibly generate better response in case of failure, you might consider ListenableFuture. It makes reasoning more complicated though (IMO). See RestShardsAction for an example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please prefer SubscribableListener over ListenableFuture. They're basically the same, except ListenableFuture adds multiple layers of wrapping around the exceptions it encounters for largely-historical reasons.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip @DaveCTurner!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll keep that in mind for the future. For now, after removing the first request there's only 1 action executed so it shouldn't be needed.

@Override
public RestResponse buildResponse(final NodesStatsResponse nodesStatsResponse) throws Exception {
return RestTable.buildResponse(buildTable(request, nodesStatsResponse), channel);
}
});
}
});
}

@Override
protected Table getTableWithHeader(RestRequest request) {
Table table = new Table();
table.startHeaders();
table.addCell("node_id", "default:true;alias:id;desc:persistent node id");
table.addCell("node_name", "default:false;alias:nn;desc:node name");
table.addCell("breaker", "default:true;alias:br;desc:breaker name");
table.addCell("limit", "default:true;alias:l;desc:limit size");
table.addCell("limit_bytes", "default:false;alias:lb;desc:limit size in bytes");
table.addCell("estimated", "default:true;alias:e;desc:estimated size");
table.addCell("estimated_bytes", "default:false;alias:eb;desc:estimated size in bytes");
table.addCell("tripped", "default:true;alias:t;desc:tripped count");
table.addCell("overhead", "default:false;alias:o;desc:overhead");
table.endHeaders();
return table;
}

private Table buildTable(RestRequest request, NodesStatsResponse nodesStatsResponse) {
final Table table = getTableWithHeader(request);
final String[] circuitBreakers = request.paramAsStringArray("circuit_breaker_patterns", new String[] { "*" });

for (final NodeStats nodeStats : nodesStatsResponse.getNodes()) {
if (nodeStats.getBreaker() == null) {
continue;
}
for (final CircuitBreakerStats circuitBreakerStats : nodeStats.getBreaker().getAllStats()) {
if (Regex.simpleMatch(circuitBreakers, circuitBreakerStats.getName()) == false) {
continue;
}
table.startRow();
table.addCell(nodeStats.getNode().getId());
table.addCell(nodeStats.getNode().getName());
table.addCell(circuitBreakerStats.getName());
table.addCell(ByteSizeValue.ofBytes(circuitBreakerStats.getLimit()));
table.addCell(circuitBreakerStats.getLimit());
table.addCell(ByteSizeValue.ofBytes(circuitBreakerStats.getEstimated()));
table.addCell(circuitBreakerStats.getEstimated());
table.addCell(circuitBreakerStats.getTrippedCount());
table.addCell(circuitBreakerStats.getOverhead());
table.endRow();
}
}
return table;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.rest.action.cat;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestResponseUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.client.NoOpNodeClient;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.test.rest.RestActionTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Stream;

import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RestCatCircuitBreakerActionTests extends RestActionTestCase {

private RestCatCircuitBreakerAction action;
private NodeClient nodeClient;

@Override
public void setUp() throws Exception {
super.setUp();
action = new RestCatCircuitBreakerAction();
ClusterStateResponse clusterStateResponse = createClusterStateResponse();
NodesStatsResponse nodeStatsResponse = mock(NodesStatsResponse.class);
List<NodeStats> allNodeStats = createNodeStatsList();
when(nodeStatsResponse.getNodes()).thenReturn(allNodeStats);
try (var threadPool = createThreadPool()) {
nodeClient = buildNodeClient(threadPool, clusterStateResponse, nodeStatsResponse);
}
}

public void testRestCatCircuitBreakerActionSetup() {
assertEquals("cat_circuitbreaker_action", action.getName());
assertEquals(2, action.routes().size());
assertEquals(GET, action.routes().getFirst().getMethod());
assertEquals("/_cat/circuit_breaker", action.routes().getFirst().getPath());
assertEquals(GET, action.routes().get(1).getMethod());
assertEquals("/_cat/circuit_breaker/{circuit_breaker_patterns}", action.routes().get(1).getPath());

StringBuilder sb = new StringBuilder();
action.documentation(sb);
assertEquals("/_cat/circuit_breaker\n/_cat/circuit_breaker/{circuit_breaker_patterns}\n", sb.toString());
}

public void testRestCatCircuitBreakerAction() throws Exception {
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(GET)
.withPath("/_cat/circuit_breaker")
.build();
FakeRestChannel channel = new FakeRestChannel(restRequest, true, 0);

action.handleRequest(restRequest, channel, nodeClient);

assertThat(channel.responses().get(), equalTo(1));
try (RestResponse response = channel.capturedResponse()) {
assertThat(response.status(), equalTo(RestStatus.OK));
String responseContent = RestResponseUtils.getBodyContent(response).utf8ToString();
assertEquals(
"node-1 request 1.4mb 750kb 0\n"
+ "node-1 normal 2.4mb 1.2mb 1\n"
+ "node-2 request 1.4mb 1.3mb 25\n"
+ "node-3 big 1.5gb 768mb 5\n",
responseContent
);
}
}

public void testRestCatCircuitBreakerActionWithPatternMatching() throws Exception {
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(GET)
.withPath("/_cat/circuit_breaker/request")
.withParams(Map.of("circuit_breaker_patterns", "request"))
.build();
FakeRestChannel channel = new FakeRestChannel(restRequest, true, 0);

action.handleRequest(restRequest, channel, nodeClient);

assertThat(channel.responses().get(), equalTo(1));
try (RestResponse response = channel.capturedResponse()) {
assertThat(response.status(), equalTo(RestStatus.OK));
String responseContent = RestResponseUtils.getBodyContent(response).utf8ToString();
assertEquals("node-1 request 1.4mb 750kb 0\n" + "node-2 request 1.4mb 1.3mb 25\n", responseContent);
}
}

private ClusterStateResponse createClusterStateResponse() {
DiscoveryNode node1 = createDiscoveryNode("node-1", "test-node-1");
DiscoveryNode node2 = createDiscoveryNode("node-2", "test-node-2");
DiscoveryNode node3 = createDiscoveryNode("node-3", "test-node-3");
DiscoveryNodes discoveryNodes = createDiscoveryNodes(node1, node2, node3);
ClusterState clusterState = createClusterState(discoveryNodes);
return createClusterStateResponse(clusterState);
}

private DiscoveryNode createDiscoveryNode(final String nodeId, final String nodeName) {
DiscoveryNode node = mock(DiscoveryNode.class);
when(node.getId()).thenReturn(nodeId);
when(node.getName()).thenReturn(nodeName);
return node;
}

private DiscoveryNodes createDiscoveryNodes(final DiscoveryNode... nodes) {
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(discoveryNodes.stream()).thenReturn(Stream.of(nodes));
return discoveryNodes;
}

private ClusterState createClusterState(final DiscoveryNodes discoveryNodes) {
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.nodes()).thenReturn(discoveryNodes);
return clusterState;
}

private ClusterStateResponse createClusterStateResponse(final ClusterState clusterState) {
ClusterStateResponse clusterStateResponse = mock(ClusterStateResponse.class);
when(clusterStateResponse.getState()).thenReturn(clusterState);
return clusterStateResponse;
}

private List<NodeStats> createNodeStatsList() {
return List.of(
createNodeStats(
"node-1",
"test-node-1",
createCircuitBreakerStats(
createBreakerStats("request", 1536000L, 768000L, 1.5, 0L),
createBreakerStats("normal", 2560000L, 1280000L, 1.0, 1L)
)
),
createNodeStats(
"node-2",
"test-node-2",
createCircuitBreakerStats(createBreakerStats("request", 1536000L, 1459200L, 1.5, 25L))
),
createNodeStats("node-3", "test-node-3", createCircuitBreakerStats(createBreakerStats("big", 1610612736L, 805306368L, 1.2, 5L)))
);
}

private NodeStats createNodeStats(final String nodeId, final String nodeName, final AllCircuitBreakerStats breakerStats) {
NodeStats nodeStats = mock(NodeStats.class);
DiscoveryNode node = mock(DiscoveryNode.class);
when(node.getId()).thenReturn(nodeId);
when(node.getName()).thenReturn(nodeName);
when(nodeStats.getNode()).thenReturn(node);
when(nodeStats.getBreaker()).thenReturn(breakerStats);
return nodeStats;
}

private CircuitBreakerStats createBreakerStats(
final String name,
final long limit,
final long estimated,
final double overhead,
final long trippedCount
) {
CircuitBreakerStats breaker = mock(CircuitBreakerStats.class);
when(breaker.getName()).thenReturn(name);
when(breaker.getLimit()).thenReturn(limit);
when(breaker.getEstimated()).thenReturn(estimated);
when(breaker.getOverhead()).thenReturn(overhead);
when(breaker.getTrippedCount()).thenReturn(trippedCount);
return breaker;
}

private AllCircuitBreakerStats createCircuitBreakerStats(final CircuitBreakerStats... breakers) {
AllCircuitBreakerStats allStats = mock(AllCircuitBreakerStats.class);
when(allStats.getAllStats()).thenReturn(breakers);
return allStats;
}

private NoOpNodeClient buildNodeClient(
ThreadPool threadPool,
ClusterStateResponse clusterStateResponse,
NodesStatsResponse nodesStatsResponse
) {
return new NoOpNodeClient(threadPool) {
@Override
@SuppressWarnings("unchecked")
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
if (request instanceof ClusterStateRequest) {
listener.onResponse((Response) clusterStateResponse);
} else if (request instanceof NodesStatsRequest) {
listener.onResponse((Response) nodesStatsResponse);
} else {
throw new AssertionError(String.format(Locale.ROOT, "Unexpected action type: %s request: %s", action, request));
}
}
};
}
}