Skip to content

Commit 7cf16cd

Browse files
dsmileyCopilot
andauthored
SOLR-17877: Move distributedCollectionCommandRunner to ZkController (#3507)
A refactoring to improve separation-of-concerns. --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: David Smiley <[email protected]>
1 parent 7f7683d commit 7cf16cd

27 files changed

+98
-145
lines changed

solr/core/src/java/org/apache/solr/cloud/ZkController.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
6767
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
6868
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
69+
import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner;
6970
import org.apache.solr.cloud.overseer.ClusterStateMutator;
7071
import org.apache.solr.cloud.overseer.OverseerAction;
7172
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -223,6 +224,8 @@ public String toString() {
223224

224225
private final DistributedClusterStateUpdater distributedClusterStateUpdater;
225226

227+
private final Optional<DistributedCollectionConfigSetCommandRunner> distributedCommandRunner;
228+
226229
private LeaderElector overseerElector;
227230

228231
private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
@@ -370,6 +373,11 @@ public ZkController(
370373
// Refuse to start if ZK has a non empty /clusterstate.json or a /solr.xml file
371374
checkNoOldClusterstate(zkClient);
372375

376+
this.distributedCommandRunner =
377+
cloudConfig.getDistributedCollectionConfigSetExecution()
378+
? Optional.of(new DistributedCollectionConfigSetCommandRunner(cc, zkClient))
379+
: Optional.empty();
380+
373381
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
374382
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
375383
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
@@ -681,6 +689,22 @@ public NodesSysPropsCacher getSysPropsCacher() {
681689
return sysPropsCacher;
682690
}
683691

692+
/** Non-empty if the Collection API is executed in a distributed way (Overseer is disabled). */
693+
public Optional<DistributedCollectionConfigSetCommandRunner> getDistributedCommandRunner() {
694+
return this.distributedCommandRunner;
695+
}
696+
697+
/** Waits for pending tasks to complete. Should be called before {@link #close()}. */
698+
public void waitForPendingTasksToComplete() {
699+
if (distributedCommandRunner.isPresent()) {
700+
// Local (i.e. distributed) Collection API processing
701+
distributedCommandRunner.get().stopAndWaitForPendingTasksToComplete();
702+
} else {
703+
// Overseer based processing
704+
getOverseerCollectionQueue().allowOverseerPendingTasksToComplete();
705+
}
706+
}
707+
684708
private ContextKey closeExistingElectionContext(CoreDescriptor cd, boolean sessionExpired) {
685709
// look for old context - if we find it, cancel it
686710
String collection = cd.getCloudDescriptor().getCollectionName();
@@ -779,6 +803,7 @@ public void close() {
779803
} finally {
780804

781805
sysPropsCacher.close();
806+
782807
customThreadPool.execute(() -> IOUtils.closeQuietly(cloudManager));
783808
customThreadPool.execute(() -> IOUtils.closeQuietly(cloudSolrClient));
784809

@@ -998,7 +1023,9 @@ private void init() {
9981023
checkForExistingEphemeralNode();
9991024
registerLiveNodesListener();
10001025

1001-
// start the overseer first as following code may need it's processing
1026+
// Start the overseer now since the following code may need it's processing.
1027+
// Note: even when using distributed processing, we still create an Overseer anyway since
1028+
// cluster singleton processing is linked to the elected Overseer.
10021029
if (!zkRunOnly) {
10031030
overseerElector = new LeaderElector(zkClient);
10041031
this.overseer =

solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionCommandContext.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,17 @@
2727

2828
public class DistributedCollectionCommandContext implements CollectionCommandContext {
2929
private final CoreContainer coreContainer;
30-
private final DistributedClusterStateUpdater getDistributedClusterStateUpdater;
30+
private final DistributedClusterStateUpdater distributedClusterStateUpdater;
3131
private final ExecutorService executorService;
3232

33-
private final SolrCloudManager solrCloudManager;
34-
private final ZkStateReader zkStateReader;
35-
3633
public DistributedCollectionCommandContext(
3734
CoreContainer coreContainer, ExecutorService executorService) {
35+
// note: coreContainer.getZkController() is not yet instantiated; don't call it right now
3836
this.coreContainer = coreContainer;
39-
this.getDistributedClusterStateUpdater =
37+
this.distributedClusterStateUpdater =
4038
new DistributedClusterStateUpdater(
4139
coreContainer.getConfig().getCloudConfig().getDistributedClusterStateUpdates());
42-
;
4340
this.executorService = executorService;
44-
45-
solrCloudManager = this.coreContainer.getZkController().getSolrCloudManager();
46-
zkStateReader = this.coreContainer.getZkController().getZkStateReader();
4741
}
4842

4943
@Override
@@ -60,7 +54,7 @@ public ShardHandler newShardHandler() {
6054

6155
@Override
6256
public SolrCloudManager getSolrCloudManager() {
63-
return solrCloudManager;
57+
return this.coreContainer.getZkController().getSolrCloudManager();
6458
}
6559

6660
@Override
@@ -70,12 +64,12 @@ public CoreContainer getCoreContainer() {
7064

7165
@Override
7266
public ZkStateReader getZkStateReader() {
73-
return zkStateReader;
67+
return this.coreContainer.getZkController().getZkStateReader();
7468
}
7569

7670
@Override
7771
public DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
78-
return this.getDistributedClusterStateUpdater;
72+
return this.distributedClusterStateUpdater;
7973
}
8074

8175
@Override

solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.solr.cloud.ZkDistributedCollectionLockFactory;
4545
import org.apache.solr.cloud.ZkDistributedConfigSetLockFactory;
4646
import org.apache.solr.common.SolrException;
47+
import org.apache.solr.common.cloud.SolrZkClient;
4748
import org.apache.solr.common.cloud.ZkNodeProps;
4849
import org.apache.solr.common.params.CollectionParams;
4950
import org.apache.solr.common.params.ConfigSetParams;
@@ -106,7 +107,9 @@ public class DistributedCollectionConfigSetCommandRunner {
106107

107108
private volatile boolean shuttingDown = false;
108109

109-
public DistributedCollectionConfigSetCommandRunner(CoreContainer coreContainer) {
110+
public DistributedCollectionConfigSetCommandRunner(
111+
CoreContainer coreContainer, SolrZkClient zkClient) {
112+
// note: coreContainer.getZkController() is not yet instantiated; don't call it right now
110113
this.coreContainer = coreContainer;
111114

112115
if (log.isInfoEnabled()) {
@@ -144,8 +147,7 @@ public DistributedCollectionConfigSetCommandRunner(CoreContainer coreContainer)
144147
new DistributedCollectionCommandContext(
145148
this.coreContainer, this.distributedCollectionApiExecutorService);
146149
commandMapper = new CollApiCmds.CommandMap(ccc);
147-
asyncTaskTracker =
148-
new DistributedApiAsyncTracker(ccc.getZkStateReader().getZkClient(), ZK_ASYNC_ROOT);
150+
asyncTaskTracker = new DistributedApiAsyncTracker(zkClient, ZK_ASYNC_ROOT);
149151
}
150152

151153
/** See {@link DistributedApiAsyncTracker#getAsyncTaskRequestStatus(String)} */

solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
165165
throws Exception {
166166
// If Collection API execution is distributed, we're not running on the Overseer node so can't
167167
// return any Overseer stats.
168-
if (ccc.getCoreContainer().getDistributedCollectionCommandRunner().isPresent()) {
168+
if (ccc.getCoreContainer().getZkController().getDistributedCommandRunner().isPresent()) {
169169
// TODO: introduce a per node status command allowing insight into how Cluster state updates,
170170
// Collection API and config set API execution went on that node...
171171
return;

solr/core/src/java/org/apache/solr/core/CoreContainer.java

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.List;
4646
import java.util.Locale;
4747
import java.util.Map;
48-
import java.util.Optional;
4948
import java.util.Properties;
5049
import java.util.Set;
5150
import java.util.UUID;
@@ -77,9 +76,7 @@
7776
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
7877
import org.apache.solr.cloud.CloudDescriptor;
7978
import org.apache.solr.cloud.ClusterSingleton;
80-
import org.apache.solr.cloud.OverseerTaskQueue;
8179
import org.apache.solr.cloud.ZkController;
82-
import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner;
8380
import org.apache.solr.cluster.events.ClusterEventProducer;
8481
import org.apache.solr.cluster.events.impl.ClusterEventProducerFactory;
8582
import org.apache.solr.cluster.placement.PlacementPluginConfig;
@@ -326,14 +323,6 @@ && getZkController().getOverseer() != null
326323
private ExecutorService coreContainerAsyncTaskExecutor =
327324
ExecutorUtil.newMDCAwareCachedThreadPool("Core Container Async Task");
328325

329-
/**
330-
* Non-empty if the Collection API is executed in a distributed way and not on Overseer, once the
331-
* CoreContainer has been initialized properly, i.e. method {@link #load()} called. Until then it
332-
* is null, and it is not expected to be read.
333-
*/
334-
private volatile Optional<DistributedCollectionConfigSetCommandRunner>
335-
distributedCollectionCommandRunner;
336-
337326
private enum CoreInitFailedAction {
338327
fromleader,
339328
none
@@ -686,7 +675,6 @@ protected CoreContainer(Object testConstructor) {
686675
cfg = null;
687676
containerProperties = null;
688677
replayUpdatesExecutor = null;
689-
distributedCollectionCommandRunner = Optional.empty();
690678
allowPaths = null;
691679
allowListUrlChecker = null;
692680
indexSearcherExecutor = null;
@@ -900,19 +888,6 @@ private void loadInternal() {
900888
createHandler(
901889
ZK_STATUS_PATH, ZookeeperStatusHandler.class.getName(), ZookeeperStatusHandler.class);
902890

903-
// CoreContainer is initialized enough at this stage so we can set
904-
// distributedCollectionCommandRunner (the construction of
905-
// DistributedCollectionConfigSetCommandRunner uses Zookeeper so can't be done from the
906-
// CoreContainer constructor because there Zookeeper is not yet ready). Given this is used in
907-
// the CollectionsHandler created next line, this is the latest point where
908-
// distributedCollectionCommandRunner can be initialized without refactoring this method...
909-
// TODO: manage to completely build CoreContainer in the constructor and not in the load()
910-
// method... Requires some test refactoring.
911-
this.distributedCollectionCommandRunner =
912-
isZooKeeperAware() && cfg.getCloudConfig().getDistributedCollectionConfigSetExecution()
913-
? Optional.of(new DistributedCollectionConfigSetCommandRunner(this))
914-
: Optional.empty();
915-
916891
collectionsHandler =
917892
createHandler(
918893
COLLECTIONS_HANDLER_PATH, cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
@@ -1246,9 +1221,7 @@ protected void configure() {
12461221
throw new SolrException(ErrorCode.SERVER_ERROR, e);
12471222
}
12481223
}
1249-
if (!distributedCollectionCommandRunner.isPresent()) {
1250-
zkSys.getZkController().checkOverseerDesignate();
1251-
}
1224+
zkSys.getZkController().checkOverseerDesignate();
12521225
}
12531226

12541227
// This is a bit redundant but these are two distinct concepts for all they're accomplished at
@@ -1318,14 +1291,7 @@ public void shutdown() {
13181291

13191292
ZkController zkController = getZkController();
13201293
if (zkController != null) {
1321-
if (distributedCollectionCommandRunner.isPresent()) {
1322-
// Local (i.e. distributed) Collection API processing
1323-
distributedCollectionCommandRunner.get().stopAndWaitForPendingTasksToComplete();
1324-
} else {
1325-
// Overseer based processing
1326-
OverseerTaskQueue overseerCollectionQueue = zkController.getOverseerCollectionQueue();
1327-
overseerCollectionQueue.allowOverseerPendingTasksToComplete();
1328-
}
1294+
zkController.waitForPendingTasksToComplete();
13291295
}
13301296
if (log.isInfoEnabled()) {
13311297
log.info("Shutting down CoreContainer instance={}", System.identityHashCode(this));
@@ -2595,11 +2561,6 @@ public PlacementPluginFactory<? extends PlacementPluginConfig> getPlacementPlugi
25952561
return placementPluginFactory;
25962562
}
25972563

2598-
public Optional<DistributedCollectionConfigSetCommandRunner>
2599-
getDistributedCollectionCommandRunner() {
2600-
return this.distributedCollectionCommandRunner;
2601-
}
2602-
26032564
/**
26042565
* A general-purpose HTTP/2 Solr client.
26052566
*

0 commit comments

Comments
 (0)