Skip to content

Commit 456039a

Browse files
committed
Reduce some codes of BufferedChannelQueue.
1 parent a11ba3b commit 456039a

File tree

1 file changed

+45
-63
lines changed

1 file changed

+45
-63
lines changed

queue.go

Lines changed: 45 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -486,59 +486,62 @@ func NewBufferedChannelQueue[T any](channelCapacity int, bufferSizeMaximum int,
486486
}
487487

488488
func (q *BufferedChannelQueue[T]) freeNodePool() {
489-
for code := range q.freeNodeWorkerCh {
490-
if code > 0 {
491-
time.Sleep(q.freeNodeHookPoolIntervalDuration)
489+
for range q.freeNodeWorkerCh {
490+
time.Sleep(q.freeNodeHookPoolIntervalDuration)
492491

493-
if q.isClosed.Get() {
494-
break
495-
}
496-
497-
if q.pool.nodeCount > q.nodeHookPoolSize {
498-
q.lock.Lock()
499-
q.pool.KeepNodePoolCount(q.nodeHookPoolSize)
500-
q.lock.Unlock()
501-
}
502-
503-
} else {
492+
if q.isClosed.Get() {
504493
break
505494
}
495+
496+
if q.pool.nodeCount > q.nodeHookPoolSize {
497+
q.lock.Lock()
498+
q.pool.KeepNodePoolCount(q.nodeHookPoolSize)
499+
q.lock.Unlock()
500+
}
506501
}
507502
}
508503
func (q *BufferedChannelQueue[T]) loadFromPool() {
509-
for code := range q.loadWorkerCh {
510-
if code > 0 {
504+
for range q.loadWorkerCh {
511505

512-
if q.isClosed.Get() {
513-
break
514-
}
506+
if q.isClosed.Get() {
507+
break
508+
}
515509

516-
var val T
517-
var pollErr, offerErr error
510+
q.lock.Lock()
518511

519-
q.lock.Lock()
520-
for pollErr == nil {
521-
// Try poll from the pool
522-
val, pollErr = q.pool.Poll()
523-
if pollErr != nil {
524-
break
525-
}
526-
527-
offerErr = q.blockingQueue.Offer(val)
528-
// If failed, unshift it back
529-
if offerErr != nil {
530-
q.pool.Unshift(val)
531-
break
532-
}
512+
var val T
513+
var pollErr, offerErr error
514+
515+
for q.pool.Count() > 0 {
516+
// Try poll from the pool
517+
val, pollErr = q.pool.Poll()
518+
if pollErr != nil {
519+
break
533520
}
534-
q.lock.Unlock()
535521

536-
time.Sleep(q.loadFromPoolDuration)
537-
} else {
538-
break
522+
offerErr = q.blockingQueue.Offer(val)
523+
// If failed, unshift it back
524+
if offerErr != nil {
525+
q.pool.Unshift(val)
526+
break
527+
}
539528
}
529+
q.lock.Unlock()
530+
531+
time.Sleep(q.loadFromPoolDuration)
532+
540533
}
541534
}
535+
func (q *BufferedChannelQueue[T]) notifyWorkers() {
536+
q.lock.RLock()
537+
if q.pool.Count() > 0 {
538+
q.loadWorkerCh.Offer(1)
539+
}
540+
if q.pool.nodeCount > q.nodeHookPoolSize {
541+
q.freeNodeWorkerCh.Offer(1)
542+
}
543+
q.lock.RUnlock()
544+
}
542545

543546
// SetBufferSizeMaximum Set MaximumBufferSize(maximum number of buffered items outside the ChannelQueue)
544547
func (q *BufferedChannelQueue[T]) SetBufferSizeMaximum(size int) {
@@ -605,14 +608,7 @@ func (q *BufferedChannelQueue[T]) Take() (T, error) {
605608
return *new(T), ErrQueueIsClosed
606609
}
607610

608-
q.lock.RLock()
609-
if q.pool.Count() > 0 {
610-
q.loadWorkerCh.Offer(1)
611-
}
612-
if q.pool.nodeCount > q.nodeHookPoolSize {
613-
q.freeNodeWorkerCh.Offer(1)
614-
}
615-
q.lock.RUnlock()
611+
q.notifyWorkers()
616612

617613
return q.blockingQueue.Take()
618614
}
@@ -626,14 +622,7 @@ func (q *BufferedChannelQueue[T]) TakeWithTimeout(timeout time.Duration) (T, err
626622
return *new(T), ErrQueueIsClosed
627623
}
628624

629-
q.lock.RLock()
630-
if q.pool.Count() > 0 {
631-
q.loadWorkerCh.Offer(1)
632-
}
633-
if q.pool.nodeCount > q.nodeHookPoolSize {
634-
q.freeNodeWorkerCh.Offer(1)
635-
}
636-
q.lock.RUnlock()
625+
q.notifyWorkers()
637626

638627
return q.blockingQueue.TakeWithTimeout(timeout)
639628
}
@@ -675,14 +664,7 @@ func (q *BufferedChannelQueue[T]) Poll() (T, error) {
675664
return *new(T), ErrQueueIsClosed
676665
}
677666

678-
q.lock.RLock()
679-
if q.pool.Count() > 0 {
680-
q.loadWorkerCh.Offer(1)
681-
}
682-
if q.pool.nodeCount > q.nodeHookPoolSize {
683-
q.freeNodeWorkerCh.Offer(1)
684-
}
685-
q.lock.RUnlock()
667+
q.notifyWorkers()
686668

687669
return q.blockingQueue.Poll()
688670
}

0 commit comments

Comments
 (0)