Skip to content

Commit 9d8cac3

Browse files
Reverting use of JCTools Until Problem Is Solved
1 parent 07fb328 commit 9d8cac3

File tree

3 files changed

+56
-73
lines changed

3 files changed

+56
-73
lines changed

rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import rx.Subscription;
2222
import rx.exceptions.MissingBackpressureException;
2323
import rx.internal.operators.NotificationLite;
24-
import rx.internal.util.unsafe.SpmcArrayQueue;
25-
import rx.internal.util.unsafe.SpscArrayQueue;
2624
import rx.internal.util.unsafe.UnsafeAccess;
2725

2826
/**
@@ -33,15 +31,28 @@ public class RxRingBuffer implements Subscription {
3331

3432
public static RxRingBuffer getSpscInstance() {
3533
if (UnsafeAccess.isUnsafeAvailable()) {
36-
return new RxRingBuffer(SPSC_POOL, SIZE);
34+
// using SynchronizedQueue until issues are solved with SpscArrayQueue offer rejection
35+
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
36+
// BackpressureException when using SpscArrayQueue
37+
// return new RxRingBuffer(SPSC_POOL, SIZE); // this is the one we were trying to use
38+
// return new RxRingBuffer(new SpscArrayQueue<Object>(SIZE), SIZE);
39+
// the performance of this is sufficient (actually faster in some cases)
40+
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
3741
} else {
3842
return new RxRingBuffer();
3943
}
4044
}
4145

4246
public static RxRingBuffer getSpmcInstance() {
4347
if (UnsafeAccess.isUnsafeAvailable()) {
44-
return new RxRingBuffer(SPMC_POOL, SIZE);
48+
// using SynchronizedQueue until issues are solved with SpmcArrayQueue offer rejection
49+
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
50+
// BackpressureException when using SpmcArrayQueue/MpmcArrayQueue
51+
// return new RxRingBuffer(SPMC_POOL, SIZE); // this is the one we were trying to use
52+
// return new RxRingBuffer(new SpmcArrayQueue<Object>(SIZE), SIZE);
53+
// return new RxRingBuffer(new MpmcArrayQueue<Object>(SIZE), SIZE);
54+
// the performance of this is sufficient (actually faster in some cases)
55+
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
4556
} else {
4657
return new RxRingBuffer();
4758
}
@@ -75,7 +86,7 @@ public static RxRingBuffer getSpmcInstance() {
7586
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 23951121.098 1982380.330 ops/s
7687
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 1142.351 33.592 ops/s
7788
*
78-
* With SynchronizedQueue (synchronized LinkedList)
89+
* With SynchronizedQueue (synchronized LinkedList ... no object pooling)
7990
*
8091
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 33231667.136 685757.510 ops/s
8192
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 74623.614 5493.766 ops/s
@@ -119,11 +130,11 @@ public static RxRingBuffer getSpmcInstance() {
119130
* With SpmcArrayQueue
120131
* - requires access to Unsafe
121132
*
122-
* Benchmark Mode Samples Score Score error Units
123-
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 1835494.523 63874.461 ops/s
124-
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 45545.599 1882.146 ops/s
125-
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 38126258.816 474874.236 ops/s
126-
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 42507.743 240.530 ops/s
133+
* Benchmark Mode Samples Score Score error Units
134+
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1 thrpt 5 27630345.474 769219.142 ops/s
135+
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000 thrpt 5 80052.046 4059.541 ops/s
136+
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1 thrpt 5 44449524.222 563068.793 ops/s
137+
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000 thrpt 5 65231.253 1805.732 ops/s
127138
*
128139
* With SpmcArrayQueue and ObjectPool (object pool improves createUseAndDestroy1 by 10x)
129140
*
@@ -135,17 +146,7 @@ public static RxRingBuffer getSpmcInstance() {
135146
*
136147
* --------------
137148
*
138-
* When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations and get these numbers:
139-
*
140-
* Benchmark Mode Samples Score Score error Units
141-
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1 thrpt 5 17813072.116 672207.872 ops/s
142-
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000 thrpt 5 46794.691 1146.195 ops/s
143-
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1 thrpt 5 32117630.315 749011.552 ops/s
144-
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000 thrpt 5 47257.476 1081.623 ops/s
145-
* r.i.RxRingBufferPerf.spscCreateUseAndDestroy1 thrpt 5 24729994.601 353101.940 ops/s
146-
* r.i.RxRingBufferPerf.spscCreateUseAndDestroy1000 thrpt 5 73101.460 2406.377 ops/s
147-
* r.i.RxRingBufferPerf.spscRingBufferAddRemove1 thrpt 5 83548821.062 752738.756 ops/s
148-
* r.i.RxRingBufferPerf.spscRingBufferAddRemove1000 thrpt 5 70549.816 1377.227 ops/s
149+
* When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations.
149150
*
150151
* } </pre>
151152
*/
@@ -169,30 +170,13 @@ public static RxRingBuffer getSpmcInstance() {
169170

170171
public static final int SIZE = 1024;
171172

172-
private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {
173-
174-
@Override
175-
protected SpscArrayQueue<Object> createObject() {
176-
return new SpscArrayQueue<Object>(SIZE);
177-
}
178-
179-
};
180-
181-
private static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {
182-
183-
@Override
184-
protected SpmcArrayQueue<Object> createObject() {
185-
return new SpmcArrayQueue<Object>(SIZE);
186-
}
187-
188-
};
189-
190173
private RxRingBuffer(Queue<Object> queue, int size) {
191174
this.queue = queue;
192175
this.pool = null;
193176
this.size = size;
194177
}
195178

179+
@SuppressWarnings("unused")
196180
private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
197181
this.pool = pool;
198182
this.queue = pool.borrowObject();
@@ -213,7 +197,7 @@ public void unsubscribe() {
213197
release();
214198
}
215199

216-
/* for unit tests */RxRingBuffer() {
200+
/* package accessible for unit tests */RxRingBuffer() {
217201
this(new SynchronizedQueue<Object>(SIZE), SIZE);
218202
}
219203

@@ -260,7 +244,7 @@ public int count() {
260244
}
261245
return queue.size();
262246
}
263-
247+
264248
public boolean isEmpty() {
265249
if (queue == null) {
266250
return true;
@@ -294,7 +278,7 @@ public Object poll() {
294278
}
295279
return o;
296280
}
297-
281+
298282
public Object peek() {
299283
if (queue == null) {
300284
// we are unsubscribed and have released the undelrying queue

rxjava-core/src/main/java/rx/internal/util/unsafe/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ This package contains code that relies on sun.misc.Unsafe. Before using it you M
22

33
Much of the code in this package comes from or is inspired by the JCTools project: https://github.com/JCTools/JCTools
44

5-
It is manually embedded here rather than added as a binary dependency since rxjava-core is aiming to have 0 dependencies and keep the binary size as small as possible.
5+
Once JCTools publishes artifacts (https://github.com/JCTools/JCTools/issues/17) RxJava may add JCTools as a "shadow" dependency.
6+
RxJava has a "zero dependency" policy for the core library, so if we do add it as a dependency, it won't be an externally visible dependency that results in a separate jar.
67

7-
The license for this code is https://github.com/JCTools/JCTools/blob/master/LICENSE
8+
The license for the JCTools code is https://github.com/JCTools/JCTools/blob/master/LICENSE
89

910
As of June 10 2014 when this code was copied the LICENSE read as:
1011

rxjava-core/src/test/java/rx/internal/util/RxRingBufferSpmcTest.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@ protected RxRingBuffer createRingBuffer() {
3939
/**
4040
* Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread.
4141
*/
42-
@Test(timeout = 2000)
42+
@Test
4343
public void testConcurrency() throws InterruptedException {
4444
final RxRingBuffer b = createRingBuffer();
45-
final CountDownLatch latch = new CountDownLatch(255);
45+
final CountDownLatch emitLatch = new CountDownLatch(255);
46+
final CountDownLatch drainLatch = new CountDownLatch(2);
4647

4748
final Scheduler.Worker w1 = Schedulers.newThread().createWorker();
4849
Scheduler.Worker w2 = Schedulers.newThread().createWorker();
@@ -58,25 +59,25 @@ public void testConcurrency() throws InterruptedException {
5859

5960
@Override
6061
public void request(final long n) {
61-
System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
62+
// System.out.println("request[" + c.incrementAndGet() + "]: " + n + " Thread: " + Thread.currentThread());
6263
w1.schedule(new Action0() {
6364

6465
@Override
6566
public void call() {
66-
if (latch.getCount() == 0) {
67+
if (emitLatch.getCount() == 0) {
6768
return;
6869
}
6970
for (int i = 0; i < n; i++) {
7071
try {
71-
emit.incrementAndGet();
7272
b.onNext("one");
73+
emit.incrementAndGet();
7374
} catch (MissingBackpressureException e) {
7475
System.out.println("BackpressureException => item: " + i + " requested: " + n + " emit: " + emit.get() + " poll: " + poll.get());
7576
backpressureExceptions.incrementAndGet();
7677
}
7778
}
7879
// we'll release after n batches
79-
latch.countDown();
80+
emitLatch.countDown();
8081
}
8182

8283
});
@@ -94,11 +95,12 @@ public void call() {
9495

9596
});
9697

97-
w2.schedule(new Action0() {
98+
Action0 drainer = new Action0() {
9899

99100
@Override
100101
public void call() {
101102
int emitted = 0;
103+
int shutdownCount = 0;
102104
while (true) {
103105
Object o = b.poll();
104106
if (o != null) {
@@ -108,39 +110,35 @@ public void call() {
108110
if (emitted > 0) {
109111
ts.requestMore(emitted);
110112
emitted = 0;
113+
} else {
114+
if (emitLatch.getCount() == 0) {
115+
shutdownCount++;
116+
// hack to handle the non-blocking queues
117+
// which can have a race condition between offer and poll
118+
// so poll can return null and then have a value the next loop around
119+
// ... even after emitLatch.getCount() == 0 ... no idea why.
120+
if (shutdownCount > 5) {
121+
drainLatch.countDown();
122+
return;
123+
}
124+
}
111125
}
112126
}
113127
}
114128

115129
}
116130

117-
});
131+
};
118132

119-
w3.schedule(new Action0() {
133+
w2.schedule(drainer);
134+
w3.schedule(drainer);
120135

121-
@Override
122-
public void call() {
123-
int emitted = 0;
124-
while (true) {
125-
Object o = b.poll();
126-
if (o != null) {
127-
emitted++;
128-
poll.incrementAndGet();
129-
} else {
130-
if (emitted > 0) {
131-
ts.requestMore(emitted);
132-
emitted = 0;
133-
}
134-
}
135-
}
136-
}
137-
138-
});
136+
emitLatch.await();
137+
drainLatch.await();
139138

140-
latch.await();
141-
w1.unsubscribe();
142139
w2.unsubscribe();
143140
w3.unsubscribe();
141+
w1.unsubscribe(); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3
144142

145143
System.out.println("emit: " + emit.get() + " poll: " + poll.get());
146144
assertEquals(0, backpressureExceptions.get());

0 commit comments

Comments
 (0)