23
23
import org .apache .solr .client .solrj .request .beans .PluginMeta ;
24
24
import org .apache .solr .client .solrj .response .V2Response ;
25
25
import org .apache .solr .cloud .ClusterSingleton ;
26
+ import org .apache .solr .cloud .Overseer ;
26
27
import org .apache .solr .cloud .SolrCloudTestCase ;
27
28
import org .apache .solr .cluster .events .impl .DefaultClusterEventProducer ;
29
+ import org .apache .solr .cluster .events .impl .DelegatingClusterEventProducer ;
28
30
import org .apache .solr .common .cloud .ClusterProperties ;
31
+ import org .apache .solr .common .util .TimeSource ;
29
32
import org .apache .solr .common .util .Utils ;
30
33
import org .apache .solr .util .LogLevel ;
34
+ import org .apache .solr .util .TimeOut ;
31
35
import org .junit .After ;
32
36
import org .junit .Before ;
33
37
import org .junit .BeforeClass ;
44
48
import java .util .Map ;
45
49
import java .util .concurrent .CountDownLatch ;
46
50
import java .util .concurrent .TimeUnit ;
51
+ import java .util .concurrent .TimeoutException ;
47
52
48
53
import static java .util .Collections .singletonMap ;
49
54
import static org .apache .solr .client .solrj .SolrRequest .METHOD .GET ;
54
59
*/
55
60
@ LogLevel ("org.apache.solr.cluster.events=DEBUG" )
56
61
public class ClusterEventProducerTest extends SolrCloudTestCase {
62
+ private static final Logger log = LoggerFactory .getLogger (MethodHandles .lookup ().lookupClass ());
57
63
58
64
private AllEventsListener eventsListener ;
59
65
@@ -96,6 +102,8 @@ public void teardown() throws Exception {
96
102
97
103
@ Test
98
104
public void testEvents () throws Exception {
105
+ int version = waitForVersionChange (-1 , 10 );
106
+
99
107
PluginMeta plugin = new PluginMeta ();
100
108
plugin .klass = DefaultClusterEventProducer .class .getName ();
101
109
plugin .name = ClusterEventProducer .PLUGIN_NAME ;
@@ -106,6 +114,8 @@ public void testEvents() throws Exception {
106
114
V2Response rsp = req .process (cluster .getSolrClient ());
107
115
assertEquals (0 , rsp .getStatus ());
108
116
117
+ version = waitForVersionChange (version , 10 );
118
+
109
119
// NODES_DOWN
110
120
111
121
eventsListener .setExpectedType (ClusterEvent .EventType .NODES_DOWN );
@@ -115,10 +125,12 @@ public void testEvents() throws Exception {
115
125
for (JettySolrRunner jetty : cluster .getJettySolrRunners ()) {
116
126
if (cluster .getOpenOverseer ().getCoreContainer ().getZkController ().getNodeName ().equals (jetty .getNodeName ())) {
117
127
continue ;
128
+ } else {
129
+ nonOverseerJetty = jetty ;
130
+ break ;
118
131
}
119
- nonOverseerJetty = jetty ;
120
- break ;
121
132
}
133
+
122
134
String nodeName = nonOverseerJetty .getNodeName ();
123
135
cluster .stopJettySolrRunner (nonOverseerJetty );
124
136
cluster .waitForJettyToStop (nonOverseerJetty );
@@ -269,6 +281,8 @@ public void close() throws IOException {
269
281
270
282
@ Test
271
283
public void testListenerPlugins () throws Exception {
284
+ int version = waitForVersionChange (-1 , 10 );
285
+
272
286
PluginMeta plugin = new PluginMeta ();
273
287
plugin .klass = DefaultClusterEventProducer .class .getName ();
274
288
plugin .name = ClusterEventProducer .PLUGIN_NAME ;
@@ -278,6 +292,7 @@ public void testListenerPlugins() throws Exception {
278
292
.build ();
279
293
V2Response rsp = req .process (cluster .getSolrClient ());
280
294
assertEquals (0 , rsp .getStatus ());
295
+ version = waitForVersionChange (-1 , 10 );
281
296
282
297
plugin = new PluginMeta ();
283
298
plugin .name = "testplugin" ;
@@ -335,6 +350,7 @@ public void testListenerPlugins() throws Exception {
335
350
.withPayload (Collections .singletonMap ("remove" , ClusterEventProducer .PLUGIN_NAME ))
336
351
.build ();
337
352
req .process (cluster .getSolrClient ());
353
+ version = waitForVersionChange (-1 , 10 );
338
354
339
355
dummyEventLatch = new CountDownLatch (1 );
340
356
lastEvent = null ;
@@ -355,6 +371,7 @@ public void testListenerPlugins() throws Exception {
355
371
.build ();
356
372
rsp = req .process (cluster .getSolrClient ());
357
373
assertEquals (0 , rsp .getStatus ());
374
+ version = waitForVersionChange (-1 , 10 );
358
375
359
376
dummyEventLatch = new CountDownLatch (1 );
360
377
lastEvent = null ;
@@ -367,4 +384,29 @@ public void testListenerPlugins() throws Exception {
367
384
assertNotNull ("lastEvent should be COLLECTIONS_REMOVED" , lastEvent );
368
385
assertEquals ("lastEvent should be COLLECTIONS_REMOVED" , ClusterEvent .EventType .COLLECTIONS_REMOVED , lastEvent .getType ());
369
386
}
387
+
388
+ private int waitForVersionChange (int currentVersion , int timeoutSec ) throws Exception {
389
+ TimeOut timeout = new TimeOut (timeoutSec , TimeUnit .SECONDS , TimeSource .NANO_TIME );
390
+ Overseer overseer = cluster .getOpenOverseer ();
391
+ if (overseer == null ) {
392
+ throw new Exception ("no overseer" );
393
+ }
394
+ ClusterEventProducer clusterEventProducer = overseer .getCoreContainer ().getClusterEventProducer ();
395
+ assertTrue ("not a delegating producer? " + clusterEventProducer .getClass (),
396
+ clusterEventProducer instanceof DelegatingClusterEventProducer );
397
+ DelegatingClusterEventProducer wrapper = (DelegatingClusterEventProducer ) clusterEventProducer ;
398
+ while (!timeout .hasTimedOut ()) {
399
+ int newVersion = wrapper .getVersion ();
400
+ if (newVersion < currentVersion ) {
401
+ throw new Exception ("Invalid version - went back! currentVersion=" + currentVersion +
402
+ " newVersion=" + newVersion );
403
+ } else if (currentVersion < newVersion ) {
404
+ log .debug ("--current version was {}, new version is {}" , currentVersion , newVersion );
405
+ return newVersion ;
406
+ }
407
+ timeout .sleep (200 );
408
+ }
409
+ throw new TimeoutException ("version didn't change in time, currentVersion=" + currentVersion );
410
+
411
+ }
370
412
}
0 commit comments