Skip to content

Commit ac2394b

Browse files
committed
prevents FixedFifo.Enqueue to gets blocked trying to send the item over an invalid waitForNextElementChan channel
1 parent dfa38a0 commit ac2394b

File tree

2 files changed

+39
-8
lines changed

2 files changed

+39
-8
lines changed

fixed_fifo_queue.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,30 @@ func (st *FixedFIFO) Enqueue(value interface{}) error {
3232
// check if there is a listener waiting for the next element (this element)
3333
select {
3434
case listener := <-st.waitForNextElementChan:
35-
// send the element through the listener's channel instead of enqueue it
36-
listener <- value
37-
38-
default:
39-
// enqueue the element following the "normal way"
35+
// verify whether it is possible to notify the listener (it could be the listener is no longer
36+
// available because the context expired: DequeueOrWaitForNextElementContext)
4037
select {
41-
case st.queue <- value:
42-
default:
43-
return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity")
38+
// sends the element through the listener's channel instead of enqueueing it
39+
case listener <- value:
40+
default:
41+
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
42+
return st.enqueueIntoQueue(value)
4443
}
44+
45+
default:
46+
// enqueue the element into the queue
47+
return st.enqueueIntoQueue(value)
48+
}
49+
50+
return nil
51+
}
52+
53+
// enqueueIntoQueue enqueues the given item directly into the regular queue
54+
func (st *FixedFIFO) enqueueIntoQueue(value interface{}) error {
55+
select {
56+
case st.queue <- value:
57+
default:
58+
return NewQueueError(QueueErrorCodeFullCapacity, "FixedFIFO queue is at full capacity")
4559
}
4660

4761
return nil

fixed_fifo_queue_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,23 @@ func (suite *FixedFIFOTestSuite) TestEnqueueFullCapacitySingleGR() {
7878
suite.Equalf(QueueErrorCodeFullCapacity, customError.Code(), "Expected code: '%v'", QueueErrorCodeFullCapacity)
7979
}
8080

81+
func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
82+
var (
83+
uselessChan = make(chan interface{})
84+
value = "my-test-value"
85+
)
86+
87+
// let Enqueue knows there is a channel to send the next item instead of enqueueing it into the queue
88+
suite.fifo.waitForNextElementChan <- uselessChan
89+
90+
// enqueues an item having an waiting channel but without a valid listener, so the item should be enqueued into the queue
91+
suite.fifo.Enqueue(value)
92+
// dequeues the item directly from the queue
93+
dequeuedValue, _ := suite.fifo.Dequeue()
94+
95+
suite.Equal(value, dequeuedValue)
96+
}
97+
8198
// TestEnqueueLenMultipleGR enqueues elements concurrently
8299
//
83100
// Detailed steps:

0 commit comments

Comments
 (0)