Skip to content

Commit 07fb328

Browse files
Update JCTools Code
1 parent d9c628d commit 07fb328

10 files changed

+487
-611
lines changed

rxjava-core/src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java

Lines changed: 86 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,27 @@
2121
import java.util.AbstractQueue;
2222
import java.util.Iterator;
2323

24-
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E>{
24+
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
2525
long p00, p01, p02, p03, p04, p05, p06, p07;
2626
long p30, p31, p32, p33, p34, p35, p36, p37;
2727
}
2828

29+
/**
30+
* A concurrent access enabling class used by circular array based queues this class exposes an offset computation
31+
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
32+
* the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
33+
* padding.
34+
* <p>
35+
* Offset calculation is separate from access to enable the reuse of a give compute offset.
36+
* <p>
37+
* Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
38+
* LoadLoad barrier.
39+
* <p>
40+
*
41+
* @author nitsanw
42+
*
43+
* @param <E>
44+
*/
2945
public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
3046
protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0);
3147
protected static final int BUFFER_PAD = 32;
@@ -51,87 +67,108 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
5167

5268
@SuppressWarnings("unchecked")
5369
public ConcurrentCircularArrayQueue(int capacity) {
54-
this.capacity = Pow2.findNextPositivePowerOfTwo(capacity);
70+
this.capacity = Pow2.roundToPowerOfTwo(capacity);
5571
mask = this.capacity - 1;
5672
// pad data on either end with some empty slots.
5773
buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
5874
}
5975

60-
public ConcurrentCircularArrayQueue(ConcurrentCircularArrayQueue<E> c) {
61-
this.capacity = c.capacity;
62-
this.mask = c.mask;
63-
// pad data on either end with some empty slots.
64-
this.buffer = c.buffer;
65-
}
66-
67-
protected final long calcOffset(long index) {
76+
/**
77+
* @param index desirable element index
78+
* @return the offset in bytes within the array for a given index.
79+
*/
80+
protected final long calcElementOffset(long index) {
6881
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
6982
}
7083

84+
/**
85+
* A plain store (no ordering/fences) of an element to a given offset
86+
*
87+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
88+
* @param e a kitty
89+
*/
7190
protected final void spElement(long offset, E e) {
72-
UNSAFE.putObject(buffer, offset, e);
73-
}
74-
75-
protected final void soElement(long offset, E e) {
76-
UNSAFE.putOrderedObject(buffer, offset, e);
77-
}
78-
79-
protected final void svElement(long offset, E e) {
80-
UNSAFE.putObjectVolatile(buffer, offset, e);
81-
}
82-
83-
@SuppressWarnings("unchecked")
84-
protected final E lpElement(long offset) {
85-
return (E) UNSAFE.getObject(buffer, offset);
86-
}
87-
88-
@SuppressWarnings("unchecked")
89-
protected final E lvElement(long offset) {
90-
return (E) UNSAFE.getObjectVolatile(buffer, offset);
91+
spElement(buffer, offset, e);
9192
}
9293

94+
/**
95+
* A plain store (no ordering/fences) of an element to a given offset
96+
*
97+
* @param buffer this.buffer
98+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
99+
* @param e an orderly kitty
100+
*/
93101
protected final void spElement(E[] buffer, long offset, E e) {
94102
UNSAFE.putObject(buffer, offset, e);
95103
}
96104

105+
/**
106+
* An ordered store(store + StoreStore barrier) of an element to a given offset
107+
*
108+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
109+
* @param e an orderly kitty
110+
*/
111+
protected final void soElement(long offset, E e) {
112+
soElement(buffer, offset, e);
113+
}
114+
115+
/**
116+
* An ordered store(store + StoreStore barrier) of an element to a given offset
117+
*
118+
* @param buffer this.buffer
119+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
120+
* @param e an orderly kitty
121+
*/
97122
protected final void soElement(E[] buffer, long offset, E e) {
98123
UNSAFE.putOrderedObject(buffer, offset, e);
99124
}
100125

101-
protected final void svElement(E[] buffer, long offset, E e) {
102-
UNSAFE.putObjectVolatile(buffer, offset, e);
126+
/**
127+
* A plain load (no ordering/fences) of an element from a given offset.
128+
*
129+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
130+
* @return the element at the offset
131+
*/
132+
protected final E lpElement(long offset) {
133+
return lpElement(buffer, offset);
103134
}
104135

136+
/**
137+
* A plain load (no ordering/fences) of an element from a given offset.
138+
*
139+
* @param buffer this.buffer
140+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
141+
* @return the element at the offset
142+
*/
105143
@SuppressWarnings("unchecked")
106144
protected final E lpElement(E[] buffer, long offset) {
107145
return (E) UNSAFE.getObject(buffer, offset);
108146
}
109147

148+
/**
149+
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
150+
*
151+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
152+
* @return the element at the offset
153+
*/
154+
protected final E lvElement(long offset) {
155+
return lvElement(buffer, offset);
156+
}
157+
158+
/**
159+
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
160+
*
161+
* @param buffer this.buffer
162+
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
163+
* @return the element at the offset
164+
*/
110165
@SuppressWarnings("unchecked")
111166
protected final E lvElement(E[] buffer, long offset) {
112167
return (E) UNSAFE.getObjectVolatile(buffer, offset);
113168
}
114169

115-
@Override
116-
public boolean offer(E e) {
117-
throw new UnsupportedOperationException();
118-
}
119-
120-
@Override
121-
public E poll() {
122-
throw new UnsupportedOperationException();
123-
}
124-
@Override
125-
public E peek() {
126-
throw new UnsupportedOperationException();
127-
}
128170
@Override
129171
public Iterator<E> iterator() {
130172
throw new UnsupportedOperationException();
131173
}
132-
133-
@Override
134-
public int size() {
135-
throw new UnsupportedOperationException();
136-
}
137174
}

rxjava-core/src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java

Lines changed: 4 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ public abstract class ConcurrentSequencedCircularArrayQueue<E> extends Concurren
2929
throw new IllegalStateException("Unexpected long[] element size");
3030
}
3131
// Including the buffer pad in the array base offset
32-
ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class)
33-
+ (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT));
32+
ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class) + (BUFFER_PAD << (ELEMENT_SHIFT - SPARSE_SHIFT));
3433
}
3534
protected final long[] sequenceBuffer;
3635

@@ -39,56 +38,19 @@ public ConcurrentSequencedCircularArrayQueue(int capacity) {
3938
// pad data on either end with some empty slots.
4039
sequenceBuffer = new long[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
4140
for (long i = 0; i < this.capacity; i++) {
42-
soSequenceElement(calcSequenceOffset(i), i);
41+
soSequence(sequenceBuffer, calcSequenceOffset(i), i);
4342
}
4443
}
4544

46-
public ConcurrentSequencedCircularArrayQueue(ConcurrentSequencedCircularArrayQueue<E> c) {
47-
super(c);
48-
this.sequenceBuffer = c.sequenceBuffer;
49-
}
50-
5145
protected final long calcSequenceOffset(long index) {
5246
return ARRAY_BASE + ((index & mask) << ELEMENT_SHIFT);
5347
}
5448

55-
protected final void spSequenceElement(long offset, long e) {
56-
UNSAFE.putLong(sequenceBuffer, offset, e);
57-
}
58-
59-
protected final void soSequenceElement(long offset, long e) {
60-
UNSAFE.putOrderedLong(sequenceBuffer, offset, e);
61-
}
62-
63-
protected final void svSequenceElement(long offset, long e) {
64-
UNSAFE.putLongVolatile(sequenceBuffer, offset, e);
65-
}
66-
67-
protected final long lpSequenceElement(long offset) {
68-
return UNSAFE.getLong(sequenceBuffer, offset);
69-
}
70-
71-
protected final long lvSequenceElement(long offset) {
72-
return UNSAFE.getLongVolatile(sequenceBuffer, offset);
73-
}
74-
75-
protected final void spSequenceElement(long[] buffer, long offset, long e) {
76-
UNSAFE.putLong(buffer, offset, e);
77-
}
78-
79-
protected final void soSequenceElement(long[] buffer, long offset, long e) {
49+
protected final void soSequence(long[] buffer, long offset, long e) {
8050
UNSAFE.putOrderedLong(buffer, offset, e);
8151
}
8252

83-
protected final void svSequenceElement(long[] buffer, long offset, long e) {
84-
UNSAFE.putLongVolatile(buffer, offset, e);
85-
}
86-
87-
protected final long lpSequenceElement(long[] buffer, long offset) {
88-
return UNSAFE.getLong(buffer, offset);
89-
}
90-
91-
protected final long lvSequenceElement(long[] buffer, long offset) {
53+
protected final long lvSequence(long[] buffer, long offset) {
9254
return UNSAFE.getLongVolatile(buffer, offset);
9355
}
9456

rxjava-core/src/main/java/rx/internal/util/unsafe/IntConcurrentCircularArrayQueue.java

Lines changed: 0 additions & 107 deletions
This file was deleted.

0 commit comments

Comments
 (0)