Skip to content

Commit c09fe97

Browse files
authored
Merge pull request #35 from enriquebris/issue-34
Issue 34
2 parents ea2e52e + c7e9f8b commit c09fe97

File tree

3 files changed

+57
-21
lines changed

3 files changed

+57
-21
lines changed

fixed_fifo_queue.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package goconcurrentqueue
22

3-
import "context"
3+
import (
4+
"context"
5+
)
46

57
// Fixed capacity FIFO (First In First Out) concurrent queue
68
type FixedFIFO struct {
@@ -35,11 +37,11 @@ func (st *FixedFIFO) Enqueue(value interface{}) error {
3537
// verify whether it is possible to notify the listener (it could be the listener is no longer
3638
// available because the context expired: DequeueOrWaitForNextElementContext)
3739
select {
38-
// sends the element through the listener's channel instead of enqueueing it
39-
case listener <- value:
40-
default:
41-
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
42-
return st.enqueueIntoQueue(value)
40+
// sends the element through the listener's channel instead of enqueueing it
41+
case listener <- value:
42+
default:
43+
// push the element into the queue instead of sending it through the listener's channel (which is not available at this moment)
44+
return st.enqueueIntoQueue(value)
4345
}
4446

4547
default:
@@ -114,6 +116,12 @@ func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (in
114116
return item, nil
115117
case <-ctx.Done():
116118
return nil, ctx.Err()
119+
// try again to get the element from the regular queue (in case waitChan doesn't provide any item)
120+
case value, ok := <-st.queue:
121+
if ok {
122+
return value, nil
123+
}
124+
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
117125
}
118126
default:
119127
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements

fixed_fifo_queue_test.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueFullCapacitySingleGR() {
8181
func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
8282
var (
8383
uselessChan = make(chan interface{})
84-
value = "my-test-value"
84+
value = "my-test-value"
8585
)
8686

8787
// let Enqueue knows there is a channel to send the next item instead of enqueueing it into the queue
@@ -98,6 +98,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() {
9898
// TestEnqueueLenMultipleGR enqueues elements concurrently
9999
//
100100
// Detailed steps:
101+
//
101102
// 1 - Enqueue totalGRs concurrently (from totalGRs different GRs)
102103
// 2 - Verifies the len, it should be equal to totalGRs
103104
// 3 - Verifies that all elements from 0 to totalGRs were enqueued
@@ -269,6 +270,7 @@ func (suite *FixedFIFOTestSuite) TestDequeueClosedChannelSingleGR() {
269270
// TestDequeueMultipleGRs dequeues elements concurrently
270271
//
271272
// Detailed steps:
273+
//
272274
// 1 - Enqueues totalElementsToEnqueue consecutive integers
273275
// 2 - Dequeues totalElementsToDequeue concurrently from totalElementsToDequeue GRs
274276
// 3 - Verifies the final len, should be equal to totalElementsToEnqueue - totalElementsToDequeue
@@ -376,6 +378,39 @@ func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue()
376378
}
377379
}
378380

381+
// calling DequeueOrWaitForNextElement with empty queue, then adding an item directly into queue's internal channel
382+
func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithStuckWaitChan() {
383+
var (
384+
dummyValue = "dummyValue"
385+
doneChan = make(chan struct{})
386+
)
387+
388+
// consumer
389+
go func(queue *FixedFIFO, expectedValue interface{}, done chan struct{}) {
390+
item, err := queue.DequeueOrWaitForNextElement()
391+
suite.NoError(err)
392+
suite.Equal(expectedValue, item)
393+
394+
done <- struct{}{}
395+
}(suite.fifo, dummyValue, doneChan)
396+
397+
// a second should be enough for the consumer to start consuming ...
398+
time.Sleep(time.Second)
399+
400+
// add an item (enqueue) directly into queue's internal channel
401+
suite.fifo.queue <- dummyValue
402+
403+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
404+
defer cancel()
405+
406+
select {
407+
case <-doneChan:
408+
409+
case <-ctx.Done():
410+
suite.Fail("too much time waiting ...")
411+
}
412+
}
413+
379414
// single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error
380415
func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() {
381416
// enqueue WaitForNextElementChanCapacity listeners to future enqueued elements
@@ -554,4 +589,4 @@ func (suite *FixedFIFOTestSuite) TestContextAlreadyCanceled() {
554589
case <-time.After(2 * time.Second):
555590
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
556591
}
557-
}
592+
}

readme.md

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.3-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.3") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
1+
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.7.0-yellowgreen.svg?style=flat "goconcurrentqueue v0.7.0") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
22

33
# goconcurrentqueue - Concurrent safe queues
44
The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)).
@@ -22,18 +22,7 @@ Execute
2222
go get github.com/enriquebris/goconcurrentqueue
2323
```
2424

25-
This package is compatible with the following golang versions:
26-
- 1.7.x
27-
- 1.8.x
28-
- 1.9.x
29-
- 1.10.x
30-
- 1.11.x
31-
- 1.12.x
32-
- 1.13.x
33-
- 1.14.x
34-
- 1.15.x
35-
- 1.16.x
36-
- 1.17.x
25+
This package is compatible with all golang versions >= 1.7.x
3726

3827
## Documentation
3928
Visit [goconcurrentqueue at go.dev](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue)
@@ -245,6 +234,10 @@ func workWithQueue(queue goconcurrentqueue.Queue) error {
245234

246235
## History
247236

237+
### v0.7.0
238+
239+
- Prevents FIFO.DequeueOrWaitForNextElement to keep waiting for a waitChan while internal queues contain items
240+
248241
### v0.6.3
249242

250243
- Prevents FIFO.DequeueOrWaitForNextElement to add useless wait channels

0 commit comments

Comments
 (0)