Skip to content

Commit ca58f1a

Browse files
SOLR-16403: ClusterSingleton to remove inactive Shards (#1926)
ClusterSingleton that periodically removes state=INACTIVE shards. These occur from shard splits. Co-authored-by: Paul McArthur <[email protected]>
1 parent c8c6001 commit ca58f1a

File tree

8 files changed

+686
-41
lines changed

8 files changed

+686
-41
lines changed

solr/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ New Features
9090

9191
* SOLR-599: Add a new SolrJ client using the JDK’s built-in Http Client. (James Dyer)
9292

93+
* SOLR-16403: A new cluster singleton plugin to automatically remove inactive shards. (Paul McArthur, David Smiley)
94+
9395
Improvements
9496
---------------------
9597
* SOLR-17119: When registering or updating a ConfigurablePlugin through the `/cluster/plugin` API,
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.solr.cluster.maintenance;
19+
20+
import com.google.common.annotations.VisibleForTesting;
21+
import java.lang.invoke.MethodHandles;
22+
import java.util.Collection;
23+
import java.util.HashSet;
24+
import java.util.Objects;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.stream.Collectors;
29+
import org.apache.solr.api.ConfigurablePlugin;
30+
import org.apache.solr.client.solrj.SolrResponse;
31+
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
32+
import org.apache.solr.cloud.ClusterSingleton;
33+
import org.apache.solr.common.cloud.ClusterState;
34+
import org.apache.solr.common.cloud.DocCollection;
35+
import org.apache.solr.common.cloud.Slice;
36+
import org.apache.solr.common.cloud.ZkStateReader;
37+
import org.apache.solr.common.util.ExecutorUtil;
38+
import org.apache.solr.common.util.SolrNamedThreadFactory;
39+
import org.apache.solr.core.CoreContainer;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
/**
44+
* This Cluster Singleton can be configured to periodically find and remove {@link
45+
* org.apache.solr.common.cloud.Slice.State#INACTIVE} Shards that are left behind after a Shard is
46+
* split
47+
*/
48+
public class InactiveShardRemover
49+
implements ClusterSingleton, ConfigurablePlugin<InactiveShardRemoverConfig> {
50+
51+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
52+
53+
public static final String PLUGIN_NAME = ".inactive-shard-remover";
54+
55+
static class DeleteActor {
56+
57+
private final CoreContainer coreContainer;
58+
59+
DeleteActor(final CoreContainer coreContainer) {
60+
this.coreContainer = coreContainer;
61+
}
62+
63+
void delete(final Slice slice) {
64+
CollectionAdminRequest.DeleteShard deleteRequest =
65+
CollectionAdminRequest.deleteShard(slice.getCollection(), slice.getName());
66+
try {
67+
SolrResponse response =
68+
coreContainer.getZkController().getSolrCloudManager().request(deleteRequest);
69+
if (response.getException() != null) {
70+
throw response.getException();
71+
}
72+
} catch (Exception e) {
73+
log.warn("An exception occurred when deleting an inactive shard", e);
74+
}
75+
}
76+
}
77+
78+
private State state = State.STOPPED;
79+
80+
private final CoreContainer coreContainer;
81+
82+
private final DeleteActor deleteActor;
83+
84+
private ScheduledExecutorService executor;
85+
86+
private long scheduleIntervalSeconds;
87+
88+
private long ttlSeconds;
89+
90+
private int maxDeletesPerCycle;
91+
92+
/** Constructor invoked via Reflection */
93+
public InactiveShardRemover(final CoreContainer cc) {
94+
this(cc, new DeleteActor(cc));
95+
}
96+
97+
public InactiveShardRemover(final CoreContainer cc, final DeleteActor deleteActor) {
98+
this.coreContainer = cc;
99+
this.deleteActor = deleteActor;
100+
}
101+
102+
@Override
103+
public void configure(final InactiveShardRemoverConfig cfg) {
104+
Objects.requireNonNull(cfg, "config must be specified");
105+
cfg.validate();
106+
this.scheduleIntervalSeconds = cfg.scheduleIntervalSeconds;
107+
this.maxDeletesPerCycle = cfg.maxDeletesPerCycle;
108+
this.ttlSeconds = cfg.ttlSeconds;
109+
}
110+
111+
@Override
112+
public String getName() {
113+
return PLUGIN_NAME;
114+
}
115+
116+
@Override
117+
public State getState() {
118+
return state;
119+
}
120+
121+
@Override
122+
public void start() throws Exception {
123+
state = State.STARTING;
124+
executor = Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory(PLUGIN_NAME));
125+
executor.scheduleAtFixedRate(
126+
this::deleteInactiveSlices,
127+
scheduleIntervalSeconds,
128+
scheduleIntervalSeconds,
129+
TimeUnit.SECONDS);
130+
state = State.RUNNING;
131+
}
132+
133+
@Override
134+
public void stop() {
135+
if (state == State.RUNNING) {
136+
state = State.STOPPING;
137+
ExecutorUtil.shutdownNowAndAwaitTermination(executor);
138+
}
139+
state = State.STOPPED;
140+
}
141+
142+
@VisibleForTesting
143+
void deleteInactiveSlices() {
144+
final ClusterState clusterState = coreContainer.getZkController().getClusterState();
145+
Collection<Slice> inactiveSlices =
146+
clusterState.getCollectionsMap().values().stream()
147+
.flatMap(v -> collectInactiveSlices(v).stream())
148+
.collect(Collectors.toSet());
149+
150+
if (log.isInfoEnabled()) {
151+
log.info(
152+
"Found {} inactive Shards to delete, {} will be deleted",
153+
inactiveSlices.size(),
154+
Math.min(inactiveSlices.size(), maxDeletesPerCycle));
155+
}
156+
157+
inactiveSlices.stream().limit(maxDeletesPerCycle).forEach(this::deleteShard);
158+
}
159+
160+
private Collection<Slice> collectInactiveSlices(final DocCollection docCollection) {
161+
final Collection<Slice> slices = new HashSet<>(docCollection.getSlices());
162+
slices.removeAll(docCollection.getActiveSlices());
163+
return slices.stream().filter(this::isExpired).collect(Collectors.toSet());
164+
}
165+
166+
private void deleteShard(final Slice s) {
167+
deleteActor.delete(s);
168+
}
169+
170+
/**
171+
* An Inactive Shard is expired if it has not undergone a state change in the period of time
172+
* defined by {@link InactiveShardRemover#ttlSeconds}. If it is expired, it is eligible for
173+
* removal.
174+
*/
175+
private boolean isExpired(final Slice slice) {
176+
177+
final String collectionName = slice.getCollection();
178+
final String sliceName = slice.getName();
179+
180+
if (slice.getState() != Slice.State.INACTIVE) {
181+
return false;
182+
}
183+
184+
final String lastChangeTimestamp = slice.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
185+
if (lastChangeTimestamp == null || lastChangeTimestamp.isEmpty()) {
186+
log.warn(
187+
"Collection {} Shard {} has no last change timestamp and will not be deleted",
188+
collectionName,
189+
sliceName);
190+
return false;
191+
}
192+
193+
final long epochTimestampNs;
194+
try {
195+
epochTimestampNs = Long.parseLong(lastChangeTimestamp);
196+
} catch (NumberFormatException e) {
197+
log.warn(
198+
"Collection {} Shard {} has an invalid last change timestamp and will not be deleted",
199+
collectionName,
200+
sliceName);
201+
return false;
202+
}
203+
204+
long currentEpochTimeNs =
205+
coreContainer.getZkController().getSolrCloudManager().getTimeSource().getEpochTimeNs();
206+
long delta = TimeUnit.NANOSECONDS.toSeconds(currentEpochTimeNs - epochTimestampNs);
207+
208+
boolean expired = delta >= ttlSeconds;
209+
if (log.isDebugEnabled()) {
210+
log.debug(
211+
"collection {} shard {} last state change {} seconds ago. Expired={}",
212+
slice.getCollection(),
213+
slice.getName(),
214+
delta,
215+
expired);
216+
}
217+
return expired;
218+
}
219+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.solr.cluster.maintenance;
19+
20+
import org.apache.solr.common.SolrException;
21+
import org.apache.solr.common.annotation.JsonProperty;
22+
import org.apache.solr.common.util.ReflectMapWriter;
23+
24+
public class InactiveShardRemoverConfig implements ReflectMapWriter {
25+
26+
public static final long DEFAULT_SCHEDULE_INTERVAL_SECONDS = 900L; // 15 minutes
27+
28+
public static final long DEFAULT_TTL_SECONDS = 900L; // 15 minutes
29+
30+
public static final int DEFAULT_MAX_DELETES_PER_CYCLE = 20;
31+
32+
@JsonProperty public long scheduleIntervalSeconds;
33+
34+
@JsonProperty public long ttlSeconds;
35+
36+
@JsonProperty public int maxDeletesPerCycle;
37+
38+
/** Default constructor required for deserialization */
39+
public InactiveShardRemoverConfig() {
40+
this(DEFAULT_SCHEDULE_INTERVAL_SECONDS, DEFAULT_TTL_SECONDS, DEFAULT_MAX_DELETES_PER_CYCLE);
41+
}
42+
43+
public InactiveShardRemoverConfig(
44+
final long scheduleIntervalSeconds, final long ttlSeconds, final int maxDeletesPerCycle) {
45+
this.scheduleIntervalSeconds = scheduleIntervalSeconds;
46+
this.ttlSeconds = ttlSeconds;
47+
this.maxDeletesPerCycle = maxDeletesPerCycle;
48+
}
49+
50+
public void validate() {
51+
if (scheduleIntervalSeconds <= 0) {
52+
throw new SolrException(
53+
SolrException.ErrorCode.BAD_REQUEST, "scheduleIntervalSeconds must be greater than 0");
54+
}
55+
if (maxDeletesPerCycle <= 0) {
56+
throw new SolrException(
57+
SolrException.ErrorCode.BAD_REQUEST, "maxDeletesPerCycle must be greater than 0");
58+
}
59+
}
60+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/** Cluster Singleton plugins that are used to perform maintenance tasks within the cluster. */
19+
package org.apache.solr.cluster.maintenance;

solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,12 @@
1818

1919
import java.io.IOException;
2020
import org.apache.solr.client.solrj.SolrServerException;
21-
import org.apache.solr.client.solrj.cloud.DistributedQueue;
2221
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
2322
import org.apache.solr.client.solrj.request.CoreStatus;
24-
import org.apache.solr.cloud.overseer.OverseerAction;
25-
import org.apache.solr.common.MapWriter;
2623
import org.apache.solr.common.cloud.DocCollection;
2724
import org.apache.solr.common.cloud.Replica;
2825
import org.apache.solr.common.cloud.Slice;
2926
import org.apache.solr.common.cloud.Slice.State;
30-
import org.apache.solr.common.cloud.ZkNodeProps;
31-
import org.apache.solr.common.cloud.ZkStateReader;
3227
import org.apache.solr.util.FileUtils;
3328
import org.junit.After;
3429
import org.junit.Before;
@@ -68,54 +63,20 @@ public void test() throws Exception {
6863
CollectionAdminRequest.deleteShard(collection, "shard1")
6964
.process(cluster.getSolrClient()));
7065

71-
setSliceState(collection, "shard1", Slice.State.INACTIVE);
66+
ShardTestUtil.setSliceState(cluster, collection, "shard1", Slice.State.INACTIVE);
7267

7368
// Can delete an INACTIVE shard
7469
CollectionAdminRequest.deleteShard(collection, "shard1").process(cluster.getSolrClient());
7570
waitForState(
7671
"Expected 'shard1' to be removed", collection, (n, c) -> c.getSlice("shard1") == null);
7772

7873
// Can delete a shard under construction
79-
setSliceState(collection, "shard2", Slice.State.CONSTRUCTION);
74+
ShardTestUtil.setSliceState(cluster, collection, "shard2", Slice.State.CONSTRUCTION);
8075
CollectionAdminRequest.deleteShard(collection, "shard2").process(cluster.getSolrClient());
8176
waitForState(
8277
"Expected 'shard2' to be removed", collection, (n, c) -> c.getSlice("shard2") == null);
8378
}
8479

85-
protected void setSliceState(String collection, String slice, State state) throws Exception {
86-
87-
// TODO can this be encapsulated better somewhere?
88-
MapWriter m =
89-
ew ->
90-
ew.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower())
91-
.put(slice, state.toString())
92-
.put(ZkStateReader.COLLECTION_PROP, collection);
93-
final Overseer overseer = cluster.getOpenOverseer();
94-
if (overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
95-
overseer
96-
.getDistributedClusterStateUpdater()
97-
.doSingleStateUpdate(
98-
DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState,
99-
new ZkNodeProps(m),
100-
cluster.getOpenOverseer().getSolrCloudManager(),
101-
cluster.getOpenOverseer().getZkStateReader());
102-
} else {
103-
DistributedQueue inQueue =
104-
cluster
105-
.getJettySolrRunner(0)
106-
.getCoreContainer()
107-
.getZkController()
108-
.getOverseer()
109-
.getStateUpdateQueue();
110-
inQueue.offer(m);
111-
}
112-
113-
waitForState(
114-
"Expected shard " + slice + " to be in state " + state,
115-
collection,
116-
(n, c) -> c.getSlice(slice).getState() == state);
117-
}
118-
11980
@Test
12081
public void testDirectoryCleanupAfterDeleteShard() throws IOException, SolrServerException {
12182

@@ -162,4 +123,13 @@ public void testDirectoryCleanupAfterDeleteShard() throws IOException, SolrServe
162123
"Instance directory still exists", FileUtils.fileExists(coreStatus.getInstanceDirectory()));
163124
assertTrue("Data directory still exists", FileUtils.fileExists(coreStatus.getDataDirectory()));
164125
}
126+
127+
private void setSliceState(String collectionName, String shardId, Slice.State state)
128+
throws Exception {
129+
ShardTestUtil.setSliceState(cluster, collectionName, shardId, state);
130+
waitForState(
131+
"Expected shard " + shardId + " to be in state " + state,
132+
collectionName,
133+
(n, c) -> c.getSlice(shardId).getState() == state);
134+
}
165135
}

0 commit comments

Comments
 (0)