Skip to content

Commit 17943a6

Browse files
committed
Implement BufferedChannelQueue.
1 parent dc3686f commit 17943a6

File tree

2 files changed

+324
-0
lines changed

2 files changed

+324
-0
lines changed

queue.go

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ var (
2525
ErrQueueIsEmpty = errors.New("queue is empty")
2626
// ErrQueueIsFull Queue Is Full
2727
ErrQueueIsFull = errors.New("queue is full")
28+
// ErrQueueIsClosed Queue Is Closed
29+
ErrQueueIsClosed = errors.New("queue is closed")
2830
// ErrQueueTakeTimeout Queue Take Timeout
2931
ErrQueueTakeTimeout = errors.New("queue take timeout")
3032
// ErrQueuePutTimeout Queue Put Timeout
@@ -442,3 +444,245 @@ func (q *LinkedListQueue[T]) recycleNode(node *DoublyListItem[T]) {
442444
node.Prev = nil
443445
q.nodePoolFirst = node
444446
}
447+
448+
// BufferedChannelQueue BlockingQueue with ChannelQueue & scalable pool, inspired by Collection utils
449+
type BufferedChannelQueue[T any] struct {
450+
lock sync.RWMutex
451+
isClosed AtomBool
452+
453+
loadWorkerCh ChannelQueue[int]
454+
freeNodeWorkerCh ChannelQueue[int]
455+
456+
loadFromPoolDuration time.Duration
457+
freeNodeHookPoolIntervalDuration time.Duration
458+
nodeHookPoolSize int
459+
bufferSizeMaximum int
460+
461+
blockingQueue ChannelQueue[T]
462+
pool *LinkedListQueue[T]
463+
}
464+
465+
// NewBufferedChannelQueue New BufferedChannelQueue instance from a Queue[T]
466+
func NewBufferedChannelQueue[T any](channelCapacity int, bufferSizeMaximum int, nodeHookPoolSize int) *BufferedChannelQueue[T] {
467+
pool := NewLinkedListQueue[T]()
468+
469+
newOne := &BufferedChannelQueue[T]{
470+
loadWorkerCh: NewChannelQueue[int](1),
471+
freeNodeWorkerCh: NewChannelQueue[int](1),
472+
473+
blockingQueue: NewChannelQueue[T](channelCapacity),
474+
pool: pool,
475+
476+
loadFromPoolDuration: 10 * time.Millisecond,
477+
freeNodeHookPoolIntervalDuration: 10 * time.Millisecond,
478+
479+
nodeHookPoolSize: nodeHookPoolSize,
480+
bufferSizeMaximum: bufferSizeMaximum,
481+
}
482+
go newOne.freeNodePool()
483+
go newOne.loadFromPool()
484+
485+
return newOne
486+
}
487+
488+
func (q *BufferedChannelQueue[T]) freeNodePool() {
489+
for code := range q.freeNodeWorkerCh {
490+
if code > 0 {
491+
time.Sleep(q.freeNodeHookPoolIntervalDuration)
492+
493+
if q.isClosed.Get() {
494+
break
495+
}
496+
497+
if q.pool.nodeCount > q.nodeHookPoolSize {
498+
q.lock.Lock()
499+
q.pool.KeepNodePoolCount(q.nodeHookPoolSize)
500+
q.lock.Unlock()
501+
}
502+
503+
} else {
504+
break
505+
}
506+
}
507+
}
508+
func (q *BufferedChannelQueue[T]) loadFromPool() {
509+
for code := range q.loadWorkerCh {
510+
if code > 0 {
511+
512+
if q.isClosed.Get() {
513+
break
514+
}
515+
516+
var val T
517+
var pollErr, offerErr error
518+
519+
q.lock.Lock()
520+
for pollErr == nil {
521+
// Try poll from the pool
522+
val, pollErr = q.pool.Poll()
523+
if pollErr != nil {
524+
break
525+
}
526+
527+
offerErr = q.blockingQueue.Offer(val)
528+
// If failed, unshift it back
529+
if offerErr != nil {
530+
q.pool.Unshift(val)
531+
break
532+
}
533+
}
534+
q.lock.Unlock()
535+
536+
time.Sleep(q.loadFromPoolDuration)
537+
} else {
538+
break
539+
}
540+
}
541+
}
542+
543+
// SetBufferSizeMaximum Set MaximumBufferSize(maximum number of buffered items outside the ChannelQueue)
544+
func (q *BufferedChannelQueue[T]) SetBufferSizeMaximum(size int) {
545+
q.bufferSizeMaximum = size
546+
}
547+
548+
// SetNodeHookPoolSize Set nodeHookPoolSize(the buffering node hooks ideal size)
549+
func (q *BufferedChannelQueue[T]) SetNodeHookPoolSize(size int) {
550+
q.nodeHookPoolSize = size
551+
}
552+
553+
// SetLoadFromPoolDuration Set loadFromPoolDuration(the interval to take buffered items into the ChannelQueue)
554+
func (q *BufferedChannelQueue[T]) SetLoadFromPoolDuration(duration time.Duration) {
555+
q.loadFromPoolDuration = duration
556+
}
557+
558+
// SetFreeNodeHookPoolIntervalDuration Set freeNodeHookPoolIntervalDuration(the interval to clear buffering node hooks down to nodeHookPoolSize)
559+
func (q *BufferedChannelQueue[T]) SetFreeNodeHookPoolIntervalDuration(duration time.Duration) {
560+
q.freeNodeHookPoolIntervalDuration = duration
561+
}
562+
563+
// Close Close the Handler
564+
func (q *BufferedChannelQueue[T]) Close() {
565+
q.lock.Lock()
566+
defer q.lock.Unlock()
567+
568+
q.isClosed.Set(true)
569+
close(q.loadWorkerCh)
570+
close(q.blockingQueue)
571+
}
572+
573+
// Put Put the T val(non-blocking)
574+
func (q *BufferedChannelQueue[T]) Put(val T) error {
575+
// q.lock.Lock()
576+
// defer q.lock.Unlock()
577+
//
578+
// if q.isClosed.Get() {
579+
// return ErrQueueIsClosed
580+
// }
581+
//
582+
// return q.blockingQueue.Put(val)
583+
584+
return q.Offer(val)
585+
}
586+
587+
// // PutWithTimeout Put the T val(blocking), with timeout
588+
// func (q BufferedChannelQueue[T]) PutWithTimeout(val T, timeout time.Duration) error {
589+
// // q.lock.Lock()
590+
// // defer q.lock.Unlock()
591+
//
592+
// if q.isClosed.Get() {
593+
// return ErrQueueIsClosed
594+
// }
595+
//
596+
// return q.blockingQueue.PutWithTimeout(val, timeout)
597+
// }
598+
599+
// Take Take the T val(blocking)
600+
func (q *BufferedChannelQueue[T]) Take() (T, error) {
601+
// q.lock.RLock()
602+
// defer q.lock.RUnlock()
603+
604+
if q.isClosed.Get() {
605+
return *new(T), ErrQueueIsClosed
606+
}
607+
608+
q.lock.RLock()
609+
if q.pool.Count() > 0 {
610+
q.loadWorkerCh.Offer(1)
611+
}
612+
if q.pool.nodeCount > q.nodeHookPoolSize {
613+
q.freeNodeWorkerCh.Offer(1)
614+
}
615+
q.lock.RUnlock()
616+
617+
return q.blockingQueue.Take()
618+
}
619+
620+
// TakeWithTimeout Take the T val(blocking), with timeout
621+
func (q *BufferedChannelQueue[T]) TakeWithTimeout(timeout time.Duration) (T, error) {
622+
// q.lock.RLock()
623+
// defer q.lock.RUnlock()
624+
625+
if q.isClosed.Get() {
626+
return *new(T), ErrQueueIsClosed
627+
}
628+
629+
q.lock.RLock()
630+
if q.pool.Count() > 0 {
631+
q.loadWorkerCh.Offer(1)
632+
}
633+
if q.pool.nodeCount > q.nodeHookPoolSize {
634+
q.freeNodeWorkerCh.Offer(1)
635+
}
636+
q.lock.RUnlock()
637+
638+
return q.blockingQueue.TakeWithTimeout(timeout)
639+
}
640+
641+
// Offer Offer the T val(non-blocking)
642+
func (q *BufferedChannelQueue[T]) Offer(val T) error {
643+
q.lock.Lock()
644+
defer q.lock.Unlock()
645+
646+
if q.isClosed.Get() {
647+
return ErrQueueIsClosed
648+
}
649+
650+
// Before +1: >=, After +1: >
651+
if q.pool.Count() >= q.bufferSizeMaximum {
652+
return ErrQueueIsFull
653+
}
654+
655+
q.pool.Offer(val)
656+
q.loadWorkerCh.Offer(1)
657+
return nil
658+
659+
// err := q.blockingQueue.Offer(val)
660+
// if err == ErrQueueIsFull {
661+
// q.pool.Offer(val)
662+
// q.loadWorkerCh.Offer(1)
663+
// return nil
664+
// }
665+
//
666+
// return err
667+
}
668+
669+
// Poll Poll the T val(non-blocking)
670+
func (q *BufferedChannelQueue[T]) Poll() (T, error) {
671+
// q.lock.RLock()
672+
// defer q.lock.RUnlock()
673+
674+
if q.isClosed.Get() {
675+
return *new(T), ErrQueueIsClosed
676+
}
677+
678+
q.lock.RLock()
679+
if q.pool.Count() > 0 {
680+
q.loadWorkerCh.Offer(1)
681+
}
682+
if q.pool.nodeCount > q.nodeHookPoolSize {
683+
q.freeNodeWorkerCh.Offer(1)
684+
}
685+
q.lock.RUnlock()
686+
687+
return q.blockingQueue.Poll()
688+
}

queue_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,3 +256,83 @@ func TestLinkedListQueue(t *testing.T) {
256256

257257
time.Sleep(2 * timeout)
258258
}
259+
260+
func TestNewBufferedChannelQueue(t *testing.T) {
261+
var queue Queue[int]
262+
var err error
263+
var result int
264+
var timeout time.Duration
265+
266+
bufferedChannelQueue := NewBufferedChannelQueue[int](3, 10000, 100)
267+
bufferedChannelQueue.SetLoadFromPoolDuration(time.Millisecond / 10)
268+
bufferedChannelQueue.SetFreeNodeHookPoolIntervalDuration(1 * time.Millisecond)
269+
queue = bufferedChannelQueue
270+
271+
// Sync
272+
273+
timeout = 1 * time.Millisecond
274+
bufferedChannelQueue.SetBufferSizeMaximum(1)
275+
276+
err = queue.Offer(1)
277+
assert.Equal(t, nil, err)
278+
time.Sleep(1 * timeout)
279+
err = queue.Offer(2)
280+
assert.Equal(t, nil, err)
281+
time.Sleep(1 * timeout)
282+
err = queue.Offer(3)
283+
assert.Equal(t, nil, err)
284+
time.Sleep(1 * timeout)
285+
// Channel: only 3 positions & Buffer: 1 position, now `4` is inserted into the buffer(buffer size: 1)
286+
err = queue.Offer(4)
287+
assert.Equal(t, nil, err)
288+
time.Sleep(1 * timeout)
289+
// Channel: only 3 positions & Buffer: 1 position, now `5` can't be inserted into the buffer(`4` is already inside)
290+
err = queue.Offer(5)
291+
assert.Equal(t, ErrQueueIsFull, err)
292+
293+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
294+
assert.Equal(t, 1, result)
295+
assert.Equal(t, nil, err)
296+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
297+
assert.Equal(t, 2, result)
298+
assert.Equal(t, nil, err)
299+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
300+
assert.Equal(t, 3, result)
301+
assert.Equal(t, nil, err)
302+
result, err = bufferedChannelQueue.TakeWithTimeout(timeout)
303+
assert.Equal(t, 4, result)
304+
assert.Equal(t, nil, err)
305+
306+
// Async
307+
308+
bufferedChannelQueue.SetBufferSizeMaximum(10000)
309+
timeout = 1 * time.Millisecond
310+
asyncTaskDone := make(chan bool)
311+
go func() {
312+
for i := 1; i <= 10000; i++ {
313+
result, err := bufferedChannelQueue.TakeWithTimeout(timeout)
314+
assert.Equal(t, nil, err)
315+
assert.Equal(t, i, result)
316+
}
317+
asyncTaskDone <- true
318+
}()
319+
go func() {
320+
for i := 1; i <= 10000; i++ {
321+
// err := bufferedChannelQueue.PutWithTimeout(i, timeout)
322+
err := bufferedChannelQueue.Offer(i)
323+
assert.Equal(t, nil, err)
324+
}
325+
assert.Equal(t, 0, bufferedChannelQueue.pool.nodeCount)
326+
}()
327+
328+
<-asyncTaskDone
329+
330+
result, err = bufferedChannelQueue.Poll()
331+
assert.Equal(t, ErrQueueIsEmpty, err)
332+
assert.Equal(t, 0, result)
333+
334+
time.Sleep(1 * timeout)
335+
336+
assert.GreaterOrEqual(t, 100, bufferedChannelQueue.pool.nodeCount)
337+
close(asyncTaskDone)
338+
}

0 commit comments

Comments
 (0)