1717
1818import static org .junit .Assert .assertEquals ;
1919
20+ import java .util .*;
2021import java .util .concurrent .CountDownLatch ;
2122import java .util .concurrent .atomic .AtomicInteger ;
2223
2324import org .junit .Test ;
2425
25- import rx .Producer ;
26- import rx .Scheduler ;
26+ import rx .*;
2727import rx .exceptions .MissingBackpressureException ;
2828import rx .functions .Action0 ;
2929import rx .observers .TestSubscriber ;
@@ -36,18 +36,25 @@ protected RxRingBuffer createRingBuffer() {
3636 return new RxRingBuffer ();
3737 }
3838
39+ @ Test (timeout = 20000 )
40+ public void testConcurrencyLoop () throws InterruptedException {
41+ for (int i = 0 ; i < 50 ; i ++) {
42+ testConcurrency ();
43+ }
44+ }
45+
3946 /**
4047 * Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread.
4148 */
42- @ Test
49+ @ Test ( timeout = 10000 )
4350 public void testConcurrency () throws InterruptedException {
4451 final RxRingBuffer b = createRingBuffer ();
45- final CountDownLatch emitLatch = new CountDownLatch (255 );
46- final CountDownLatch drainLatch = new CountDownLatch (2 );
52+ final CountDownLatch emitLatch = new CountDownLatch (127 );
53+ int drainers = 3 ;
54+ final CountDownLatch drainLatch = new CountDownLatch (drainers );
4755
4856 final Scheduler .Worker w1 = Schedulers .newThread ().createWorker ();
49- Scheduler .Worker w2 = Schedulers .newThread ().createWorker ();
50- Scheduler .Worker w3 = Schedulers .newThread ().createWorker ();
57+ List <Scheduler .Worker > drainerWorkers = new ArrayList <Scheduler .Worker >();
5158
5259 final AtomicInteger emit = new AtomicInteger ();
5360 final AtomicInteger poll = new AtomicInteger ();
@@ -110,7 +117,12 @@ public void call() {
110117 ts .requestMore (emitted );
111118 emitted = 0 ;
112119 } else {
113- if (emitLatch .getCount () == 0 ) {
120+ try {
121+ Thread .sleep (1 );
122+ } catch (InterruptedException ex ) {
123+ // ignored
124+ }
125+ if (emitLatch .getCount () == 0 && b .isEmpty ()) {
114126 // this works with SynchronizedQueue, if changing to a non-blocking Queue
115127 // then this will likely need to change like the SpmcTest version
116128 drainLatch .countDown ();
@@ -124,14 +136,18 @@ public void call() {
124136
125137 };
126138
127- w2 .schedule (drainer );
128- w3 .schedule (drainer );
139+ for (int i = 0 ; i < drainers ; i ++) {
140+ Scheduler .Worker w = Schedulers .newThread ().createWorker ();
141+ w .schedule (drainer );
142+ drainerWorkers .add (w );
143+ }
129144
130145 emitLatch .await ();
131146 drainLatch .await ();
132147
133- w2 .unsubscribe ();
134- w3 .unsubscribe ();
148+ for (Scheduler .Worker w : drainerWorkers ) {
149+ w .unsubscribe ();
150+ }
135151 w1 .unsubscribe (); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3
136152
137153 System .out .println ("emit: " + emit .get () + " poll: " + poll .get ());
0 commit comments