Skip to content

Commit a3c75ce

Browse files
authored
Merge pull request #31 from enriquebris/lock_issue
Preventing FIFO.DequeueOrWaitForNextElement to gets blocked
2 parents e189583 + d5d4fef commit a3c75ce

File tree

4 files changed

+49
-6
lines changed

4 files changed

+49
-6
lines changed

fifo_queue.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ type FIFO struct {
2020
isLocked bool
2121
// queue for watchers that will wait for next elements (if queue is empty at DequeueOrWaitForNextElement execution )
2222
waitForNextElementChan chan chan interface{}
23+
// queue to unlock consumers that were locked when queue was empty (during DequeueOrWaitForNextElement execution)
24+
unlockDequeueOrWaitForNextElementChan chan struct{}
2325
}
2426

2527
// NewFIFO returns a new FIFO concurrent queue
@@ -33,6 +35,7 @@ func NewFIFO() *FIFO {
3335
func (st *FIFO) initialize() {
3436
st.slice = make([]interface{}, 0)
3537
st.waitForNextElementChan = make(chan chan interface{}, WaitForNextElementChanCapacity)
38+
st.unlockDequeueOrWaitForNextElementChan = make(chan struct{}, WaitForNextElementChanCapacity)
3639
}
3740

3841
// Enqueue enqueues an element. Returns error if queue is locked.
@@ -41,6 +44,13 @@ func (st *FIFO) Enqueue(value interface{}) error {
4144
return NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
4245
}
4346

47+
// let consumers (DequeueOrWaitForNextElement) know there is a new element
48+
select {
49+
case st.unlockDequeueOrWaitForNextElementChan <- struct{}{}:
50+
default:
51+
// message could not be sent
52+
}
53+
4454
// check if there is a listener waiting for the next element (this element)
4555
select {
4656
case listener := <-st.waitForNextElementChan:
@@ -134,10 +144,14 @@ func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interfa
134144

135145
// return the next enqueued element, if any
136146
select {
137-
case item := <-waitChan:
138-
return item, nil
139-
case <-ctx.Done():
140-
return nil, ctx.Err()
147+
case <-st.unlockDequeueOrWaitForNextElementChan:
148+
// new enqueued element, no need to keep waiting
149+
break
150+
151+
case item := <-waitChan:
152+
return item, nil
153+
case <-ctx.Done():
154+
return nil, ctx.Err()
141155
}
142156
default:
143157
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements

go.mod

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module github.com/enriquebris/goconcurrentqueue
2+
3+
go 1.17
4+
5+
require github.com/stretchr/testify v1.7.0
6+
7+
require (
8+
github.com/davecgh/go-spew v1.1.0 // indirect
9+
github.com/pmezard/go-difflib v1.0.0 // indirect
10+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
11+
)

go.sum

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
2+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
6+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
7+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
8+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
9+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
10+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
11+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

readme.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.1-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.1") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
1+
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.2-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.2") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
22

33
# goconcurrentqueue - Concurrent safe queues
44
The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)).
@@ -31,6 +31,9 @@ This package is compatible with the following golang versions:
3131
- 1.12.x
3232
- 1.13.x
3333
- 1.14.x
34+
- 1.15.x
35+
- 1.16.x
36+
- 1.17.x
3437

3538
## Documentation
3639
Visit [goconcurrentqueue at go.dev](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue)
@@ -242,9 +245,13 @@ func workWithQueue(queue goconcurrentqueue.Queue) error {
242245

243246
## History
244247

248+
### v0.6.2
249+
250+
- Prevents FIFO.DequeueOrWaitForNextElement to gets blocked when waiting for an enqueued element
251+
245252
### v0.6.1
246253

247-
- FixedFifo.Enqueue prevents to gets blocked trying to send the item over an invalid waitForNextElementChan channel
254+
- FixedFifo.Enqueue prevents to get blocked trying to send the item over an invalid waitForNextElementChan channel
248255

249256
### v0.6.0
250257

0 commit comments

Comments
 (0)