Skip to content

Commit 082dbc7

Browse files
committed
issue-34
1 parent ea2e52e commit 082dbc7

File tree

2 files changed

+51
-8
lines changed

2 files changed

+51
-8
lines changed

fixed_fifo_queue.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package goconcurrentqueue
22

3-
import "context"
3+
import (
4+
"context"
5+
)
46

57
// Fixed capacity FIFO (First In First Out) concurrent queue
68
type FixedFIFO struct {
@@ -35,11 +37,11 @@ func (st *FixedFIFO) Enqueue(value interface{}) error {
3537
// verify whether it is possible to notify the listener (it could be the listener is no longer
3638
// available because the context expired: DequeueOrWaitForNextElementContext)
3739
select {
38-
// sends the element through the listener's channel instead of enqueueing it
39-
case listener <- value:
40-
default:
41-
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
42-
return st.enqueueIntoQueue(value)
40+
// sends the element through the listener's channel instead of enqueueing it
41+
case listener <- value:
42+
default:
43+
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
44+
return st.enqueueIntoQueue(value)
4345
}
4446

4547
default:
@@ -114,6 +116,12 @@ func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (in
114116
return item, nil
115117
case <-ctx.Done():
116118
return nil, ctx.Err()
119+
// try again to get the element from the regular queue (in case waitChan doesn't provide any item)
120+
case value, ok := <-st.queue:
121+
if ok {
122+
return value, nil
123+
}
124+
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
117125
}
118126
default:
119127
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements

fixed_fifo_queue_test.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueFullCapacitySingleGR() {
8181
func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
8282
var (
8383
uselessChan = make(chan interface{})
84-
value = "my-test-value"
84+
value = "my-test-value"
8585
)
8686

8787
// let Enqueue knows there is a channel to send the next item instead of enqueueing it into the queue
@@ -98,6 +98,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
9898
// TestEnqueueLenMultipleGR enqueues elements concurrently
9999
//
100100
// Detailed steps:
101+
//
101102
// 1 - Enqueue totalGRs concurrently (from totalGRs different GRs)
102103
// 2 - Verifies the len, it should be equal to totalGRs
103104
// 3 - Verifies that all elements from 0 to totalGRs were enqueued
@@ -269,6 +270,7 @@ func (suite *FixedFIFOTestSuite) TestDequeueClosedChannelSingleGR() {
269270
// TestDequeueMultipleGRs dequeues elements concurrently
270271
//
271272
// Detailed steps:
273+
//
272274
// 1 - Enqueues totalElementsToEnqueue consecutive integers
273275
// 2 - Dequeues totalElementsToDequeue concurrently from totalElementsToDequeue GRs
274276
// 3 - Verifies the final len, should be equal to totalElementsToEnqueue - totalElementsToDequeue
@@ -376,6 +378,39 @@ func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue()
376378
}
377379
}
378380

381+
// calling DequeueOrWaitForNextElement with empty queue, then adding an item directly into queue's internal channel
382+
func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithStuckWaitChan() {
383+
var (
384+
dummyValue = "dummyValue"
385+
doneChan = make(chan struct{})
386+
)
387+
388+
// consumer
389+
go func(queue *FixedFIFO, expectedValue interface{}, done chan struct{}) {
390+
item, err := queue.DequeueOrWaitForNextElement()
391+
suite.NoError(err)
392+
suite.Equal(expectedValue, item)
393+
394+
done <- struct{}{}
395+
}(suite.fifo, dummyValue, doneChan)
396+
397+
// a second should be enough for the consumer to start consuming ...
398+
time.Sleep(time.Second)
399+
400+
// add an item (enqueue) directly into queue's internal channel
401+
suite.fifo.queue <- dummyValue
402+
403+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
404+
defer cancel()
405+
406+
select {
407+
case <-doneChan:
408+
409+
case <-ctx.Done():
410+
suite.Fail("too much time waiting ...")
411+
}
412+
}
413+
379414
// single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error
380415
func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() {
381416
// enqueue WaitForNextElementChanCapacity listeners to future enqueued elements
@@ -554,4 +589,4 @@ func (suite *FixedFIFOTestSuite) TestContextAlreadyCanceled() {
554589
case <-time.After(2 * time.Second):
555590
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
556591
}
557-
}
592+
}

0 commit comments

Comments
 (0)