Skip to content

Commit 698d6d8

Browse files
committed
Fix queue potential bugs
1 parent 23d8e28 commit 698d6d8

File tree

2 files changed

+51
-22
lines changed

2 files changed

+51
-22
lines changed

queue.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -547,11 +547,11 @@ func (q *BufferedChannelQueue[T]) freeNodePool() {
547547
break
548548
}
549549

550+
q.lock.Lock()
550551
if q.pool.nodeCount > q.nodeHookPoolSize {
551-
q.lock.Lock()
552552
q.pool.KeepNodePoolCount(q.nodeHookPoolSize)
553-
q.lock.Unlock()
554553
}
554+
q.lock.Unlock()
555555
}
556556
}
557557

@@ -589,14 +589,8 @@ func (q *BufferedChannelQueue[T]) loadFromPool() {
589589
}
590590

591591
func (q *BufferedChannelQueue[T]) notifyWorkers() {
592-
q.lock.RLock()
593-
if q.pool.Count() > 0 {
594-
q.loadWorkerCh.Offer(1)
595-
}
596-
if q.pool.nodeCount > q.nodeHookPoolSize {
597-
q.freeNodeWorkerCh.Offer(1)
598-
}
599-
q.lock.RUnlock()
592+
q.loadWorkerCh.Offer(1)
593+
q.freeNodeWorkerCh.Offer(1)
600594
}
601595

602596
// SetBufferSizeMaximum Set MaximumBufferSize(maximum number of buffered items outside the ChannelQueue)
@@ -656,6 +650,9 @@ func (q *BufferedChannelQueue[T]) Count() int {
656650
return 0
657651
}
658652

653+
q.lock.RLock()
654+
defer q.lock.RUnlock()
655+
659656
return len(q.blockingQueue) + q.pool.Count()
660657
}
661658

@@ -711,9 +708,6 @@ func (q *BufferedChannelQueue[T]) Put(val T) error {
711708

712709
// Take Take the T val(blocking)
713710
func (q *BufferedChannelQueue[T]) Take() (T, error) {
714-
// q.lock.RLock()
715-
// defer q.lock.RUnlock()
716-
717711
if q.isClosed.Get() {
718712
return *new(T), ErrQueueIsClosed
719713
}
@@ -725,9 +719,6 @@ func (q *BufferedChannelQueue[T]) Take() (T, error) {
725719

726720
// TakeWithTimeout Take the T val(blocking), with timeout
727721
func (q *BufferedChannelQueue[T]) TakeWithTimeout(timeout time.Duration) (T, error) {
728-
// q.lock.RLock()
729-
// defer q.lock.RUnlock()
730-
731722
if q.isClosed.Get() {
732723
return *new(T), ErrQueueIsClosed
733724
}
@@ -775,9 +766,6 @@ func (q *BufferedChannelQueue[T]) Offer(val T) error {
775766

776767
// Poll Poll the T val(non-blocking)
777768
func (q *BufferedChannelQueue[T]) Poll() (T, error) {
778-
// q.lock.RLock()
779-
// defer q.lock.RUnlock()
780-
781769
if q.isClosed.Get() {
782770
return *new(T), ErrQueueIsClosed
783771
}

queue_test.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,51 @@ func TestNewBufferedChannelQueue(t *testing.T) {
301301
assert.Equal(t, nil, err)
302302

303303
// Async
304+
asyncTaskDone := make(chan bool)
305+
306+
bufferedChannelQueue.SetBufferSizeMaximum(6)
307+
timeout = 2 * time.Millisecond
308+
go func() {
309+
time.Sleep(timeout)
310+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
311+
assert.Equal(t, nil, err)
312+
assert.Equal(t, 1, result)
313+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
314+
assert.Equal(t, nil, err)
315+
assert.Equal(t, 2, result)
316+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
317+
assert.Equal(t, nil, err)
318+
assert.Equal(t, 3, result)
319+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
320+
assert.Equal(t, nil, err)
321+
assert.Equal(t, 4, result)
322+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
323+
assert.Equal(t, nil, err)
324+
assert.Equal(t, 5, result)
325+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
326+
assert.Equal(t, nil, err)
327+
assert.Equal(t, 6, result)
328+
asyncTaskDone <- true
329+
}()
330+
go func() {
331+
err = bufferedChannelQueue.Put(1)
332+
assert.Equal(t, nil, err)
333+
err = bufferedChannelQueue.Put(2)
334+
assert.Equal(t, nil, err)
335+
err = bufferedChannelQueue.Put(3)
336+
assert.Equal(t, nil, err)
337+
err = bufferedChannelQueue.Put(4)
338+
assert.Equal(t, nil, err)
339+
err = bufferedChannelQueue.Put(5)
340+
assert.Equal(t, nil, err)
341+
err = bufferedChannelQueue.Put(6)
342+
assert.Equal(t, nil, err)
343+
}()
344+
345+
<-asyncTaskDone
304346

305347
bufferedChannelQueue.SetBufferSizeMaximum(10000)
306-
timeout = 1 * time.Millisecond
307-
asyncTaskDone := make(chan bool)
348+
timeout = 10 * time.Millisecond
308349
go func() {
309350
for i := 1; i <= 10000; i++ {
310351
result, err := bufferedChannelQueue.TakeWithTimeout(timeout)
@@ -331,6 +372,6 @@ func TestNewBufferedChannelQueue(t *testing.T) {
331372

332373
time.Sleep(1 * timeout)
333374

334-
assert.GreaterOrEqual(t, 100, bufferedChannelQueue.pool.nodeCount)
375+
assert.GreaterOrEqual(t, bufferedChannelQueue.pool.nodeCount, 100)
335376
close(asyncTaskDone)
336377
}

0 commit comments

Comments
 (0)