@@ -18,13 +18,9 @@ var _ Queue[any] = (*Blocking[any])(nil)
1818// If there are no elements available the retrieve operations wait until
1919// elements are added to the queue.
2020type Blocking [T comparable ] struct {
21- // elements queue
22- elements []T
23- elementsIndex int
24-
25- initialLen int
26-
27- capacity * int
21+ initialElems []T
22+ elems []T
23+ capacity * int
2824
2925 // synchronization
3026 lock sync.RWMutex
@@ -45,20 +41,23 @@ func NewBlocking[T comparable](
4541 o .apply (& options )
4642 }
4743
44+ // Store initial elements
45+ initialElems := make ([]T , len (elems ))
46+ copy (initialElems , elems )
47+
4848 queue := & Blocking [T ]{
49- elements : elems ,
50- elementsIndex : 0 ,
51- initialLen : len (elems ),
52- capacity : options .capacity ,
53- lock : sync.RWMutex {},
49+ elems : elems ,
50+ initialElems : initialElems ,
51+ capacity : options .capacity ,
52+ lock : sync.RWMutex {},
5453 }
5554
5655 queue .notEmptyCond = sync .NewCond (& queue .lock )
5756 queue .notFullCond = sync .NewCond (& queue .lock )
5857
5958 if queue .capacity != nil {
60- if len (queue .elements ) > * queue .capacity {
61- queue .elements = queue .elements [:* queue .capacity ]
59+ if len (queue .elems ) > * queue .capacity {
60+ queue .elems = queue .elems [:* queue .capacity ]
6261 }
6362 }
6463
@@ -77,7 +76,7 @@ func (bq *Blocking[T]) OfferWait(elem T) {
7776 bq .notFullCond .Wait ()
7877 }
7978
80- bq .elements = append (bq .elements , elem )
79+ bq .elems = append (bq .elems , elem )
8180
8281 bq .notEmptyCond .Signal ()
8382}
@@ -92,22 +91,21 @@ func (bq *Blocking[T]) Offer(elem T) error {
9291 return ErrQueueIsFull
9392 }
9493
95- bq .elements = append (bq .elements , elem )
94+ bq .elems = append (bq .elems , elem )
9695
9796 bq .notEmptyCond .Signal ()
9897
9998 return nil
10099}
101100
102- // Reset sets the queue elements index to 0. The queue will be in its initial
103- // state.
101+ // Reset sets the queue to its initial state with the original elements.
104102func (bq * Blocking [T ]) Reset () {
105103 bq .lock .Lock ()
106104 defer bq .lock .Unlock ()
107105
108- bq . elementsIndex = 0
109-
110- bq .elements = bq .elements [: bq . initialLen ]
106+ // Restore initial elements
107+ bq . elems = make ([] T , len ( bq . initialElems ))
108+ copy ( bq .elems , bq .initialElems )
111109
112110 bq .notEmptyCond .Broadcast ()
113111}
@@ -117,29 +115,24 @@ func (bq *Blocking[T]) Reset() {
117115// GetWait removes and returns the head of the elements queue.
118116// If no element is available it waits until the queue
119117// has an element available.
120- //
121- // It does not actually remove elements from the elements slice, but
122- // it's incrementing the underlying index.
123118func (bq * Blocking [T ]) GetWait () (v T ) {
124119 bq .lock .Lock ()
125120 defer bq .lock .Unlock ()
126121
127- defer bq .notFullCond . Signal ()
128-
129- idx := bq . getNextIndexOrWait ()
122+ for bq .isEmpty () {
123+ bq . notEmptyCond . Wait ()
124+ }
130125
131- elem := bq .elements [idx ]
126+ elem := bq .elems [0 ]
127+ bq .elems = bq .elems [1 :]
132128
133- bq .elementsIndex ++
129+ bq .notFullCond . Signal ()
134130
135131 return elem
136132}
137133
138134// Get removes and returns the head of the elements queue.
139135// If no element is available it returns an ErrNoElementsAvailable error.
140- //
141- // It does not actually remove elements from the elements slice, but
142- // it's incrementing the underlying index.
143136func (bq * Blocking [T ]) Get () (v T , _ error ) {
144137 bq .lock .Lock ()
145138 defer bq .lock .Unlock ()
@@ -154,9 +147,9 @@ func (bq *Blocking[T]) Clear() []T {
154147
155148 defer bq .notFullCond .Broadcast ()
156149
157- removed := bq . elements [ bq .elementsIndex :]
158-
159- bq .elementsIndex += len ( removed )
150+ removed := make ([] T , len ( bq .elems ))
151+ copy ( removed , bq . elems )
152+ bq .elems = bq . elems [: 0 ]
160153
161154 return removed
162155}
@@ -199,9 +192,7 @@ func (bq *Blocking[T]) Peek() (v T, _ error) {
199192 return v , ErrNoElementsAvailable
200193 }
201194
202- elem := bq .elements [bq .elementsIndex ]
203-
204- return elem , nil
195+ return bq .elems [0 ], nil
205196}
206197
207198// PeekWait retrieves but does not return the head of the queue.
@@ -215,7 +206,7 @@ func (bq *Blocking[T]) PeekWait() T {
215206 bq .notEmptyCond .Wait ()
216207 }
217208
218- elem := bq .elements [ bq . elementsIndex ]
209+ elem := bq .elems [ 0 ]
219210
220211 // send the not empty signal again in case any remove method waits.
221212 bq .notEmptyCond .Signal ()
@@ -228,16 +219,16 @@ func (bq *Blocking[T]) Size() int {
228219 bq .lock .RLock ()
229220 defer bq .lock .RUnlock ()
230221
231- return bq .size ( )
222+ return len ( bq .elems )
232223}
233224
234225// Contains returns true if the queue contains the given element.
235226func (bq * Blocking [T ]) Contains (elem T ) bool {
236227 bq .lock .RLock ()
237228 defer bq .lock .RUnlock ()
238229
239- for i := range bq .elements [ bq . elementsIndex :] {
240- if bq . elements [ i ] == elem {
230+ for _ , e := range bq .elems {
231+ if e == elem {
241232 return true
242233 }
243234 }
@@ -255,20 +246,9 @@ func (bq *Blocking[T]) IsEmpty() bool {
255246
256247// ===================================Helpers==================================
257248
258- // getNextIndexOrWait returns the next available index of the elements slice.
259- func (bq * Blocking [T ]) getNextIndexOrWait () int {
260- if ! bq .isEmpty () {
261- return bq .elementsIndex
262- }
263-
264- bq .notEmptyCond .Wait ()
265-
266- return bq .getNextIndexOrWait ()
267- }
268-
269249// isEmpty returns true if the queue is empty.
270250func (bq * Blocking [T ]) isEmpty () bool {
271- return bq . elementsIndex >= len (bq .elements )
251+ return len (bq .elems ) == 0
272252}
273253
274254// isFull returns true if the queue is full.
@@ -277,43 +257,37 @@ func (bq *Blocking[T]) isFull() bool {
277257 return false
278258 }
279259
280- return len (bq .elements ) - bq . elementsIndex >= * bq .capacity
260+ return len (bq .elems ) >= * bq .capacity
281261}
282262
283263func (bq * Blocking [T ]) size () int {
284- return len (bq .elements ) - bq . elementsIndex
264+ return len (bq .elems )
285265}
286266
287267func (bq * Blocking [T ]) get () (v T , _ error ) {
288- defer bq .notFullCond .Signal ()
289-
290268 if bq .isEmpty () {
291269 return v , ErrNoElementsAvailable
292270 }
293271
294- elem := bq .elements [bq .elementsIndex ]
272+ elem := bq .elems [0 ]
273+ bq .elems = bq .elems [1 :]
295274
296- bq .elementsIndex ++
275+ bq .notFullCond . Signal ()
297276
298277 return elem , nil
299278}
300279
301280// MarshalJSON serializes the Blocking queue to JSON.
302281func (bq * Blocking [T ]) MarshalJSON () ([]byte , error ) {
303282 bq .lock .RLock ()
283+ defer bq .lock .RUnlock ()
304284
305- if bq .IsEmpty () {
306- bq .lock .RUnlock ()
285+ if bq .isEmpty () {
307286 return []byte ("[]" ), nil
308287 }
309288
310- // Extract elements from `elements` starting at `elementsIndex`.
311- elements := bq .elements [bq .elementsIndex :]
312-
313- bq .lock .RUnlock ()
314-
315289 // Marshal the slice of elements into JSON.
316- data , err := json .Marshal (elements )
290+ data , err := json .Marshal (bq . elems )
317291 if err != nil {
318292 return nil , fmt .Errorf ("failed to marshal blocking queue: %w" , err )
319293 }
0 commit comments