Skip to content

Commit 1b1c925

Browse files
authored
SOLR-17582 Stream CLUSTERSTATUS API response (#2916)
The CLUSTERSTATUS API will now stream each collection's status to the response, fetching and computing it on the fly. To avoid a backwards compatibility concern, this won't work for wt=javabin.
1 parent b2d18ca commit 1b1c925

File tree

3 files changed

+129
-70
lines changed

3 files changed

+129
-70
lines changed

solr/CHANGES.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ Other Changes
146146
================== 9.9.0 ==================
147147
New Features
148148
---------------------
149-
(No changes)
149+
* SOLR-17582: The CLUSTERSTATUS API will now stream each collection's status to the response,
150+
fetching and computing it on the fly. To avoid a backwards compatibility concern, this won't work
151+
for wt=javabin. (Matthew Biscocho, David Smiley)
150152

151153
Improvements
152154
---------------------

solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java

Lines changed: 90 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.solr.handler.admin;
1818

1919
import java.util.ArrayList;
20-
import java.util.Arrays;
2120
import java.util.Collection;
2221
import java.util.Collections;
2322
import java.util.HashMap;
@@ -27,15 +26,15 @@
2726
import java.util.Objects;
2827
import java.util.Set;
2928
import java.util.stream.Stream;
29+
import org.apache.solr.common.MapWriter;
3030
import org.apache.solr.common.SolrException;
3131
import org.apache.solr.common.cloud.Aliases;
3232
import org.apache.solr.common.cloud.ClusterState;
3333
import org.apache.solr.common.cloud.DocCollection;
34-
import org.apache.solr.common.cloud.DocRouter;
3534
import org.apache.solr.common.cloud.PerReplicaStates;
3635
import org.apache.solr.common.cloud.Replica;
37-
import org.apache.solr.common.cloud.Slice;
3836
import org.apache.solr.common.cloud.ZkStateReader;
37+
import org.apache.solr.common.params.CommonParams;
3938
import org.apache.solr.common.params.ShardParams;
4039
import org.apache.solr.common.params.SolrParams;
4140
import org.apache.solr.common.util.NamedList;
@@ -180,6 +179,8 @@ private void fetchClusterStatusForCollOrAlias(
180179
String routeKey = solrParams.get(ShardParams._ROUTE_);
181180
String shard = solrParams.get(ZkStateReader.SHARD_ID_PROP);
182181

182+
Set<String> requestedShards = (shard != null) ? Set.of(shard.split(",")) : null;
183+
183184
Stream<DocCollection> collectionStream;
184185
if (collection == null) {
185186
collectionStream = clusterState.collectionStream();
@@ -205,54 +206,35 @@ private void fetchClusterStatusForCollOrAlias(
205206
}
206207
}
207208

208-
// TODO use an Iterable to stream the data to the client instead of gathering it all in mem
209-
210-
NamedList<Object> collectionProps = new SimpleOrderedMap<>();
211-
212-
collectionStream.forEach(
213-
clusterStateCollection -> {
214-
Map<String, Object> collectionStatus;
215-
String name = clusterStateCollection.getName();
216-
217-
Set<String> requestedShards = new HashSet<>();
218-
if (routeKey != null) {
219-
DocRouter router = clusterStateCollection.getRouter();
220-
Collection<Slice> slices =
221-
router.getSearchSlices(routeKey, null, clusterStateCollection);
222-
for (Slice slice : slices) {
223-
requestedShards.add(slice.getName());
224-
}
225-
}
226-
if (shard != null) {
227-
String[] paramShards = shard.split(",");
228-
requestedShards.addAll(Arrays.asList(paramShards));
229-
}
230-
231-
byte[] bytes = Utils.toJSON(clusterStateCollection);
232-
@SuppressWarnings("unchecked")
233-
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
234-
collectionStatus = getCollectionStatus(docCollection, name, requestedShards);
235-
236-
collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
237-
collectionStatus.put(
238-
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());
239-
240-
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
241-
collectionStatus.put("aliases", collectionVsAliases.get(name));
242-
}
243-
String configName = clusterStateCollection.getConfigName();
244-
collectionStatus.put("configName", configName);
245-
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
246-
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
247-
collectionStatus.put("PRS", prs);
248-
}
249-
collectionProps.add(name, collectionStatus);
250-
});
251-
252-
// now we need to walk the collectionProps tree to cross-check replica state with live nodes
253-
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionProps);
254-
255-
clusterStatus.add("collections", collectionProps);
209+
// Because of back-compat for SolrJ, create the whole response into a NamedList
210+
// Otherwise stream with MapWriter to save memory
211+
if (CommonParams.JAVABIN.equals(solrParams.get(CommonParams.WT))) {
212+
NamedList<Object> collectionProps = new SimpleOrderedMap<>();
213+
collectionStream.forEach(
214+
collectionState -> {
215+
collectionProps.add(
216+
collectionState.getName(),
217+
buildResponseForCollection(
218+
collectionState, collectionVsAliases, routeKey, liveNodes, requestedShards));
219+
});
220+
clusterStatus.add("collections", collectionProps);
221+
} else {
222+
MapWriter collectionPropsWriter =
223+
ew -> {
224+
collectionStream.forEach(
225+
(collectionState) -> {
226+
ew.putNoEx(
227+
collectionState.getName(),
228+
buildResponseForCollection(
229+
collectionState,
230+
collectionVsAliases,
231+
routeKey,
232+
liveNodes,
233+
requestedShards));
234+
});
235+
};
236+
clusterStatus.add("collections", collectionPropsWriter);
237+
}
256238
}
257239

258240
private void addAliasMap(Aliases aliases, NamedList<Object> clusterStatus) {
@@ -307,23 +289,20 @@ private Map<String, Object> getCollectionStatus(
307289
*/
308290
@SuppressWarnings("unchecked")
309291
protected void crossCheckReplicaStateWithLiveNodes(
310-
List<String> liveNodes, NamedList<Object> collectionProps) {
311-
for (Map.Entry<String, Object> next : collectionProps) {
312-
Map<String, Object> collMap = (Map<String, Object>) next.getValue();
313-
Map<String, Object> shards = (Map<String, Object>) collMap.get("shards");
314-
for (Object nextShard : shards.values()) {
315-
Map<String, Object> shardMap = (Map<String, Object>) nextShard;
316-
Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas");
317-
for (Object nextReplica : replicas.values()) {
318-
Map<String, Object> replicaMap = (Map<String, Object>) nextReplica;
319-
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP))
320-
!= Replica.State.DOWN) {
321-
// not down, so verify the node is live
322-
String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
323-
if (!liveNodes.contains(node_name)) {
324-
// node is not live, so this replica is actually down
325-
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
326-
}
292+
List<String> liveNodes, Map<String, Object> collectionProps) {
293+
var shards = (Map<String, Object>) collectionProps.get("shards");
294+
for (Object nextShard : shards.values()) {
295+
var shardMap = (Map<String, Object>) nextShard;
296+
var replicas = (Map<String, Object>) shardMap.get("replicas");
297+
for (Object nextReplica : replicas.values()) {
298+
var replicaMap = (Map<String, Object>) nextReplica;
299+
if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP))
300+
!= Replica.State.DOWN) {
301+
// not down, so verify the node is live
302+
String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
303+
if (!liveNodes.contains(node_name)) {
304+
// node is not live, so this replica is actually down
305+
replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
327306
}
328307
}
329308
}
@@ -368,4 +347,47 @@ public static Map<String, Object> postProcessCollectionJSON(Map<String, Object>
368347
collection.put("health", Health.combine(healthStates).toString());
369348
return collection;
370349
}
350+
351+
/**
 * Builds the CLUSTERSTATUS entry for a single collection.
 *
 * <p>The shard filter handed to {@code getCollectionStatus} is the union of the slices the
 * collection's router returns for {@code routeKey} (when a route key is given) and the explicitly
 * requested shard names (when given). The collection state is serialized to JSON and parsed back
 * into a Map, which is then decorated with {@code znodeVersion}, {@code creationTimeMillis},
 * {@code aliases} (only when the collection has at least one), {@code configName}, and
 * {@code PRS} (only when the {@code prs} request param is true and the collection uses
 * per-replica state). Finally the replica states in the map are cross-checked against the live
 * nodes.
 *
 * @param clusterStateCollection the collection whose status is being reported
 * @param collectionVsAliases collection name to the aliases that point at it
 * @param routeKey optional route key; may be null
 * @param liveNodes names of the nodes currently considered live
 * @param requestedShards optional shard names to restrict the output to; may be null
 * @return the status map for this collection
 */
private Map<String, Object> buildResponseForCollection(
352+
DocCollection clusterStateCollection,
353+
Map<String, List<String>> collectionVsAliases,
354+
String routeKey,
355+
List<String> liveNodes,
356+
Set<String> requestedShards) {
357+
Map<String, Object> collectionStatus;
358+
// Per-collection shard filter; built fresh here so the caller's requestedShards is not mutated
Set<String> shards = new HashSet<>();
359+
String name = clusterStateCollection.getName();
360+
361+
// A route key is resolved to slice names by this collection's own router
if (routeKey != null)
362+
clusterStateCollection
363+
.getRouter()
364+
.getSearchSlices(routeKey, null, clusterStateCollection)
365+
.forEach((slice) -> shards.add(slice.getName()));
366+
367+
if (requestedShards != null) shards.addAll(requestedShards);
368+
369+
// Round-trip through JSON to get a Map view of the collection state
byte[] bytes = Utils.toJSON(clusterStateCollection);
370+
@SuppressWarnings("unchecked")
371+
Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
372+
collectionStatus = getCollectionStatus(docCollection, name, shards);
373+
374+
collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
375+
collectionStatus.put(
376+
"creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());
377+
378+
// Only emit "aliases" when at least one alias points at this collection
if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
379+
collectionStatus.put("aliases", collectionVsAliases.get(name));
380+
}
381+
String configName = clusterStateCollection.getConfigName();
382+
collectionStatus.put("configName", configName);
383+
// Per-replica states are opt-in via the "prs" request param
if (solrParams.getBool("prs", false) && clusterStateCollection.isPerReplicaState()) {
384+
PerReplicaStates prs = clusterStateCollection.getPerReplicaStates();
385+
collectionStatus.put("PRS", prs);
386+
}
387+
388+
// now walk this collection's status map to cross-check replica state with live nodes
389+
crossCheckReplicaStateWithLiveNodes(liveNodes, collectionStatus);
390+
391+
return collectionStatus;
392+
}
371393
}

solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.solr.cloud.api.collections;
1818

19+
import com.fasterxml.jackson.databind.ObjectMapper;
1920
import java.io.IOException;
2021
import java.time.Instant;
2122
import java.util.ArrayList;
@@ -28,6 +29,7 @@
2829
import org.apache.solr.client.solrj.SolrClient;
2930
import org.apache.solr.client.solrj.SolrServerException;
3031
import org.apache.solr.client.solrj.impl.CloudSolrClient;
32+
import org.apache.solr.client.solrj.impl.NoOpResponseParser;
3133
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
3234
import org.apache.solr.client.solrj.request.QueryRequest;
3335
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
@@ -81,7 +83,6 @@ public void test() throws Exception {
8183
client.request(req);
8284
createCollection(null, COLLECTION_NAME1, 1, 1, client, null, "conf1");
8385
}
84-
8586
waitForCollection(ZkStateReader.from(cloudClient), COLLECTION_NAME, 2);
8687
waitForCollection(ZkStateReader.from(cloudClient), COLLECTION_NAME1, 1);
8788
waitForRecoveriesToFinish(COLLECTION_NAME, false);
@@ -91,6 +92,7 @@ public void test() throws Exception {
9192
clusterStatusNoCollection();
9293
clusterStatusWithCollection();
9394
clusterStatusWithCollectionAndShard();
95+
clusterStatusWithCollectionAndShardJSON();
9496
clusterStatusWithCollectionAndMultipleShards();
9597
clusterStatusWithCollectionHealthState();
9698
clusterStatusWithRouteKey();
@@ -648,6 +650,39 @@ private void clusterStatusAliasTest() throws Exception {
648650
}
649651
}
650652

653+
/**
 * Requests CLUSTERSTATUS for {@code COLLECTION_NAME} restricted to {@code SHARD1} with
 * {@code wt=json}, parses the reply with Jackson, and asserts that exactly one collection and
 * exactly one shard ({@code SHARD1}) are reported.
 */
@SuppressWarnings("unchecked")
654+
private void clusterStatusWithCollectionAndShardJSON() throws IOException, SolrServerException {
655+
656+
try (CloudSolrClient client = createCloudClient(null)) {
657+
ObjectMapper mapper = new ObjectMapper();
658+
659+
ModifiableSolrParams params = new ModifiableSolrParams();
660+
params.set("action", CollectionParams.CollectionAction.CLUSTERSTATUS.toString());
661+
params.set("collection", COLLECTION_NAME);
662+
params.set("shard", SHARD1);
663+
params.set("wt", "json");
664+
QueryRequest request = new QueryRequest(params);
665+
// The reply body is read back below as a String under "response" and parsed with Jackson
request.setResponseParser(new NoOpResponseParser("json"));
666+
request.setPath("/admin/collections");
667+
NamedList<Object> rsp = client.request(request);
668+
String actualResponse = (String) rsp.get("response");
669+
670+
Map<String, Object> result = mapper.readValue(actualResponse, Map.class);
671+
672+
var cluster = (Map<String, Object>) result.get("cluster");
673+
assertNotNull("Cluster state should not be null", cluster);
674+
var collections = (Map<String, Object>) cluster.get("collections");
675+
assertNotNull("Collections should not be null in cluster state", collections);
676+
assertNotNull(collections.get(COLLECTION_NAME));
677+
// Only the requested collection may be present
assertEquals(1, collections.size());
678+
var collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
679+
var shardStatus = (Map<String, Object>) collection.get("shards");
680+
// Only the requested shard may be present
assertEquals(1, shardStatus.size());
681+
Map<String, Object> selectedShardStatus = (Map<String, Object>) shardStatus.get(SHARD1);
682+
assertNotNull(selectedShardStatus);
683+
}
684+
}
685+
651686
private void clusterStatusRolesTest() throws Exception {
652687
try (CloudSolrClient client = createCloudClient(null)) {
653688
client.connect();

0 commit comments

Comments
 (0)