Skip to content

Commit e10179f

Browse files
authored
chore: housekeeping — doc fixes, Iterator lock, modulo and MarshalJSON cleanups (#59)
* docs: fix stale package doc and copy-paste comment typos - doc.go: package overview said "2 available implementations" but describes 4. Also drop the inaccurate "Lesser interface" line — Priority takes a less function at construction, not an interface. - circular.go: Queue-implementation guard's doc comment said "Ensure Priority implements..." (copy-paste from priority.go). - linked.go: comment above isEmpty() said "IsEmpty returns true...". Refs #47 * refactor(linked): hold the lock for the whole Iterator call Iterator was calling Clear, which acquired and released the write lock, then built the channel outside any lock. The other queue implementations hold their lock for the duration of Iterator; this brings Linked in line. Extracted the drain into a locked helper so Clear can share it. Refs #47 * perf: small cleanups in Contains, MarshalJSON, and helpers - circular.go: walk Contains/MarshalJSON in two contiguous chunks instead of computing modulo per element. - priority.go: MarshalJSON sorts a copy with sort.Slice instead of heap.Init + N heap.Pop. Same output, fewer allocations and a cache-friendlier loop. - blocking.go: drop the unused-elsewhere size() helper; the only caller was Iterator and len(bq.elems) is just as readable. No behaviour changes. Adds a Circular.Contains test for the wrap-around chunk so the new branch is exercised. Refs #47
1 parent b458967 commit e10179f

6 files changed

Lines changed: 82 additions & 41 deletions

File tree

blocking.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (bq *Blocking[T]) Iterator() <-chan T {
181181
defer bq.lock.Unlock()
182182

183183
// use a buffered channel to avoid blocking the iterator.
184-
iteratorCh := make(chan T, bq.size())
184+
iteratorCh := make(chan T, len(bq.elems))
185185

186186
// close the channel when the function returns.
187187
defer close(iteratorCh)
@@ -276,10 +276,6 @@ func (bq *Blocking[T]) isFull() bool {
276276
return len(bq.elems) >= *bq.capacity
277277
}
278278

279-
func (bq *Blocking[T]) size() int {
280-
return len(bq.elems)
281-
}
282-
283279
func (bq *Blocking[T]) get() (v T, _ error) {
284280
if bq.isEmpty() {
285281
return v, ErrNoElementsAvailable

circular.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"sync"
66
)
77

8-
// Ensure Priority implements the Queue interface.
8+
// Ensure Circular implements the Queue interface.
99
var _ Queue[any] = (*Circular[any])(nil)
1010

1111
// Circular is a Queue implementation.
@@ -189,18 +189,29 @@ func (q *Circular[T]) Contains(elem T) bool {
189189
defer q.lock.RUnlock()
190190

191191
if q.isEmpty() {
192-
return false // queue is empty, item not found
192+
return false
193193
}
194194

195-
for i := 0; i < q.size; i++ {
196-
idx := (q.head + i) % len(q.elems)
195+
// Walk head..end, then wrap to 0..tail. Avoids a modulo per
196+
// iteration in the hot path.
197+
firstChunk := len(q.elems) - q.head
198+
if firstChunk > q.size {
199+
firstChunk = q.size
200+
}
197201

198-
if q.elems[idx] == elem {
199-
return true // item found
202+
for i := 0; i < firstChunk; i++ {
203+
if q.elems[q.head+i] == elem {
204+
return true
200205
}
201206
}
202207

203-
return false // item not found
208+
for i := 0; i < q.size-firstChunk; i++ {
209+
if q.elems[i] == elem {
210+
return true
211+
}
212+
}
213+
214+
return false
204215
}
205216

206217
// Peek returns the element at the head of the queue.
@@ -274,14 +285,18 @@ func (q *Circular[T]) MarshalJSON() ([]byte, error) {
274285
return []byte("[]"), nil
275286
}
276287

277-
// Collect elements in logical order from head to tail.
288+
// Collect elements in logical order: head..end of array, then
289+
// wrap to 0..tail. Two contiguous copies, no per-element modulo.
278290
elements := make([]T, q.size)
279291

280-
for i := 0; i < q.size; i++ {
281-
index := (q.head + i) % len(q.elems)
282-
elements[i] = q.elems[index]
292+
firstChunk := len(q.elems) - q.head
293+
if firstChunk > q.size {
294+
firstChunk = q.size
283295
}
284296

297+
copy(elements, q.elems[q.head:q.head+firstChunk])
298+
copy(elements[firstChunk:], q.elems[:q.size-firstChunk])
299+
285300
q.lock.RUnlock()
286301

287302
return json.Marshal(elements)

circular_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,37 @@ func testCircularContains(t *testing.T) {
316316
t.Fatal("expected elem to not be found")
317317
}
318318
})
319+
320+
t.Run("WrapsAroundBackingArray", func(t *testing.T) {
321+
t.Parallel()
322+
323+
// Force the queue to span the array's wrap point so Contains
324+
// has to walk both the head..end and 0..tail chunks.
325+
circularQueue := queue.NewCircular([]int{1, 2, 3, 4}, 4)
326+
327+
// Pop two from the head, push two new at the tail. Logical
328+
// queue is now [3, 4, 5, 6] but the backing array is
329+
// [5, 6, 3, 4] with head=2, tail=2.
330+
_, _ = circularQueue.Get()
331+
_, _ = circularQueue.Get()
332+
333+
if err := circularQueue.Offer(5); err != nil {
334+
t.Fatalf("offer 5: %v", err)
335+
}
336+
337+
if err := circularQueue.Offer(6); err != nil {
338+
t.Fatalf("offer 6: %v", err)
339+
}
340+
341+
// 4 lives in the head chunk, 5 lives in the wrapped chunk.
342+
if !circularQueue.Contains(4) {
343+
t.Fatal("expected to find 4 in the head chunk")
344+
}
345+
346+
if !circularQueue.Contains(5) {
347+
t.Fatal("expected to find 5 in the wrapped chunk")
348+
}
349+
})
319350
}
320351

321352
func testCircularClear(t *testing.T) {

doc.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
// Package queue provides multiple thread-safe generic queue implementations.
2-
// Currently, there are 2 available implementations:
2+
// Currently, there are 4 available implementations:
33
//
44
// A blocking queue, which provides methods that wait for the
55
// queue to have available elements when attempting to retrieve an element, and
66
// waits for a free slot when attempting to insert an element.
77
//
8-
// A priority queue based on a container.Heap. The elements in the queue
9-
// must implement the Lesser interface, and are ordered based on the
10-
// Less method. The head of the queue is always the highest priority element.
8+
// A priority queue based on container/heap. Order is defined by a less
9+
// function supplied at construction; the head of the queue is always the
10+
// highest priority element.
1111
//
1212
// A circular queue, which is a queue that uses a fixed-size slice as
1313
// if it were connected end-to-end. When the queue is full, adding a new element to the queue

linked.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,18 @@ func (lq *Linked[T]) IsEmpty() bool {
188188
return lq.isEmpty()
189189
}
190190

191-
// IsEmpty returns true if the queue is empty, false otherwise.
191+
// isEmpty returns true if the queue is empty, false otherwise.
192192
func (lq *Linked[T]) isEmpty() bool {
193193
return lq.size == 0
194194
}
195195

196196
// Iterator returns a channel that will be filled with the elements.
197197
// It removes the elements from the queue.
198198
func (lq *Linked[T]) Iterator() <-chan T {
199-
elems := lq.Clear()
199+
lq.lock.Lock()
200+
defer lq.lock.Unlock()
201+
202+
elems := lq.drainLocked()
200203

201204
ch := make(chan T, len(elems))
202205

@@ -214,6 +217,12 @@ func (lq *Linked[T]) Clear() []T {
214217
lq.lock.Lock()
215218
defer lq.lock.Unlock()
216219

220+
return lq.drainLocked()
221+
}
222+
223+
// drainLocked collects all elements in order and resets the queue.
224+
// Caller must hold the write lock.
225+
func (lq *Linked[T]) drainLocked() []T {
217226
elements := make([]T, lq.size)
218227

219228
current := lq.head

priority.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -272,31 +272,21 @@ func (pq *Priority[T]) Size() int {
272272
return pq.elements.Len()
273273
}
274274

275-
// MarshalJSON serializes the Priority queue to JSON.
275+
// MarshalJSON serializes the Priority queue to JSON in priority order.
276276
func (pq *Priority[T]) MarshalJSON() ([]byte, error) {
277277
pq.lock.RLock()
278278

279-
// Create a temporary copy of the heap to extract elements in order.
280-
tempHeap := &priorityHeap[T]{
281-
elems: make([]T, len(pq.elements.elems)),
282-
lessFunc: pq.elements.lessFunc,
283-
}
284-
285-
copy(tempHeap.elems, pq.elements.elems)
279+
output := make([]T, len(pq.elements.elems))
280+
copy(output, pq.elements.elems)
281+
lessFunc := pq.elements.lessFunc
286282

287283
pq.lock.RUnlock()
288284

289-
heap.Init(tempHeap)
290-
291-
output := make([]T, len(tempHeap.elems))
292-
293-
i := 0
294-
295-
for tempHeap.Len() > 0 {
296-
// nolint: forcetypeassert, revive
297-
output[i] = heap.Pop(tempHeap).(T)
298-
i++
299-
}
285+
// Sorting the copy gives the same result as draining a heap and is
286+
// cache-friendlier than heap.Init + N heap.Pop calls.
287+
sort.Slice(output, func(i, j int) bool {
288+
return lessFunc(output[i], output[j])
289+
})
300290

301291
return json.Marshal(output)
302292
}

0 commit comments

Comments
 (0)