Skip to content

Commit e1ce7ba

Browse files
Merge pull request #1497 from benjchristensen/ring-buffer
spsc ring buffer concurrency test
2 parents f5fe249 + 63465db commit e1ce7ba

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed

rxjava-core/src/test/java/rx/internal/util/RxRingBufferSpscTest.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,131 @@
1515
*/
1616
package rx.internal.util;
1717

18+
import static org.junit.Assert.assertEquals;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import org.junit.Test;
24+
25+
import rx.Producer;
26+
import rx.Scheduler;
27+
import rx.exceptions.MissingBackpressureException;
28+
import rx.functions.Action0;
29+
import rx.observers.TestSubscriber;
30+
import rx.schedulers.Schedulers;
31+
1832
public class RxRingBufferSpscTest extends RxRingBufferBase {
1933

2034
@Override
2135
protected RxRingBuffer createRingBuffer() {
2236
return RxRingBuffer.getSpscInstance();
2337
}
2438

39+
/**
40+
* Single producer, single consumer. The request() ensures it gets scheduled back on the same Producer thread.
41+
*/
42+
@Test
43+
public void testConcurrency() throws InterruptedException {
44+
final RxRingBuffer b = createRingBuffer();
45+
final CountDownLatch emitLatch = new CountDownLatch(255);
46+
final CountDownLatch drainLatch = new CountDownLatch(1);
47+
48+
final Scheduler.Worker w1 = Schedulers.newThread().createWorker();
49+
Scheduler.Worker w2 = Schedulers.newThread().createWorker();
50+
51+
final AtomicInteger emit = new AtomicInteger();
52+
final AtomicInteger poll = new AtomicInteger();
53+
final AtomicInteger backpressureExceptions = new AtomicInteger();
54+
55+
final Producer p = new Producer() {
56+
57+
AtomicInteger c = new AtomicInteger();
58+
59+
@Override
60+
public void request(final long n) {
61+
// System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
62+
w1.schedule(new Action0() {
63+
64+
@Override
65+
public void call() {
66+
// System.out.println(" START request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
67+
if (emitLatch.getCount() == 0) {
68+
return;
69+
}
70+
for (int i = 0; i < n; i++) {
71+
try {
72+
b.onNext("one");
73+
emit.incrementAndGet();
74+
} catch (MissingBackpressureException e) {
75+
System.out.println("BackpressureException => item: " + i + " requested: " + n + " emit: " + emit.get() + " poll: " + poll.get());
76+
backpressureExceptions.incrementAndGet();
77+
}
78+
}
79+
// we'll release after n batches
80+
emitLatch.countDown();
81+
}
82+
83+
});
84+
}
85+
86+
};
87+
final TestSubscriber<String> ts = new TestSubscriber<String>();
88+
w1.schedule(new Action0() {
89+
90+
@Override
91+
public void call() {
92+
ts.requestMore(RxRingBuffer.SIZE);
93+
ts.setProducer(p);
94+
}
95+
96+
});
97+
98+
Action0 drainer = new Action0() {
99+
100+
@Override
101+
public void call() {
102+
int emitted = 0;
103+
int shutdownCount = 0;
104+
while (true) {
105+
Object o = b.poll();
106+
if (o != null) {
107+
emitted++;
108+
poll.incrementAndGet();
109+
} else {
110+
if (emitted > 0) {
111+
ts.requestMore(emitted);
112+
emitted = 0;
113+
} else {
114+
if (emitLatch.getCount() == 0) {
115+
shutdownCount++;
116+
// hack to handle the non-blocking queues
117+
// which can have a race condition between offer and poll
118+
// so poll can return null and then have a value the next loop around
119+
// ... even after emitLatch.getCount() == 0 ... no idea why.
120+
if (shutdownCount > 5) {
121+
drainLatch.countDown();
122+
return;
123+
}
124+
}
125+
}
126+
}
127+
}
128+
129+
}
130+
131+
};
132+
133+
w2.schedule(drainer);
134+
135+
emitLatch.await();
136+
drainLatch.await();
137+
138+
w2.unsubscribe();
139+
w1.unsubscribe(); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3
140+
141+
System.out.println("emit: " + emit.get() + " poll: " + poll.get());
142+
assertEquals(0, backpressureExceptions.get());
143+
assertEquals(emit.get(), poll.get());
144+
}
25145
}

0 commit comments

Comments
 (0)