33import java .lang .invoke .MethodHandles ;
44import java .lang .invoke .MethodHandles .Lookup ;
55import java .lang .invoke .VarHandle ;
6+ import java .util .Objects ;
67import java .util .concurrent .locks .LockSupport ;
78
89/**
@@ -17,13 +18,16 @@ public class MpscArrayQueueVarHandle<E> extends BaseQueue<E> {
1718 private static final VarHandle ARRAY_HANDLE ;
1819 private static final VarHandle HEAD_HANDLE ;
1920 private static final VarHandle TAIL_HANDLE ;
21+ private static final VarHandle PRODUCER_LIMIT_HANDLE ;
2022
2123 static {
2224 try {
2325 final Lookup lookup = MethodHandles .lookup ();
2426 TAIL_HANDLE = lookup .findVarHandle (MpscArrayQueueVarHandle .class , "tail" , long .class );
2527 HEAD_HANDLE = lookup .findVarHandle (MpscArrayQueueVarHandle .class , "head" , long .class );
2628 ARRAY_HANDLE = MethodHandles .arrayElementVarHandle (Object [].class );
29+ PRODUCER_LIMIT_HANDLE =
30+ lookup .findVarHandle (MpscArrayQueueVarHandle .class , "producerLimit" , long .class );
2731 } catch (Throwable t ) {
2832 throw new IllegalStateException (t );
2933 }
@@ -32,107 +36,156 @@ public class MpscArrayQueueVarHandle<E> extends BaseQueue<E> {
3236 /** The backing array (plain Java array for VarHandle access) */
3337 private final Object [] buffer ;
3438
35- // Padding
39+ // Padding to prevent false sharing
3640 @ SuppressWarnings ("unused" )
3741 private long p0 , p1 , p2 , p3 , p4 , p5 , p6 ;
3842
39- /** The next free slot for producers */
43+ /** Next free slot for producers (multi-threaded) */
4044 private volatile long tail = 0L ;
4145
42- // Padding
46+ // Padding around tail
4347 @ SuppressWarnings ("unused" )
4448 private long q0 , q1 , q2 , q3 , q4 , q5 , q6 ;
4549
46- // Padding
50+ /** Cached producer limit to reduce volatile head reads */
51+ private volatile long producerLimit = 0L ;
52+
53+ // Padding around producerLimit
4754 @ SuppressWarnings ("unused" )
48- private long p10 , p11 , p12 , p13 , p14 , p15 , p16 ;
55+ private long r0 , r1 , r2 , r3 , r4 , r5 , r6 ;
4956
50- /** The next slot to consume (single-threaded) */
57+ /** Next slot to consume (single-threaded) */
5158 private volatile long head = 0L ;
5259
53- // Padding
54- private long q10 , q11 , q12 , q13 , q14 , q15 , q16 ;
60+ // Padding around head
61+ @ SuppressWarnings ("unused" )
62+ private long s0 , s1 , s2 , s3 , s4 , s5 , s6 ;
5563
5664 /**
5765 * Creates a new MPSC queue.
5866 *
59- * @param requestedCapacity the desired capacity, rounded up to the next power of two if needed
67+ * @param requestedCapacity the desired capacity, rounded up to next power of two
6068 */
6169 public MpscArrayQueueVarHandle (int requestedCapacity ) {
6270 super (requestedCapacity );
6371 this .buffer = new Object [capacity ];
72+ this .producerLimit = capacity ;
6473 }
6574
6675 /**
6776 * Attempts to add an element to the queue.
6877 *
69- * <p>This method uses a CAS loop on {@code tail} to allow multiple producers to safely claim
70- * distinct slots. The producer then performs a release-store into the buffer using {@code
71- * ARRAY_HANDLE.setRelease()}.
72- *
7378 * @param e the element to add (must be non-null)
74- * @return {@code true} if the element was enqueued, {@code false} if the queue is full
79+ * @return true if element was enqueued, false if queue is full
7580 */
7681 @ Override
7782 public boolean offer (E e ) {
78- if (e == null ) {
79- throw new NullPointerException ();
83+ Objects .requireNonNull (e );
84+
85+ // jctools does the same local copy to have the jitter optimise the accesses
86+ final Object [] localBuffer = this .buffer ;
87+
88+ // depending on the thread id, choose a different backoff strategy.
89+ // Note: it reduces fairness but also the contention on the cas.
90+ boolean s0 = false , s1 = false , s2 = false ;
91+ switch ((int ) (Thread .currentThread ().getId () & 3 )) {
92+ case 0 :
93+ s0 = true ;
94+ break ;
95+ case 1 :
96+ s1 = true ;
97+ break ;
98+ case 2 :
99+ s2 = true ;
100+ break ;
101+ default :
102+ break ;
80103 }
81104
105+ long localProducerLimit = (long ) PRODUCER_LIMIT_HANDLE .getVolatile (this );
106+ long cachedHead = 0L ; // Local cache of head to reduce volatile reads
107+
82108 while (true ) {
83- final long currentTail = (long ) TAIL_HANDLE .getVolatile (this );
84- final long wrapPoint = currentTail - capacity ;
85- final long currentHead = (long ) HEAD_HANDLE .getVolatile (this );
109+ long currentTail = (long ) TAIL_HANDLE .getVolatile (this );
110+
111+ // Check if producer limit exceeded
112+ if (currentTail >= localProducerLimit ) {
113+ // Refresh head only when necessary
114+ cachedHead = (long ) HEAD_HANDLE .getVolatile (this );
115+ localProducerLimit = cachedHead + capacity ;
86116
87- if (wrapPoint >= currentHead ) {
88- return false ; // full
117+ if (currentTail >= localProducerLimit ) return false ; // queue full
118+
119+ // Update producerLimit so other producers also benefit
120+ PRODUCER_LIMIT_HANDLE .setVolatile (this , localProducerLimit );
89121 }
90122
123+ // Attempt to claim a slot
91124 if (TAIL_HANDLE .compareAndSet (this , currentTail , currentTail + 1 )) {
92125 final int index = (int ) (currentTail & mask );
93- ARRAY_HANDLE .setRelease (buffer , index , e );
126+
127+ // Release-store ensures producer's write is visible to consumer
128+ ARRAY_HANDLE .setRelease (localBuffer , index , e );
94129 return true ;
95130 }
96131
97- // Backoff on contention
98- LockSupport .parkNanos (1L );
132+ // Backoff to reduce contention
133+ if (s0 ) Thread .onSpinWait ();
134+ else if (s1 ) Thread .yield ();
135+ else if (s2 ) LockSupport .parkNanos (1 );
99136 }
100137 }
101138
102139 /**
103- * Removes and returns the next element, or {@code null} if the queue is empty.
104- *
105- * <p>This method is single-threaded (one consumer). It performs a volatile read of the buffer,
106- * and then uses {@code setRelease(null)} to free the slot.
140+ * Removes and returns the next element, or null if empty.
107141 *
108- * @return the dequeued element, or null if the queue is empty
142+ * @return dequeued element, or null if queue empty
109143 */
110144 @ Override
111145 @ SuppressWarnings ("unchecked" )
112146 public E poll () {
113- final long currentHead = (long ) HEAD_HANDLE .getOpaque (this );
147+ final Object [] localBuffer = this .buffer ;
148+
149+ long currentHead = (long ) HEAD_HANDLE .getOpaque (this );
114150 final int index = (int ) (currentHead & mask );
115151
116- Object value = ARRAY_HANDLE .getAcquire (buffer , index );
117- if (value == null ) {
118- return null ;
119- }
152+ // Acquire-load ensures visibility of producer write
153+ Object value = ARRAY_HANDLE .getAcquire (localBuffer , index );
154+ if (value == null ) return null ;
120155
121- ARRAY_HANDLE .setOpaque (buffer , index , null ); // clear slot
156+ // Clear the slot without additional fence
157+ ARRAY_HANDLE .setOpaque (localBuffer , index , null );
158+
159+ // Advance head using opaque write (consumer-only)
122160 HEAD_HANDLE .setOpaque (this , currentHead + 1 );
161+
123162 return (E ) value ;
124163 }
125164
165+ /**
166+ * Returns next element without removing it.
167+ *
168+ * <p>The memory visibility is only correct if the consumer calls it.
169+ *
170+ * @return next element or null if empty
171+ */
126172 @ Override
127173 @ SuppressWarnings ("unchecked" )
128174 public E peek () {
129175 final int index = (int ) ((long ) HEAD_HANDLE .getOpaque (this ) & mask );
130176 return (E ) ARRAY_HANDLE .getVolatile (buffer , index );
131177 }
132178
179+ /**
180+ * Returns number of elements in queue.
181+ *
182+ * <p>Volatile reads of tail and head ensure accurate result in multi-threaded context.
183+ *
184+ * @return current size
185+ */
133186 @ Override
134187 public int size () {
135- long currentHead = head ; // non-volatile read
188+ long currentHead = ( long ) HEAD_HANDLE . getVolatile ( this );
136189 long currentTail = (long ) TAIL_HANDLE .getVolatile (this );
137190 return (int ) (currentTail - currentHead );
138191 }
0 commit comments