Skip to content

Commit 45bda44

Browse files
Merge pull request #1496 from benjchristensen/ring-buffer
Change RxRingBuffer Queue Usage
2 parents 5e855f3 + 9d8cac3 commit 45bda44

13 files changed

+543
-684
lines changed

rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import rx.Subscription;
2222
import rx.exceptions.MissingBackpressureException;
2323
import rx.internal.operators.NotificationLite;
24-
import rx.internal.util.unsafe.SpmcArrayQueue;
25-
import rx.internal.util.unsafe.SpscArrayQueue;
2624
import rx.internal.util.unsafe.UnsafeAccess;
2725

2826
/**
@@ -33,15 +31,28 @@ public class RxRingBuffer implements Subscription {
3331

3432
public static RxRingBuffer getSpscInstance() {
3533
if (UnsafeAccess.isUnsafeAvailable()) {
36-
return new RxRingBuffer(SPSC_POOL, SIZE);
34+
// using SynchronizedQueue until issues are solved with SpscArrayQueue offer rejection
35+
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
36+
// BackpressureException when using SpscArrayQueue
37+
// return new RxRingBuffer(SPSC_POOL, SIZE); // this is the one we were trying to use
38+
// return new RxRingBuffer(new SpscArrayQueue<Object>(SIZE), SIZE);
39+
// the performance of this is sufficient (actually faster in some cases)
40+
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
3741
} else {
3842
return new RxRingBuffer();
3943
}
4044
}
4145

4246
public static RxRingBuffer getSpmcInstance() {
4347
if (UnsafeAccess.isUnsafeAvailable()) {
44-
return new RxRingBuffer(SPMC_POOL, SIZE);
48+
// using SynchronizedQueue until issues are solved with SpmcArrayQueue offer rejection
49+
// RxRingBufferSpmcTest.testConcurrency occasionally fails with a
50+
// BackpressureException when using SpmcArrayQueue/MpmcArrayQueue
51+
// return new RxRingBuffer(SPMC_POOL, SIZE); // this is the one we were trying to use
52+
// return new RxRingBuffer(new SpmcArrayQueue<Object>(SIZE), SIZE);
53+
// return new RxRingBuffer(new MpmcArrayQueue<Object>(SIZE), SIZE);
54+
// the performance of this is sufficient (actually faster in some cases)
55+
return new RxRingBuffer(new SynchronizedQueue<Object>(SIZE), SIZE);
4556
} else {
4657
return new RxRingBuffer();
4758
}
@@ -75,7 +86,7 @@ public static RxRingBuffer getSpmcInstance() {
7586
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 23951121.098 1982380.330 ops/s
7687
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 1142.351 33.592 ops/s
7788
*
78-
* With SynchronizedQueue (synchronized LinkedList)
89+
* With SynchronizedQueue (synchronized LinkedList ... no object pooling)
7990
*
8091
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 33231667.136 685757.510 ops/s
8192
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 74623.614 5493.766 ops/s
@@ -119,11 +130,11 @@ public static RxRingBuffer getSpmcInstance() {
119130
* With SpmcArrayQueue
120131
* - requires access to Unsafe
121132
*
122-
* Benchmark Mode Samples Score Score error Units
123-
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 1835494.523 63874.461 ops/s
124-
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 45545.599 1882.146 ops/s
125-
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 38126258.816 474874.236 ops/s
126-
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 42507.743 240.530 ops/s
133+
* Benchmark Mode Samples Score Score error Units
134+
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1 thrpt 5 27630345.474 769219.142 ops/s
135+
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000 thrpt 5 80052.046 4059.541 ops/s
136+
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1 thrpt 5 44449524.222 563068.793 ops/s
137+
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000 thrpt 5 65231.253 1805.732 ops/s
127138
*
128139
* With SpmcArrayQueue and ObjectPool (object pool improves createUseAndDestroy1 by 10x)
129140
*
@@ -135,17 +146,7 @@ public static RxRingBuffer getSpmcInstance() {
135146
*
136147
* --------------
137148
*
138-
* When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations and get these numbers:
139-
*
140-
* Benchmark Mode Samples Score Score error Units
141-
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1 thrpt 5 17813072.116 672207.872 ops/s
142-
* r.i.RxRingBufferPerf.spmcCreateUseAndDestroy1000 thrpt 5 46794.691 1146.195 ops/s
143-
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1 thrpt 5 32117630.315 749011.552 ops/s
144-
* r.i.RxRingBufferPerf.spmcRingBufferAddRemove1000 thrpt 5 47257.476 1081.623 ops/s
145-
* r.i.RxRingBufferPerf.spscCreateUseAndDestroy1 thrpt 5 24729994.601 353101.940 ops/s
146-
* r.i.RxRingBufferPerf.spscCreateUseAndDestroy1000 thrpt 5 73101.460 2406.377 ops/s
147-
* r.i.RxRingBufferPerf.spscRingBufferAddRemove1 thrpt 5 83548821.062 752738.756 ops/s
148-
* r.i.RxRingBufferPerf.spscRingBufferAddRemove1000 thrpt 5 70549.816 1377.227 ops/s
149+
* When UnsafeAccess.isUnsafeAvailable() == true we can use the Spmc/SpscArrayQueue implementations.
149150
*
150151
* } </pre>
151152
*/
@@ -169,30 +170,13 @@ public static RxRingBuffer getSpmcInstance() {
169170

170171
public static final int SIZE = 1024;
171172

172-
private static ObjectPool<Queue<Object>> SPSC_POOL = new ObjectPool<Queue<Object>>() {
173-
174-
@Override
175-
protected SpscArrayQueue<Object> createObject() {
176-
return new SpscArrayQueue<Object>(SIZE);
177-
}
178-
179-
};
180-
181-
private static ObjectPool<Queue<Object>> SPMC_POOL = new ObjectPool<Queue<Object>>() {
182-
183-
@Override
184-
protected SpmcArrayQueue<Object> createObject() {
185-
return new SpmcArrayQueue<Object>(SIZE);
186-
}
187-
188-
};
189-
190173
private RxRingBuffer(Queue<Object> queue, int size) {
191174
this.queue = queue;
192175
this.pool = null;
193176
this.size = size;
194177
}
195178

179+
@SuppressWarnings("unused")
196180
private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
197181
this.pool = pool;
198182
this.queue = pool.borrowObject();
@@ -213,7 +197,7 @@ public void unsubscribe() {
213197
release();
214198
}
215199

216-
/* for unit tests */RxRingBuffer() {
200+
/* package accessible for unit tests */RxRingBuffer() {
217201
this(new SynchronizedQueue<Object>(SIZE), SIZE);
218202
}
219203

@@ -260,7 +244,7 @@ public int count() {
260244
}
261245
return queue.size();
262246
}
263-
247+
264248
public boolean isEmpty() {
265249
if (queue == null) {
266250
return true;
@@ -294,7 +278,7 @@ public Object poll() {
294278
}
295279
return o;
296280
}
297-
281+
298282
public Object peek() {
299283
if (queue == null) {
300284
// we are unsubscribed and have released the undelrying queue

rxjava-core/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java

Lines changed: 86 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,27 @@
2121
import java.util.AbstractQueue;
2222
import java.util.Iterator;
2323

24-
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E>{
24+
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
2525
long p00, p01, p02, p03, p04, p05, p06, p07;
2626
long p30, p31, p32, p33, p34, p35, p36, p37;
2727
}
2828

29+
/**
30+
* A concurrent access enabling class used by circular array based queues this class exposes an offset computation
31+
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
32+
* the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
33+
* padding.
34+
* <p>
35+
* Offset calculation is separate from access to enable the reuse of a give compute offset.
36+
* <p>
37+
* Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
38+
* LoadLoad barrier.
39+
* <p>
40+
*
41+
* @author nitsanw
42+
*
43+
* @param <E>
44+
*/
2945
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
3046
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
3147
protected static final int BUFFER_PAD = 32;
@@ -51,87 +67,108 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
5167

5268
@SuppressWarnings("unchecked")
5369
public ConcurrentCircularArrayQueue(int capacity) {
54-
this.capacity = Pow2.findNextPositivePowerOfTwo(capacity);
70+
this.capacity = Pow2.roundToPowerOfTwo(capacity);
5571
mask = this.capacity - 1;
5672
// pad data on either end with some empty slots.
5773
buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
5874
}
5975

60-
public ConcurrentCircularArrayQueue(ConcurrentCircularArrayQueue<E> c) {
61-
this.capacity = c.capacity;
62-
this.mask = c.mask;
63-
// pad data on either end with some empty slots.
64-
this.buffer = c.buffer;
65-
}
66-
67-
protected final long calcOffset(long index) {
76+
/**
77+
* @param index desirable element index
78+
* @return the offset in bytes within the array for a given index.
79+
*/
80+
protected final long calcElementOffset(long index) {
6881
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
6982
}
7083

84+
/**
85+
* A plain store (no ordering/fences) of an element to a given offset
86+
*
87+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
88+
* @param e a kitty
89+
*/
7190
protected final void spElement(long offset, E e) {
72-
UNSAFE.putObject(buffer, offset, e);
73-
}
74-
75-
protected final void soElement(long offset, E e) {
76-
UNSAFE.putOrderedObject(buffer, offset, e);
77-
}
78-
79-
protected final void svElement(long offset, E e) {
80-
UNSAFE.putObjectVolatile(buffer, offset, e);
81-
}
82-
83-
@SuppressWarnings("unchecked")
84-
protected final E lpElement(long offset) {
85-
return (E) UNSAFE.getObject(buffer, offset);
86-
}
87-
88-
@SuppressWarnings("unchecked")
89-
protected final E lvElement(long offset) {
90-
return (E) UNSAFE.getObjectVolatile(buffer, offset);
91+
spElement(buffer, offset, e);
9192
}
9293

94+
/**
95+
* A plain store (no ordering/fences) of an element to a given offset
96+
*
97+
* @param buffer this.buffer
98+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
99+
* @param e an orderly kitty
100+
*/
93101
protected final void spElement(E[] buffer, long offset, E e) {
94102
UNSAFE.putObject(buffer, offset, e);
95103
}
96104

105+
/**
106+
* An ordered store(store + StoreStore barrier) of an element to a given offset
107+
*
108+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
109+
* @param e an orderly kitty
110+
*/
111+
protected final void soElement(long offset, E e) {
112+
soElement(buffer, offset, e);
113+
}
114+
115+
/**
116+
* An ordered store(store + StoreStore barrier) of an element to a given offset
117+
*
118+
* @param buffer this.buffer
119+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
120+
* @param e an orderly kitty
121+
*/
97122
protected final void soElement(E[] buffer, long offset, E e) {
98123
UNSAFE.putOrderedObject(buffer, offset, e);
99124
}
100125

101-
protected final void svElement(E[] buffer, long offset, E e) {
102-
UNSAFE.putObjectVolatile(buffer, offset, e);
126+
/**
127+
* A plain load (no ordering/fences) of an element from a given offset.
128+
*
129+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
130+
* @return the element at the offset
131+
*/
132+
protected final E lpElement(long offset) {
133+
return lpElement(buffer, offset);
103134
}
104135

136+
/**
137+
* A plain load (no ordering/fences) of an element from a given offset.
138+
*
139+
* @param buffer this.buffer
140+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
141+
* @return the element at the offset
142+
*/
105143
@SuppressWarnings("unchecked")
106144
protected final E lpElement(E[] buffer, long offset) {
107145
return (E) UNSAFE.getObject(buffer, offset);
108146
}
109147

148+
/**
149+
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
150+
*
151+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
152+
* @return the element at the offset
153+
*/
154+
protected final E lvElement(long offset) {
155+
return lvElement(buffer, offset);
156+
}
157+
158+
/**
159+
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
160+
*
161+
* @param buffer this.buffer
162+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
163+
* @return the element at the offset
164+
*/
110165
@SuppressWarnings("unchecked")
111166
protected final E lvElement(E[] buffer, long offset) {
112167
return (E) UNSAFE.getObjectVolatile(buffer, offset);
113168
}
114169

115-
@Override
116-
public boolean offer(E e) {
117-
throw new UnsupportedOperationException();
118-
}
119-
120-
@Override
121-
public E poll() {
122-
throw new UnsupportedOperationException();
123-
}
124-
@Override
125-
public E peek() {
126-
throw new UnsupportedOperationException();
127-
}
128170
@Override
129171
public Iterator<E> iterator() {
130172
throw new UnsupportedOperationException();
131173
}
132-
133-
@Override
134-
public int size() {
135-
throw new UnsupportedOperationException();
136-
}
137174
}

0 commit comments

Comments
 (0)