Skip to content

Commit ea2e52e

Browse files
authored
Merge pull request #33 from enriquebris/dequeueWaitChanLeak
Dequeue wait chan leak
2 parents a3c75ce + 4eb073b commit ea2e52e

File tree

2 files changed

+36
-21
lines changed

2 files changed

+36
-21
lines changed

fifo_queue.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -126,32 +126,43 @@ func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interfa
126126
// enqueue a watcher into the watchForNextElementChannel to wait for the next element
127127
case st.waitForNextElementChan <- waitChan:
128128

129-
// re-checks every i milliseconds (top: 10 times) ... the following verifies if an item was enqueued
130-
// around the same time DequeueOrWaitForNextElementContext was invoked, meaning the waitChan wasn't yet sent over
131-
// st.waitForNextElementChan
132-
for i := 0; i < dequeueOrWaitForNextElementInvokeGapTime; i++ {
133-
select {
134-
case <-ctx.Done():
135-
return nil, ctx.Err()
136-
case dequeuedItem := <-waitChan:
137-
return dequeuedItem, nil
138-
case <-time.After(time.Millisecond * time.Duration(i)):
139-
if dequeuedItem, err := st.Dequeue(); err == nil {
129+
// n
130+
for {
131+
// re-checks every i milliseconds (top: 10 times) ... the following verifies if an item was enqueued
132+
// around the same time DequeueOrWaitForNextElementContext was invoked, meaning the waitChan wasn't yet sent over
133+
// st.waitForNextElementChan
134+
for i := 0; i < dequeueOrWaitForNextElementInvokeGapTime; i++ {
135+
select {
136+
case <-ctx.Done():
137+
return nil, ctx.Err()
138+
case dequeuedItem := <-waitChan:
140139
return dequeuedItem, nil
140+
case <-time.After(time.Millisecond * time.Duration(i)):
141+
if dequeuedItem, err := st.Dequeue(); err == nil {
142+
return dequeuedItem, nil
143+
}
141144
}
142145
}
143-
}
144146

145-
// return the next enqueued element, if any
146-
select {
147-
case <-st.unlockDequeueOrWaitForNextElementChan:
147+
// return the next enqueued element, if any
148+
select {
148149
// new enqueued element, no need to keep waiting
149-
break
150+
case <-st.unlockDequeueOrWaitForNextElementChan:
151+
// check if we got a new element just after we got <-st.unlockDequeueOrWaitForNextElementChan
152+
select {
153+
case item := <-waitChan:
154+
return item, nil
155+
default:
156+
}
157+
// go back to: for loop
158+
continue
150159

151-
case item := <-waitChan:
152-
return item, nil
153-
case <-ctx.Done():
154-
return nil, ctx.Err()
160+
case item := <-waitChan:
161+
return item, nil
162+
case <-ctx.Done():
163+
return nil, ctx.Err()
164+
}
165+
// n
155166
}
156167
default:
157168
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements

readme.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.2-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.2") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
1+
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.3-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.3") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)
22

33
# goconcurrentqueue - Concurrent safe queues
44
The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)).
@@ -245,6 +245,10 @@ func workWithQueue(queue goconcurrentqueue.Queue) error {
245245

246246
## History
247247

248+
### v0.6.3
249+
250+
- Prevents FIFO.DequeueOrWaitForNextElement to add useless wait channels
251+
248252
### v0.6.2
249253

250254
- Prevents FIFO.DequeueOrWaitForNextElement to gets blocked when waiting for an enqueued element

0 commit comments

Comments
 (0)