Skip to content

Commit 8fccdfe

Browse files
authored
SOLR-15122 Replace sleeps with phaser await (#2291)
1 parent 40c5d6b commit 8fccdfe

File tree

4 files changed

+64
-96
lines changed

4 files changed

+64
-96
lines changed

solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.IOException;
3131
import java.lang.invoke.MethodHandles;
3232
import java.util.Set;
33+
import java.util.concurrent.Phaser;
3334

3435
/**
3536
* This implementation allows Solr to dynamically change the underlying implementation
@@ -40,7 +41,7 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
4041

4142
private ClusterEventProducer delegate;
4243
// support for tests to make sure the update is completed
43-
private volatile int version;
44+
private volatile Phaser phaser;
4445

4546
public DelegatingClusterEventProducer(CoreContainer cc) {
4647
super(cc);
@@ -56,6 +57,16 @@ public void close() throws IOException {
5657
super.close();
5758
}
5859

60+
/**
61+
* A phaser that will advance phases every time {@link #setDelegate(ClusterEventProducer)} is called.
62+
* Useful for allowing tests to know when a new delegate is finished getting set.
63+
*/
64+
@VisibleForTesting
65+
public void setDelegationPhaser(Phaser phaser) {
66+
phaser.register();
67+
this.phaser = phaser;
68+
}
69+
5970
public void setDelegate(ClusterEventProducer newDelegate) {
6071
if (log.isDebugEnabled()) {
6172
log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
@@ -90,7 +101,11 @@ public void setDelegate(ClusterEventProducer newDelegate) {
90101
log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
91102
}
92103
}
93-
this.version++;
104+
Phaser localPhaser = phaser; // volatile read
105+
if (localPhaser != null) {
106+
assert localPhaser.getRegisteredParties() == 1;
107+
localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time
108+
}
94109
}
95110

96111
@Override
@@ -142,9 +157,4 @@ public synchronized void stop() {
142157
delegate.stop();
143158
state = delegate.getState();
144159
}
145-
146-
@VisibleForTesting
147-
public int getVersion() {
148-
return version;
149-
}
150160
}

solr/core/src/java/org/apache/solr/cluster/placement/impl/DelegatingPlacementPluginFactory.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import org.apache.solr.cluster.placement.PlacementPluginConfig;
2222
import org.apache.solr.cluster.placement.PlacementPluginFactory;
2323

24+
import java.util.concurrent.Phaser;
25+
2426
/**
2527
* Helper class to support dynamic reloading of plugin implementations.
2628
*/
2729
public final class DelegatingPlacementPluginFactory implements PlacementPluginFactory<PlacementPluginFactory.NoConfig> {
28-
2930
private volatile PlacementPluginFactory<? extends PlacementPluginConfig> delegate;
3031
// support for tests to make sure the update is completed
31-
private volatile int version;
32+
private volatile Phaser phaser;
3233

3334
@Override
3435
public PlacementPlugin createPluginInstance() {
@@ -39,18 +40,27 @@ public PlacementPlugin createPluginInstance() {
3940
}
4041
}
4142

43+
/**
44+
* A phaser that will advance phases every time {@link #setDelegate(PlacementPluginFactory)} is called.
45+
* Useful for allowing tests to know when a new delegate is finished getting set.
46+
*/
47+
@VisibleForTesting
48+
public void setDelegationPhaser(Phaser phaser) {
49+
phaser.register();
50+
this.phaser = phaser;
51+
}
52+
4253
public void setDelegate(PlacementPluginFactory<? extends PlacementPluginConfig> delegate) {
4354
this.delegate = delegate;
44-
this.version++;
55+
Phaser localPhaser = phaser; // volatile read
56+
if (localPhaser != null) {
57+
assert localPhaser.getRegisteredParties() == 1;
58+
localPhaser.arrive(); // we should be the only ones registered, so this will advance phase each time
59+
}
4560
}
4661

4762
@VisibleForTesting
4863
public PlacementPluginFactory<? extends PlacementPluginConfig> getDelegate() {
4964
return delegate;
5065
}
51-
52-
@VisibleForTesting
53-
public int getVersion() {
54-
return version;
55-
}
5666
}

solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,12 @@
2323
import org.apache.solr.client.solrj.request.beans.PluginMeta;
2424
import org.apache.solr.client.solrj.response.V2Response;
2525
import org.apache.solr.cloud.ClusterSingleton;
26-
import org.apache.solr.cloud.Overseer;
2726
import org.apache.solr.cloud.SolrCloudTestCase;
2827
import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
2928
import org.apache.solr.cluster.events.impl.DelegatingClusterEventProducer;
3029
import org.apache.solr.common.cloud.ClusterProperties;
31-
import org.apache.solr.common.util.TimeSource;
3230
import org.apache.solr.common.util.Utils;
3331
import org.apache.solr.util.LogLevel;
34-
import org.apache.solr.util.TimeOut;
3532
import org.junit.After;
3633
import org.junit.Before;
3734
import org.junit.BeforeClass;
@@ -47,8 +44,8 @@
4744
import java.util.List;
4845
import java.util.Map;
4946
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.Phaser;
5048
import java.util.concurrent.TimeUnit;
51-
import java.util.concurrent.TimeoutException;
5249

5350
import static java.util.Collections.singletonMap;
5451
import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
@@ -59,9 +56,8 @@
5956
*/
6057
@LogLevel("org.apache.solr.cluster.events=DEBUG")
6158
public class ClusterEventProducerTest extends SolrCloudTestCase {
62-
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
63-
6459
private AllEventsListener eventsListener;
60+
private Phaser phaser;
6561

6662
@BeforeClass
6763
public static void setupCluster() throws Exception {
@@ -77,6 +73,12 @@ public void setUp() throws Exception {
7773
cluster.deleteAllCollections();
7874
eventsListener = new AllEventsListener();
7975
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
76+
ClusterEventProducer clusterEventProducer = cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer();
77+
assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
78+
clusterEventProducer instanceof DelegatingClusterEventProducer);
79+
DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
80+
phaser = new Phaser();
81+
wrapper.setDelegationPhaser(phaser);
8082
}
8183

8284
@After
@@ -102,7 +104,7 @@ public void teardown() throws Exception {
102104

103105
@Test
104106
public void testEvents() throws Exception {
105-
int version = waitForVersionChange(-1, 10);
107+
int version = phaser.getPhase();
106108

107109
PluginMeta plugin = new PluginMeta();
108110
plugin.klass = DefaultClusterEventProducer.class.getName();
@@ -114,7 +116,7 @@ public void testEvents() throws Exception {
114116
V2Response rsp = req.process(cluster.getSolrClient());
115117
assertEquals(0, rsp.getStatus());
116118

117-
version = waitForVersionChange(version, 10);
119+
phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
118120

119121
// NODES_DOWN
120122

@@ -281,7 +283,7 @@ public void close() throws IOException {
281283

282284
@Test
283285
public void testListenerPlugins() throws Exception {
284-
int version = waitForVersionChange(-1, 10);
286+
int version = phaser.getPhase();
285287

286288
PluginMeta plugin = new PluginMeta();
287289
plugin.klass = DefaultClusterEventProducer.class.getName();
@@ -292,7 +294,7 @@ public void testListenerPlugins() throws Exception {
292294
.build();
293295
V2Response rsp = req.process(cluster.getSolrClient());
294296
assertEquals(0, rsp.getStatus());
295-
version = waitForVersionChange(-1, 10);
297+
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
296298

297299
plugin = new PluginMeta();
298300
plugin.name = "testplugin";
@@ -350,7 +352,7 @@ public void testListenerPlugins() throws Exception {
350352
.withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
351353
.build();
352354
req.process(cluster.getSolrClient());
353-
version = waitForVersionChange(-1, 10);
355+
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
354356

355357
dummyEventLatch = new CountDownLatch(1);
356358
lastEvent = null;
@@ -371,7 +373,7 @@ public void testListenerPlugins() throws Exception {
371373
.build();
372374
rsp = req.process(cluster.getSolrClient());
373375
assertEquals(0, rsp.getStatus());
374-
version = waitForVersionChange(-1, 10);
376+
phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
375377

376378
dummyEventLatch = new CountDownLatch(1);
377379
lastEvent = null;
@@ -384,29 +386,4 @@ public void testListenerPlugins() throws Exception {
384386
assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
385387
assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
386388
}
387-
388-
private int waitForVersionChange(int currentVersion, int timeoutSec) throws Exception {
389-
TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
390-
Overseer overseer = cluster.getOpenOverseer();
391-
if (overseer == null) {
392-
throw new Exception("no overseer");
393-
}
394-
ClusterEventProducer clusterEventProducer = overseer.getCoreContainer().getClusterEventProducer();
395-
assertTrue("not a delegating producer? " + clusterEventProducer.getClass(),
396-
clusterEventProducer instanceof DelegatingClusterEventProducer);
397-
DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer) clusterEventProducer;
398-
while (!timeout.hasTimedOut()) {
399-
int newVersion = wrapper.getVersion();
400-
if (newVersion < currentVersion) {
401-
throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
402-
" newVersion=" + newVersion);
403-
} else if (currentVersion < newVersion) {
404-
log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
405-
return newVersion;
406-
}
407-
timeout.sleep(200);
408-
}
409-
throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
410-
411-
}
412389
}

solr/core/src/test/org/apache/solr/cluster/placement/impl/PlacementPluginIntegrationTest.java

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,20 @@
4141
import org.apache.solr.cluster.placement.plugins.MinimizeCoresPlacementFactory;
4242
import org.apache.solr.common.cloud.ClusterState;
4343
import org.apache.solr.common.cloud.DocCollection;
44-
import org.apache.solr.common.util.TimeSource;
4544
import org.apache.solr.core.CoreContainer;
4645
import org.apache.solr.util.LogLevel;
47-
import org.apache.solr.util.TimeOut;
4846

4947
import org.junit.After;
5048
import org.junit.BeforeClass;
5149
import org.junit.Test;
52-
import org.slf4j.Logger;
53-
import org.slf4j.LoggerFactory;
5450

55-
import java.lang.invoke.MethodHandles;
5651
import java.util.Arrays;
5752
import java.util.HashMap;
5853
import java.util.HashSet;
5954
import java.util.Map;
6055
import java.util.Optional;
6156
import java.util.Set;
57+
import java.util.concurrent.Phaser;
6258
import java.util.concurrent.TimeUnit;
6359
import java.util.concurrent.TimeoutException;
6460
import java.util.concurrent.atomic.AtomicInteger;
@@ -70,8 +66,6 @@
7066
*/
7167
@LogLevel("org.apache.solr.cluster.placement.impl=DEBUG")
7268
public class PlacementPluginIntegrationTest extends SolrCloudTestCase {
73-
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
74-
7569
private static final String COLLECTION = PlacementPluginIntegrationTest.class.getSimpleName() + "_collection";
7670

7771
private static SolrCloudManager cloudManager;
@@ -145,14 +139,15 @@ public void testMinimizeCores() throws Exception {
145139
}
146140

147141
@Test
148-
@SuppressWarnings("unchecked")
149142
public void testDynamicReconfiguration() throws Exception {
150143
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
151144
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
152145
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
146+
Phaser phaser = new Phaser();
147+
wrapper.setDelegationPhaser(phaser);
153148

154-
int version = wrapper.getVersion();
155-
log.debug("--initial version={}", version);
149+
int version = phaser.getPhase();
150+
assertTrue("wrong version " + version, version > -1);
156151

157152
PluginMeta plugin = new PluginMeta();
158153
plugin.name = PlacementPluginFactory.PLUGIN_NAME;
@@ -164,9 +159,7 @@ public void testDynamicReconfiguration() throws Exception {
164159
.build();
165160
req.process(cluster.getSolrClient());
166161

167-
version = waitForVersionChange(version, wrapper, 10);
168-
169-
assertTrue("wrong version " + version, version > 0);
162+
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
170163
PlacementPluginFactory<? extends PlacementPluginConfig> factory = wrapper.getDelegate();
171164
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof MinimizeCoresPlacementFactory);
172165

@@ -180,7 +173,7 @@ public void testDynamicReconfiguration() throws Exception {
180173
.build();
181174
req.process(cluster.getSolrClient());
182175

183-
version = waitForVersionChange(version, wrapper, 10);
176+
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
184177

185178
factory = wrapper.getDelegate();
186179
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
@@ -197,7 +190,7 @@ public void testDynamicReconfiguration() throws Exception {
197190
.build();
198191
req.process(cluster.getSolrClient());
199192

200-
version = waitForVersionChange(version, wrapper, 10);
193+
version = phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
201194
factory = wrapper.getDelegate();
202195
assertTrue("wrong type " + factory.getClass().getName(), factory instanceof AffinityPlacementFactory);
203196
config = ((AffinityPlacementFactory) factory).getConfig();
@@ -212,22 +205,16 @@ public void testDynamicReconfiguration() throws Exception {
212205
.withPayload(singletonMap("add", plugin))
213206
.build();
214207
req.process(cluster.getSolrClient());
215-
try {
216-
int newVersion = waitForVersionChange(version, wrapper, 5);
217-
if (newVersion != version) {
218-
fail("factory configuration updated but plugin name was wrong: " + plugin);
219-
}
220-
} catch (TimeoutException te) {
221-
// expected
222-
}
208+
final int oldVersion = version;
209+
expectThrows(TimeoutException.class, () -> phaser.awaitAdvanceInterruptibly(oldVersion, 5, TimeUnit.SECONDS));
223210
// remove plugin
224211
req = new V2Request.Builder("/cluster/plugin")
225212
.forceV2(true)
226213
.POST()
227214
.withPayload("{remove: '" + PlacementPluginFactory.PLUGIN_NAME + "'}")
228215
.build();
229216
req.process(cluster.getSolrClient());
230-
waitForVersionChange(version, wrapper, 10);
217+
phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
231218
factory = wrapper.getDelegate();
232219
assertNull("no factory should be present", factory);
233220
}
@@ -237,9 +224,10 @@ public void testWithCollectionIntegration() throws Exception {
237224
PlacementPluginFactory<? extends PlacementPluginConfig> pluginFactory = cc.getPlacementPluginFactory();
238225
assertTrue("wrong type " + pluginFactory.getClass().getName(), pluginFactory instanceof DelegatingPlacementPluginFactory);
239226
DelegatingPlacementPluginFactory wrapper = (DelegatingPlacementPluginFactory) pluginFactory;
227+
Phaser phaser = new Phaser();
228+
wrapper.setDelegationPhaser(phaser);
240229

241-
int version = wrapper.getVersion();
242-
log.debug("--initial version={}", version);
230+
int version = phaser.getPhase();
243231

244232
Set<String> nodeSet = new HashSet<>();
245233
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
@@ -261,7 +249,7 @@ public void testWithCollectionIntegration() throws Exception {
261249
.build();
262250
req.process(cluster.getSolrClient());
263251

264-
version = waitForVersionChange(version, wrapper, 10);
252+
phaser.awaitAdvanceInterruptibly(version, 10, TimeUnit.SECONDS);
265253

266254
CollectionAdminResponse rsp = CollectionAdminRequest.createCollection(SECONDARY_COLLECTION, "conf", 1, 3)
267255
.process(cluster.getSolrClient());
@@ -398,21 +386,4 @@ public void testAttributeFetcherImpl() throws Exception {
398386
});
399387
});
400388
}
401-
402-
private int waitForVersionChange(int currentVersion, DelegatingPlacementPluginFactory wrapper, int timeoutSec) throws Exception {
403-
TimeOut timeout = new TimeOut(timeoutSec, TimeUnit.SECONDS, TimeSource.NANO_TIME);
404-
405-
while (!timeout.hasTimedOut()) {
406-
int newVersion = wrapper.getVersion();
407-
if (newVersion < currentVersion) {
408-
throw new Exception("Invalid version - went back! currentVersion=" + currentVersion +
409-
" newVersion=" + newVersion);
410-
} else if (currentVersion < newVersion) {
411-
log.debug("--current version was {}, new version is {}", currentVersion, newVersion);
412-
return newVersion;
413-
}
414-
timeout.sleep(200);
415-
}
416-
throw new TimeoutException("version didn't change in time, currentVersion=" + currentVersion);
417-
}
418389
}

0 commit comments

Comments
 (0)