Skip to content

Commit cf01496

Browse files
committed
prevents FIFO.DequeueOrWaitForNextElement to gets blocked when waiting for an enqueued element
1 parent e189583 commit cf01496

File tree

1 file changed

+18
-4
lines changed

1 file changed

+18
-4
lines changed

fifo_queue.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ type FIFO struct {
2020
isLocked bool
2121
// queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution )
2222
waitForNextElementChan chan chan interface{}
23+
// queue to unlock consumers that were locked when queue was empty (during DequeueOrWaitForNextElement execution)
24+
unlockDequeueOrWaitForNextElementChan chan struct{}
2325
}
2426

2527
// NewFIFO returns a new FIFO concurrent queue
@@ -33,6 +35,7 @@ func NewFIFO() *FIFO {
3335
func (st *FIFO) initialize() {
3436
st.slice = make([]interface{}, 0)
3537
st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity)
38+
st.unlockDequeueOrWaitForNextElementChan = make(chan struct{}, WaitForNextElementChanCapacity)
3639
}
3740

3841
// Enqueue enqueues an element. Returns error if queue is locked.
@@ -41,6 +44,13 @@ func (st *FIFO) Enqueue(value interface{}) error {
4144
return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
4245
}
4346

47+
// let consumers (DequeueOrWaitForNextElement) know there is a new element
48+
select {
49+
case st.unlockDequeueOrWaitForNextElementChan <- struct{}{}:
50+
default:
51+
// message could not be sent
52+
}
53+
4454
// check if there is a listener waiting for the next element (this element)
4555
select {
4656
case listener := <-st.waitForNextElementChan:
@@ -134,10 +144,14 @@ func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interfa
134144

135145
// return the next enqueued element, if any
136146
select {
137-
case item := <-waitChan:
138-
return item, nil
139-
case <-ctx.Done():
140-
return nil, ctx.Err()
147+
case <-st.unlockDequeueOrWaitForNextElementChan:
148+
// new enqueued element, no need to keep waiting
149+
break
150+
151+
case item := <-waitChan:
152+
return item, nil
153+
case <-ctx.Done():
154+
return nil, ctx.Err()
141155
}
142156
default:
143157
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements

0 commit comments

Comments
 (0)