@@ -31,8 +31,8 @@ abstract class MpmcArrayQueueProducerField<E> extends MpmcArrayQueueL1Pad<E> {
3131 private final static long P_INDEX_OFFSET ;
3232 static {
3333 try {
34- P_INDEX_OFFSET =
35- UNSAFE . objectFieldOffset ( MpmcArrayQueueProducerField . class .getDeclaredField ("producerIndex" ));
34+ P_INDEX_OFFSET = UNSAFE . objectFieldOffset ( MpmcArrayQueueProducerField . class
35+ .getDeclaredField ("producerIndex" ));
3636 } catch (NoSuchFieldException e ) {
3737 throw new RuntimeException (e );
3838 }
@@ -65,8 +65,8 @@ abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad<E> {
6565 private final static long C_INDEX_OFFSET ;
6666 static {
6767 try {
68- C_INDEX_OFFSET =
69- UNSAFE . objectFieldOffset ( MpmcArrayQueueConsumerField . class .getDeclaredField ("consumerIndex" ));
68+ C_INDEX_OFFSET = UNSAFE . objectFieldOffset ( MpmcArrayQueueConsumerField . class
69+ .getDeclaredField ("consumerIndex" ));
7070 } catch (NoSuchFieldException e ) {
7171 throw new RuntimeException (e );
7272 }
@@ -87,26 +87,28 @@ protected final boolean casConsumerIndex(long expect, long newValue) {
8787}
8888
8989/**
90- * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that any and all
91- * threads may call the offer/poll/peek methods and correctness is maintained. <br>
90+ * A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
91+ * any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
9292 * This implementation follows patterns documented on the package level for False Sharing protection.<br>
9393 * The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See <a
94- * href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original algorithm
95- * uses an array of structs which should offer nice locality properties but is sadly not possible in Java (waiting on
96- * Value Types or similar). The alternative explored here utilizes 2 arrays, one for each field of the struct. There is
97- * a further alternative in the experimental project which uses iteration phase markers to achieve the same algo and is
98- * closer structurally to the original, but sadly does not perform as well as this implementation.<br>
94+ * href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original
95+ * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
96+ * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
97+ * field of the struct. There is a further alternative in the experimental project which uses iteration phase
98+ * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
99+ * well as this implementation.<br>
99100 * Tradeoffs to keep in mind:
100101 * <ol>
101- * <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of both arrays.
102- * We are trading memory to avoid false sharing(active and passive).
103- * <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the elements array.
104- * This is doubling/tripling the memory allocated for the buffer.
102+ * <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
103+ * both arrays. We are trading memory to avoid false sharing(active and passive).
104+ * <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
105+ * elements array. This is doubling/tripling the memory allocated for the buffer.
105106 * <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
106107 * equal to the requested capacity.
107108 * </ol>
108109 *
109- * @param <E> type of the element stored in the {@link java.util.Queue}
110+ * @param <E>
111+ * type of the element stored in the {@link java.util.Queue}
110112 */
111113public class MpmcArrayQueue <E > extends MpmcArrayQueueConsumerField <E > {
112114 long p40 , p41 , p42 , p43 , p44 , p45 , p46 ;
@@ -123,10 +125,11 @@ public boolean offer(final E e) {
123125 }
124126
125127 // local load of field to avoid repeated loads after volatile reads
128+ final long capacity = mask + 1 ;
126129 final long [] lSequenceBuffer = sequenceBuffer ;
127130 long currentProducerIndex ;
128131 long seqOffset ;
129-
132+ long cIndex = Long . MAX_VALUE ; // start with bogus value, hope we don't need it
130133 while (true ) {
131134 currentProducerIndex = lvProducerIndex (); // LoadLoad
132135 seqOffset = calcSequenceOffset (currentProducerIndex );
@@ -140,8 +143,10 @@ public boolean offer(final E e) {
140143 break ;
141144 }
142145 // failed cas, retry 1
143- } else if (delta < 0 ) {
144- // poll has not moved this value forward
146+ } else if (delta < 0 && // poll has not moved this value forward
147+ currentProducerIndex - capacity <= cIndex && // test against cached cIndex
148+ currentProducerIndex - capacity <= (cIndex = lvConsumerIndex ())) { // test against latest cIndex
149+ // Extra check required to ensure [Queue.offer == false iff queue is full]
145150 return false ;
146151 }
147152
@@ -161,16 +166,17 @@ public boolean offer(final E e) {
161166
162167 /**
163168 * {@inheritDoc}
164- * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll and must
165- * test producer index when next element is not visible.
169+ * <p>
170+ * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
171+ * and must test producer index when next element is not visible.
166172 */
167173 @ Override
168174 public E poll () {
169175 // local load of field to avoid repeated loads after volatile reads
170176 final long [] lSequenceBuffer = sequenceBuffer ;
171177 long currentConsumerIndex ;
172178 long seqOffset ;
173-
179+ long pIndex = - 1 ; // start with bogus value, hope we don't need it
174180 while (true ) {
175181 currentConsumerIndex = lvConsumerIndex ();// LoadLoad
176182 seqOffset = calcSequenceOffset (currentConsumerIndex );
@@ -183,12 +189,10 @@ public E poll() {
183189 break ;
184190 }
185191 // failed cas, retry 1
186- } else if (delta < 0 ) {
187- // COMMENTED OUT: strict empty check.
188- // if (currentConsumerIndex == lvProducerIndex()) {
189- // return null;
190- // }
191- // next element is not visible, probably empty
192+ } else if (delta < 0 && // slot has not been moved by producer
193+ currentConsumerIndex >= pIndex && // test against cached pIndex
194+ currentConsumerIndex == (pIndex = lvProducerIndex ())) { // update pIndex if we must
195+ // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
192196 return null ;
193197 }
194198
@@ -202,22 +206,31 @@ public E poll() {
202206
203207 // Move sequence ahead by capacity, preparing it for next offer
204208 // (seeing this value from a consumer will lead to retry 2)
205- soSequence (lSequenceBuffer , seqOffset , currentConsumerIndex + capacity );// StoreStore
209+ soSequence (lSequenceBuffer , seqOffset , currentConsumerIndex + mask + 1 );// StoreStore
206210
207211 return e ;
208212 }
209213
210214 @ Override
211215 public E peek () {
212- return lpElement (calcElementOffset (lvConsumerIndex ()));
216+ long currConsumerIndex ;
217+ E e ;
218+ do {
219+ currConsumerIndex = lvConsumerIndex ();
220+ // other consumers may have grabbed the element, or queue might be empty
221+ e = lpElement (calcElementOffset (currConsumerIndex ));
222+ // only return null if queue is empty
223+ } while (e == null && currConsumerIndex != lvProducerIndex ());
224+ return e ;
213225 }
214226
215227 @ Override
216228 public int size () {
217229 /*
218- * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
219- * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
220- * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
230+ * It is possible for a thread to be interrupted or reschedule between the read of the producer and
231+ * consumer indices, therefore protection is required to ensure size is within valid range. In the
232+ * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
233+ * index BEFORE the producer index.
221234 */
222235 long after = lvConsumerIndex ();
223236 while (true ) {
@@ -229,13 +242,13 @@ public int size() {
229242 }
230243 }
231244 }
232-
245+
233246 @ Override
234247 public boolean isEmpty () {
235- // Order matters!
248+ // Order matters!
236249 // Loading consumer before producer allows for producer increments after consumer index is read.
237- // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is nothing we
238- // can do to make this an exact method.
250+ // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
251+ // nothing we can do to make this an exact method.
239252 return (lvConsumerIndex () == lvProducerIndex ());
240253 }
241- }
254+ }
0 commit comments