Skip to content

Commit 40c5d6b

Browse files
authored
SOLR-14253 Avoid writes in ZKSR.waitForState (#2297)
1 parent d693a61 commit 40c5d6b

File tree

4 files changed

+24
-39
lines changed

4 files changed

+24
-39
lines changed

solr/core/src/java/org/apache/solr/cloud/ZkController.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1698,12 +1698,11 @@ private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
16981698
private void waitForCoreNodeName(CoreDescriptor descriptor) {
16991699
log.debug("waitForCoreNodeName >>> look for our core node name");
17001700
try {
1701-
zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS, c -> {
1702-
String name = ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName());
1703-
if (name == null) return false;
1704-
descriptor.getCloudDescriptor().setCoreNodeName(name);
1705-
return true;
1706-
});
1701+
DocCollection collection = zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS,
1702+
c -> ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName()) != null);
1703+
// Read outside of the predicate to avoid multiple potential writes
1704+
String name = ClusterStateMutator.getAssignedCoreNodeName(collection, getNodeName(), descriptor.getName());
1705+
descriptor.getCloudDescriptor().setCoreNodeName(name);
17071706
} catch (TimeoutException | InterruptedException e) {
17081707
SolrZkClient.checkInterrupted(e);
17091708
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for collection state", e);
@@ -1716,15 +1715,10 @@ private void waitForShardId(final CoreDescriptor cd) {
17161715
log.debug("waiting to find shard id in clusterstate for {}", cd.getName());
17171716
}
17181717
try {
1719-
zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, c -> {
1720-
if (c == null) return false;
1721-
final String shardId = c.getShardId(getNodeName(), cd.getName());
1722-
if (shardId != null) {
1723-
cd.getCloudDescriptor().setShardId(shardId);
1724-
return true;
1725-
}
1726-
return false;
1727-
});
1718+
DocCollection collection = zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS,
1719+
c -> c != null && c.getShardId(getNodeName(), cd.getName()) != null);
1720+
// Read outside of the predicate to avoid multiple potential writes
1721+
cd.getCloudDescriptor().setShardId(collection.getShardId(getNodeName(), cd.getName()));
17281722
} catch (TimeoutException | InterruptedException e) {
17291723
SolrZkClient.checkInterrupted(e);
17301724
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed getting shard id for core: " + cd.getName(), e);
@@ -1814,10 +1808,8 @@ private void checkStateInZk(CoreDescriptor cd) throws InterruptedException, NotI
18141808
}
18151809

18161810
AtomicReference<String> errorMessage = new AtomicReference<>();
1817-
AtomicReference<DocCollection> collectionState = new AtomicReference<>();
18181811
try {
18191812
zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
1820-
collectionState.set(c);
18211813
if (c == null)
18221814
return false;
18231815
Slice slice = c.getSlice(cloudDesc.getShardId());

solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void call(ClusterState state, ZkNodeProps message, @SuppressWarnings({"ra
153153
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
154154

155155
// wait for a while until we don't see the collection
156-
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
156+
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, Objects::isNull);
157157

158158
// we can delete any remaining unique aliases
159159
if (!aliasReferences.isEmpty()) {

solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import java.util.Map;
2828
import java.util.Random;
2929
import java.util.Set;
30+
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.SynchronousQueue;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.TimeoutException;
34-
import java.util.concurrent.atomic.AtomicReference;
3535

3636
import com.google.common.collect.ImmutableMap;
3737
import org.apache.commons.lang3.StringUtils;
@@ -487,21 +487,15 @@ static UpdateResponse softCommit(String url) throws SolrServerException, IOExcep
487487
}
488488

489489
String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
490-
AtomicReference<String> coreNodeName = new AtomicReference<>();
491490
try {
492-
zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> {
493-
String name = ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore);
494-
if (name == null) {
495-
return false;
496-
}
497-
coreNodeName.set(name);
498-
return true;
499-
});
491+
DocCollection collection = zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c ->
492+
ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore) != null
493+
);
494+
return ClusterStateMutator.getAssignedCoreNodeName(collection, msgNodeName, msgCore);
500495
} catch (TimeoutException | InterruptedException e) {
501496
SolrZkClient.checkInterrupted(e);
502497
throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for coreNodeName", e);
503498
}
504-
return coreNodeName.get();
505499
}
506500

507501
ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
@@ -609,26 +603,23 @@ void cleanupCollection(String collectionName, @SuppressWarnings({"rawtypes"})Nam
609603

610604
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
611605
assert coreNames.size() > 0;
612-
Map<String, Replica> results = new HashMap<>();
613-
AtomicReference<DocCollection> lastState = new AtomicReference<>();
606+
Map<String, Replica> results = new ConcurrentHashMap<>();
614607

615608
long maxWait = Long.getLong("solr.waitToSeeReplicasInStateTimeoutSeconds", 120); // could be a big cluster
616609
try {
617610
zkStateReader.waitForState(collectionName, maxWait, TimeUnit.SECONDS, c -> {
618611
if (c == null) return false;
619612

613+
// We write into a ConcurrentHashMap, which will be ok if called multiple times by multiple threads
620614
c.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
621-
.filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for...
622-
.filter(r -> !results.containsKey(r.getCoreName())) // ...but not the ones we've seen already...
623-
.forEach(r -> results.put(r.getCoreName(), r)); // ...get added to the map
615+
.filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for...
616+
.forEach(r -> results.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
624617

625-
lastState.set(c);
626618
log.debug("Expecting {} cores, found {}", coreNames, results);
627619
return results.size() == coreNames.size();
628620
});
629621
} catch (TimeoutException e) {
630-
String error = "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + lastState.get();
631-
throw new SolrException(ErrorCode.SERVER_ERROR, error);
622+
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
632623
}
633624

634625
return results;

solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1698,16 +1698,18 @@ public void waitForState(final String collection, long wait, TimeUnit unit, Coll
16981698
* <p>
16991699
* Note that the predicate may be called again even after it has returned true, so
17001700
* implementors should avoid changing state within the predicate call itself.
1701+
* The predicate may also be called concurrently when multiple state changes are seen in rapid succession.
17011702
* </p>
17021703
*
17031704
* @param collection the collection to watch
17041705
* @param wait how long to wait
17051706
* @param unit the units of the wait parameter
17061707
* @param predicate the predicate to call on state changes
1708+
* @return the state of the doc collection after the predicate succeeds
17071709
* @throws InterruptedException on interrupt
17081710
* @throws TimeoutException on timeout
17091711
*/
1710-
public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
1712+
public DocCollection waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
17111713
throws InterruptedException, TimeoutException {
17121714
if (log.isDebugEnabled()) {
17131715
log.debug("Waiting up to {}ms for state {}", unit.toMillis(wait), predicate);
@@ -1733,7 +1735,7 @@ public void waitForState(final String collection, long wait, TimeUnit unit, Pred
17331735
// wait for the watcher predicate to return true, or time out
17341736
if (!latch.await(wait, unit))
17351737
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
1736-
1738+
return docCollection.get();
17371739
} finally {
17381740
removeDocCollectionWatcher(collection, watcher);
17391741
waitLatches.remove(latch);

0 commit comments

Comments
 (0)