Skip to content

Commit 6e9185a

Browse files
committed
SOLR-15055: Re-implement 'withCollection'.
1 parent 695e789 commit 6e9185a

37 files changed

+971
-146
lines changed

solr/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ New Features
2525

2626
* SOLR-15019: Replica placement API needs a way to fetch existing replica metrics. (ab, ilan)
2727

28+
* SOLR-15055: Re-implement 'withCollection'. This also adds the placement plugin support
29+
for rejecting replica / collection deletions that would violate placement constraints. (ab, ilan)
30+
2831
Improvements
2932
----------------------
3033
* LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta)

solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Set;
3030

3131
import org.apache.commons.lang3.StringUtils;
32-
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
3332
import org.apache.solr.cloud.overseer.ClusterStateMutator;
3433
import org.apache.solr.cloud.overseer.CollectionMutator;
3534
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -40,6 +39,7 @@
4039
import org.apache.solr.common.cloud.Slice;
4140
import org.apache.solr.common.cloud.ZkNodeProps;
4241
import org.apache.solr.common.cloud.ZkStateReader;
42+
import org.apache.solr.common.params.CollectionAdminParams;
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
4545

@@ -74,8 +74,8 @@ class ExclusiveSliceProperty {
7474
ExclusiveSliceProperty(ClusterState clusterState, ZkNodeProps message) {
7575
this.clusterState = clusterState;
7676
String tmp = message.getStr(ZkStateReader.PROPERTY_PROP);
77-
if (StringUtils.startsWith(tmp, OverseerCollectionMessageHandler.COLL_PROP_PREFIX) == false) {
78-
tmp = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + tmp;
77+
if (!StringUtils.startsWith(tmp, CollectionAdminParams.PROPERTY_PREFIX)) {
78+
tmp = CollectionAdminParams.PROPERTY_PREFIX + tmp;
7979
}
8080
this.property = tmp.toLowerCase(Locale.ROOT);
8181
collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);

solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.apache.solr.cloud.ActiveReplicaWatcher;
4646
import org.apache.solr.cloud.Overseer;
4747
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
48-
import org.apache.solr.cluster.placement.PlacementPlugin;
4948
import org.apache.solr.common.SolrCloseableLatch;
5049
import org.apache.solr.common.SolrException;
5150
import org.apache.solr.common.cloud.ClusterState;
@@ -61,6 +60,7 @@
6160
import org.apache.solr.common.params.ShardParams;
6261
import org.apache.solr.common.util.NamedList;
6362
import org.apache.solr.common.util.Utils;
63+
import org.apache.solr.core.CoreContainer;
6464
import org.apache.solr.handler.component.ShardHandler;
6565
import org.apache.zookeeper.KeeperException;
6666
import org.slf4j.Logger;
@@ -141,7 +141,7 @@ List<ZkNodeProps> addReplica(ClusterState clusterState, ZkNodeProps message, @Su
141141
}
142142

143143
List<CreateReplica> createReplicas = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, replicaTypesVsCount,
144-
ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance())
144+
ocmh.overseer.getCoreContainer())
145145
.stream()
146146
.map(replicaPosition -> assignReplicaDetails(ocmh.cloudManager, clusterState, message, replicaPosition))
147147
.collect(Collectors.toList());
@@ -302,7 +302,7 @@ public static CreateReplica assignReplicaDetails(SolrCloudManager cloudManager,
302302
public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
303303
String collectionName, ZkNodeProps message,
304304
EnumMap<Replica.Type, Integer> replicaTypeVsCount,
305-
PlacementPlugin placementPlugin) throws IOException, InterruptedException {
305+
CoreContainer coreContainer) throws IOException, InterruptedException {
306306
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
307307
boolean skipNodeAssignment = message.getBool(CollectionAdminParams.SKIP_NODE_ASSIGNMENT, false);
308308
String sliceName = message.getStr(SHARD_ID_PROP);
@@ -326,7 +326,7 @@ public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloud
326326
if (!skipCreateReplicaInClusterState && !skipNodeAssignment) {
327327

328328
positions = Assign.getNodesForNewReplicas(clusterState, collection.getName(), sliceName, numNrtReplicas,
329-
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager, placementPlugin);
329+
numTlogReplicas, numPullReplicas, createNodeSetStr, cloudManager, coreContainer);
330330
}
331331

332332
if (positions == null) {

solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.solr.common.cloud.ZkNodeProps;
5252
import org.apache.solr.common.cloud.ZkStateReader;
5353
import org.apache.solr.common.util.StrUtils;
54+
import org.apache.solr.core.CoreContainer;
5455
import org.apache.solr.util.NumberUtils;
5556
import org.apache.zookeeper.CreateMode;
5657
import org.apache.zookeeper.KeeperException;
@@ -270,7 +271,7 @@ public int weight() {
270271
public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
271272
String shard, int nrtReplicas, int tlogReplicas, int pullReplicas,
272273
Object createNodeSet, SolrCloudManager cloudManager,
273-
PlacementPlugin placementPlugin) throws IOException, InterruptedException, AssignmentException {
274+
CoreContainer coreContainer) throws IOException, InterruptedException, AssignmentException {
274275
log.debug("getNodesForNewReplicas() shard: {} , nrtReplicas : {} , tlogReplicas: {} , pullReplicas: {} , createNodeSet {}"
275276
, shard, nrtReplicas, tlogReplicas, pullReplicas, createNodeSet);
276277
DocCollection coll = clusterState.getCollection(collectionName);
@@ -296,7 +297,7 @@ public static List<ReplicaPosition> getNodesForNewReplicas(ClusterState clusterS
296297
.assignPullReplicas(pullReplicas)
297298
.onNodes(createNodeList)
298299
.build();
299-
AssignStrategy assignStrategy = createAssignStrategy(placementPlugin, clusterState, coll);
300+
AssignStrategy assignStrategy = createAssignStrategy(coreContainer, clusterState, coll);
300301
return assignStrategy.assign(cloudManager, assignRequest);
301302
}
302303

@@ -379,9 +380,46 @@ public AssignmentException(String message, Throwable cause, boolean enableSuppre
379380
}
380381
}
381382

383+
/**
384+
* Strategy for assigning replicas to nodes.
385+
*/
382386
public interface AssignStrategy {
387+
388+
/**
389+
* Assign new replicas to nodes.
390+
* @param solrCloudManager current instance of {@link SolrCloudManager}.
391+
* @param assignRequest assign request.
392+
* @return list of {@link ReplicaPosition}-s for new replicas.
393+
* @throws AssignmentException when assignment request cannot produce any valid assignments.
394+
*/
383395
List<ReplicaPosition> assign(SolrCloudManager solrCloudManager, AssignRequest assignRequest)
384-
throws Assign.AssignmentException, IOException, InterruptedException;
396+
throws AssignmentException, IOException, InterruptedException;
397+
398+
/**
399+
* Verify that deleting a collection doesn't violate the replica assignment constraints.
400+
* @param solrCloudManager current instance of {@link SolrCloudManager}.
401+
* @param collection collection to delete.
402+
* @throws AssignmentException when deleting the collection would violate replica assignment constraints.
403+
* @throws IOException on general errors.
404+
*/
405+
default void verifyDeleteCollection(SolrCloudManager solrCloudManager, DocCollection collection)
406+
throws AssignmentException, IOException, InterruptedException {
407+
408+
}
409+
410+
/**
411+
* Verify that deleting these replicas doesn't violate the replica assignment constraints.
412+
* @param solrCloudManager current instance of {@link SolrCloudManager}.
413+
* @param collection collection to delete replicas from.
414+
* @param shardName shard name.
415+
* @param replicas replicas to delete.
416+
* @throws AssignmentException when deleting the replicas would violate replica assignment constraints.
417+
* @throws IOException on general errors.
418+
*/
419+
default void verifyDeleteReplicas(SolrCloudManager solrCloudManager, DocCollection collection, String shardName, Set<Replica> replicas)
420+
throws AssignmentException, IOException, InterruptedException {
421+
422+
}
385423
}
386424

387425
public static class AssignRequest {
@@ -495,7 +533,8 @@ private ImmutableMap<Replica.Type, Integer> countsPerReplicaType(AssignRequest a
495533
* <p>If {@link PlacementPlugin} instance is null this call will return {@link LegacyAssignStrategy}, otherwise
496534
* {@link PlacementPluginAssignStrategy} will be used.</p>
497535
*/
498-
public static AssignStrategy createAssignStrategy(PlacementPlugin placementPlugin, ClusterState clusterState, DocCollection collection) {
536+
public static AssignStrategy createAssignStrategy(CoreContainer coreContainer, ClusterState clusterState, DocCollection collection) {
537+
PlacementPlugin placementPlugin = coreContainer.getPlacementPluginFactory().createPluginInstance();
499538
if (placementPlugin != null) {
500539
// If a cluster wide placement plugin is configured (and that's the only way to define a placement plugin)
501540
return new PlacementPluginAssignStrategy(collection, placementPlugin);

solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.apache.solr.cloud.ZkController;
4242
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
4343
import org.apache.solr.cloud.overseer.ClusterStateMutator;
44-
import org.apache.solr.cluster.placement.PlacementPlugin;
4544
import org.apache.solr.common.SolrException;
4645
import org.apache.solr.common.SolrException.ErrorCode;
4746
import org.apache.solr.common.cloud.Aliases;
@@ -63,6 +62,7 @@
6362
import org.apache.solr.common.util.SimpleOrderedMap;
6463
import org.apache.solr.common.util.TimeSource;
6564
import org.apache.solr.common.util.Utils;
65+
import org.apache.solr.core.CoreContainer;
6666
import org.apache.solr.handler.component.ShardHandler;
6767
import org.apache.solr.handler.component.ShardRequest;
6868
import org.apache.solr.util.TimeOut;
@@ -169,8 +169,8 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin
169169

170170
List<ReplicaPosition> replicaPositions = null;
171171
try {
172-
replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName),
173-
message, shardNames, ocmh.overseer.getCoreContainer().getPlacementPluginFactory().createPluginInstance());
172+
replicaPositions = buildReplicaPositions(ocmh.overseer.getCoreContainer(), ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName),
173+
message, shardNames);
174174
} catch (Assign.AssignmentException e) {
175175
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
176176
new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@@ -288,10 +288,10 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin
288288
}
289289
}
290290

291-
private static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
291+
private static List<ReplicaPosition> buildReplicaPositions(CoreContainer coreContainer, SolrCloudManager cloudManager, ClusterState clusterState,
292292
DocCollection docCollection,
293293
ZkNodeProps message,
294-
List<String> shardNames, PlacementPlugin placementPlugin) throws IOException, InterruptedException, Assign.AssignmentException {
294+
List<String> shardNames) throws IOException, InterruptedException, Assign.AssignmentException {
295295
final String collectionName = message.getStr(NAME);
296296
// look at the replication factor and see if it matches reality
297297
// if it does not, find best nodes to create more cores
@@ -330,7 +330,7 @@ private static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager clou
330330
.assignPullReplicas(numPullReplicas)
331331
.onNodes(nodeList)
332332
.build();
333-
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(placementPlugin, clusterState, docCollection);
333+
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(coreContainer, clusterState, docCollection);
334334
replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
335335
}
336336
return replicaPositions;

solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ public void call(ClusterState state, ZkNodeProps message, @SuppressWarnings({"ra
9292
collection = extCollection;
9393
}
9494

95+
// verify the placement modifications caused by the deletion are allowed
96+
DocCollection coll = state.getCollectionOrNull(collection);
97+
if (coll != null) {
98+
Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(ocmh.overseer.getCoreContainer(), state, coll);
99+
assignStrategy.verifyDeleteCollection(ocmh.cloudManager, coll);
100+
}
101+
95102
final boolean deleteHistory = message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true);
96103

97104
boolean removeCounterNode = true;

solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.solr.cloud.api.collections;
1919

2020

21+
import java.io.IOException;
2122
import java.lang.invoke.MethodHandles;
2223
import java.util.ArrayList;
2324
import java.util.List;
@@ -98,7 +99,7 @@ static void cleanupReplicas(@SuppressWarnings({"rawtypes"})NamedList results,
9899
List<ZkNodeProps> sourceReplicas,
99100
OverseerCollectionMessageHandler ocmh,
100101
String node,
101-
String async) throws InterruptedException {
102+
String async) throws IOException, InterruptedException {
102103
CountDownLatch cleanupLatch = new CountDownLatch(sourceReplicas.size());
103104
for (ZkNodeProps sourceReplica : sourceReplicas) {
104105
String coll = sourceReplica.getStr(COLLECTION_PROP);

0 commit comments

Comments
 (0)