1717package com .amazon .sns .messaging .lib .concurrent ;
1818
1919import java .util .AbstractQueue ;
20- import java .util .Arrays ;
2120import java .util .Collection ;
2221import java .util .Iterator ;
2322import java .util .concurrent .BlockingQueue ;
2423import java .util .concurrent .TimeUnit ;
2524import java .util .concurrent .atomic .AtomicInteger ;
25+ import java .util .concurrent .atomic .AtomicLong ;
26+ import java .util .concurrent .atomic .AtomicReferenceArray ;
2627import java .util .concurrent .locks .Condition ;
2728import java .util .concurrent .locks .ReentrantLock ;
29+ import java .util .stream .IntStream ;
2830
2931import lombok .Getter ;
3032import lombok .Setter ;
3133import lombok .SneakyThrows ;
3234
33- @ SuppressWarnings ({ "java:S2274" , "unchecked" })
35+ @ SuppressWarnings ({ "java:S2274" })
3436public class RingBufferBlockingQueue <E > extends AbstractQueue <E > implements BlockingQueue <E > {
3537
3638 private static final int DEFAULT_CAPACITY = 2048 ;
3739
38- private final Entry <E >[] buffer ;
40+ private final AtomicReferenceArray < Entry <E >> buffer ;
3941
4042 private final int capacity ;
4143
42- private final AtomicInteger writeSequence = new AtomicInteger (-1 );
44+ private final AtomicLong writeSequence = new AtomicLong (-1 );
4345
44- private final AtomicInteger readSequence = new AtomicInteger (0 );
46+ private final AtomicLong readSequence = new AtomicLong (0 );
47+
48+ private final AtomicInteger size = new AtomicInteger (0 );
4549
4650 private final ReentrantLock reentrantLock ;
4751
48- private final Condition notEmpty ;
52+ private final Condition waitingConsumer ;
4953
50- private final Condition notFull ;
54+ private final Condition waitingProducer ;
5155
5256 public RingBufferBlockingQueue (final int capacity ) {
5357 this .capacity = capacity ;
54- this .buffer = new Entry [capacity ];
55- Arrays .setAll (buffer , p -> new Entry <>());
58+ buffer = new AtomicReferenceArray <>(capacity );
5659 reentrantLock = new ReentrantLock (true );
57- notEmpty = reentrantLock .newCondition ();
58- notFull = reentrantLock .newCondition ();
60+ waitingConsumer = reentrantLock .newCondition ();
61+ waitingProducer = reentrantLock .newCondition ();
62+ IntStream .range (0 , capacity ).forEach (idx -> buffer .set (idx , new Entry <>()));
5963 }
6064
6165 public RingBufferBlockingQueue () {
6266 this (DEFAULT_CAPACITY );
6367 }
6468
65- private void enqueue (final E element ) throws InterruptedException {
66- while (isFull ()) {
67- notFull .await ();
68- }
69-
70- final int nextWriteSeq = writeSequence .get () + 1 ;
71- buffer [wrap (nextWriteSeq )].setValue (element );
72- writeSequence .incrementAndGet ();
73- notEmpty .signal ();
69+ private long avoidSequenceOverflow (final long sequence ) {
70+ return (sequence < Long .MAX_VALUE ? sequence : wrap (sequence ));
7471 }
7572
76- private E dequeue () throws InterruptedException {
77- while (isEmpty ()) {
78- notEmpty .await ();
79- }
80-
81- final E nextValue = buffer [wrap (readSequence .get ())].getValue ();
82- readSequence .incrementAndGet ();
83- notFull .signal ();
84- return nextValue ;
85- }
86-
87- private int wrap (final int sequence ) {
88- return sequence % capacity ;
73+ private int wrap (final long sequence ) {
74+ return Math .toIntExact (sequence % capacity );
8975 }
9076
9177 @ Override
9278 public int size () {
93- return ( writeSequence .get () - readSequence . get ()) + 1 ;
79+ return size .get ();
9480 }
9581
9682 @ Override
9783 public boolean isEmpty () {
98- return writeSequence .get () < readSequence . get () ;
84+ return size .get () == 0 ;
9985 }
10086
10187 public boolean isFull () {
102- return size () >= capacity ;
88+ return size . get () >= capacity ;
10389 }
10490
105- public int writeSequence () {
91+ public long writeSequence () {
10692 return writeSequence .get ();
10793 }
10894
109- public int readSequence () {
95+ public long readSequence () {
11096 return readSequence .get ();
11197 }
11298
11399 @ Override
114100 public E peek () {
115- return isEmpty () ? null : buffer [ wrap (readSequence .get ())] .getValue ();
101+ return isEmpty () ? null : buffer . get ( wrap (readSequence .get ())) .getValue ();
116102 }
117103
118104 @ Override
119105 @ SneakyThrows
120106 public void put (final E element ) {
121107 try {
122108 reentrantLock .lock ();
123- enqueue (element );
109+
110+ while (isFull ()) {
111+ waitingProducer .await ();
112+ }
113+
114+ final long prevWriteSeq = writeSequence .get ();
115+ final long nextWriteSeq = avoidSequenceOverflow (prevWriteSeq ) + 1 ;
116+
117+ buffer .get (wrap (nextWriteSeq )).setValue (element );
118+
119+ writeSequence .compareAndSet (prevWriteSeq , nextWriteSeq );
120+
121+ size .incrementAndGet ();
122+
123+ waitingConsumer .signal ();
124124 } finally {
125125 reentrantLock .unlock ();
126126 }
@@ -131,7 +131,25 @@ public void put(final E element) {
131131 public E take () {
132132 try {
133133 reentrantLock .lock ();
134- return dequeue ();
134+
135+ while (isEmpty ()) {
136+ waitingConsumer .await ();
137+ }
138+
139+ final long prevReadSeq = readSequence .get ();
140+ final long nextReadSeq = avoidSequenceOverflow (prevReadSeq ) + 1 ;
141+
142+ final E nextValue = buffer .get (wrap (prevReadSeq )).getValue ();
143+
144+ buffer .get (wrap (prevReadSeq )).setValue (null );
145+
146+ readSequence .compareAndSet (prevReadSeq , nextReadSeq );
147+
148+ size .decrementAndGet ();
149+
150+ waitingProducer .signal ();
151+
152+ return nextValue ;
135153 } finally {
136154 reentrantLock .unlock ();
137155 }
0 commit comments