Skip to content

Commit 18006f4

Browse files
DequeueOrWaitForNextElementContext implemented
1 parent b0550b7 commit 18006f4

File tree

5 files changed

+188
-4
lines changed

5 files changed

+188
-4
lines changed

fifo_queue.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package goconcurrentqueue
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67
"time"
@@ -90,6 +91,13 @@ func (st *FIFO) Dequeue() (interface{}, error) {
9091
// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
9192
// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.
9293
func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error) {
94+
return st.DequeueOrWaitForNextElementContext(context.Background())
95+
}
96+
97+
// DequeueOrWaitForNextElementContext dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
98+
// Multiple calls to DequeueOrWaitForNextElementContext() would enqueue multiple "listeners" for future enqueued elements.
99+
// When the passed context expires this function exits and returns the context' error
100+
func (st *FIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interface{}, error) {
93101
for {
94102
if st.isLocked {
95103
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
@@ -109,10 +117,12 @@ func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error) {
109117
case st.waitForNextElementChan <- waitChan:
110118

111119
// re-checks every i milliseconds (top: 10 times) ... the following verifies if an item was enqueued
112-
// around the same time DequeueOrWaitForNextElement was invoked, meaning the waitChan wasn't yet sent over
120+
// around the same time DequeueOrWaitForNextElementContext was invoked, meaning the waitChan wasn't yet sent over
113121
// st.waitForNextElementChan
114122
for i := 0; i < dequeueOrWaitForNextElementInvokeGapTime; i++ {
115123
select {
124+
case <-ctx.Done():
125+
return nil, ctx.Err()
116126
case dequeuedItem := <-waitChan:
117127
return dequeuedItem, nil
118128
case <-time.After(time.Millisecond * time.Duration(i)):
@@ -123,7 +133,12 @@ func (st *FIFO) DequeueOrWaitForNextElement() (interface{}, error) {
123133
}
124134

125135
// return the next enqueued element, if any
126-
return <-waitChan, nil
136+
select {
137+
case item := <-waitChan:
138+
return item, nil
139+
case <-ctx.Done():
140+
return nil, ctx.Err()
141+
}
127142
default:
128143
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
129144
return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element because there are too many DequeueOrWaitForNextElement() waiting")

fifo_queue_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package goconcurrentqueue
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67
"testing"
@@ -680,6 +681,66 @@ func (suite *FIFOTestSuite) TestIsLockedSingleGR() {
680681
suite.True(suite.fifo.isLocked == suite.fifo.IsLocked(), "fifo.IsLocked() has to be equal to fifo.isLocked")
681682
}
682683

684+
// ***************************************************************************************
685+
// ** Context
686+
// ***************************************************************************************
687+
688+
// context canceled while waiting for element to be added
689+
func (suite *FIFOTestSuite) TestContextCanceledAfter1Second() {
690+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
691+
defer cancel()
692+
693+
var (
694+
result interface{}
695+
err error
696+
done = make(chan struct{})
697+
)
698+
699+
go func() {
700+
result, err = suite.fifo.DequeueOrWaitForNextElementContext(ctx)
701+
done <- struct{}{}
702+
}()
703+
704+
select {
705+
// wait for the dequeue to finish
706+
case <-done:
707+
suite.True(err == context.DeadlineExceeded, "Canceling the context passed to fifo.DequeueOrWaitForNextElementContext() must cancel the dequeue wait", err)
708+
suite.Nil(result)
709+
710+
// the following comes first if more time than expected happened while waiting for the dequeued element
711+
case <-time.After(2 * time.Second):
712+
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
713+
}
714+
}
715+
716+
// passing a already-canceled context to DequeueOrWaitForNextElementContext
717+
func (suite *FIFOTestSuite) TestContextAlreadyCanceled() {
718+
ctx, cancel := context.WithCancel(context.Background())
719+
cancel()
720+
721+
var (
722+
result interface{}
723+
err error
724+
done = make(chan struct{})
725+
)
726+
727+
go func() {
728+
result, err = suite.fifo.DequeueOrWaitForNextElementContext(ctx)
729+
done <- struct{}{}
730+
}()
731+
732+
select {
733+
// wait for the dequeue to finish
734+
case <-done:
735+
suite.True(err == context.Canceled, "Canceling the context passed to fifo.DequeueOrWaitForNextElementContext() must cancel the dequeue wait", err)
736+
suite.Nil(result)
737+
738+
// the following comes first if more time than expected happened while waiting for the dequeued element
739+
case <-time.After(2 * time.Second):
740+
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
741+
}
742+
}
743+
683744
// ***************************************************************************************
684745
// ** Run suite
685746
// ***************************************************************************************

fixed_fifo_queue.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package goconcurrentqueue
22

3+
import "context"
4+
35
// Fixed capacity FIFO (First In First Out) concurrent queue
46
type FixedFIFO struct {
57
queue chan interface{}
@@ -65,6 +67,13 @@ func (st *FixedFIFO) Dequeue() (interface{}, error) {
6567
// DequeueOrWaitForNextElement dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
6668
// Multiple calls to DequeueOrWaitForNextElement() would enqueue multiple "listeners" for future enqueued elements.
6769
func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error) {
70+
return st.DequeueOrWaitForNextElementContext(context.Background())
71+
}
72+
73+
// DequeueOrWaitForNextElementContext dequeues an element (if exist) or waits until the next element gets enqueued and returns it.
74+
// Multiple calls to DequeueOrWaitForNextElementContext() would enqueue multiple "listeners" for future enqueued elements.
75+
// When the passed context expires this function exits and returns the context' error
76+
func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (interface{}, error) {
6877
if st.IsLocked() {
6978
return nil, NewQueueError(QueueErrorCodeLockedQueue, "The queue is locked")
7079
}
@@ -75,7 +84,8 @@ func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error) {
7584
return value, nil
7685
}
7786
return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed")
78-
87+
case <-ctx.Done():
88+
return nil, ctx.Err()
7989
// queue is empty, add a listener to wait until next enqueued element is ready
8090
default:
8191
// channel to wait for next enqueued element
@@ -85,7 +95,12 @@ func (st *FixedFIFO) DequeueOrWaitForNextElement() (interface{}, error) {
8595
// enqueue a watcher into the watchForNextElementChannel to wait for the next element
8696
case st.waitForNextElementChan <- waitChan:
8797
// return the next enqueued element, if any
88-
return <-waitChan, nil
98+
select {
99+
case item := <-waitChan:
100+
return item, nil
101+
case <-ctx.Done():
102+
return nil, ctx.Err()
103+
}
89104
default:
90105
// too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements
91106
return nil, NewQueueError(QueueErrorCodeEmptyQueue, "empty queue and can't wait for next element")

fixed_fifo_queue_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package goconcurrentqueue
22

33
import (
4+
"context"
45
"sync"
56
"testing"
67
"time"
@@ -477,3 +478,63 @@ func (suite *FixedFIFOTestSuite) TestUnlockSingleGR() {
477478
suite.fifo.Unlock()
478479
suite.True(suite.fifo.IsLocked() == false, "fifo.isLocked has to be false after fifo.Unlock()")
479480
}
481+
482+
// ***************************************************************************************
483+
// ** Context
484+
// ***************************************************************************************
485+
486+
// context canceled while waiting for element to be added
487+
func (suite *FixedFIFOTestSuite) TestContextCanceledAfter1Second() {
488+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
489+
defer cancel()
490+
491+
var (
492+
result interface{}
493+
err error
494+
done = make(chan struct{})
495+
)
496+
497+
go func() {
498+
result, err = suite.fifo.DequeueOrWaitForNextElementContext(ctx)
499+
done <- struct{}{}
500+
}()
501+
502+
select {
503+
// wait for the dequeue to finish
504+
case <-done:
505+
suite.True(err == context.DeadlineExceeded, "Canceling the context passed to fifo.DequeueOrWaitForNextElementContext() must cancel the dequeue wait", err)
506+
suite.Nil(result)
507+
508+
// the following comes first if more time than expected happened while waiting for the dequeued element
509+
case <-time.After(2 * time.Second):
510+
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
511+
}
512+
}
513+
514+
// passing a already-canceled context to DequeueOrWaitForNextElementContext
515+
func (suite *FixedFIFOTestSuite) TestContextAlreadyCanceled() {
516+
ctx, cancel := context.WithCancel(context.Background())
517+
cancel()
518+
519+
var (
520+
result interface{}
521+
err error
522+
done = make(chan struct{})
523+
)
524+
525+
go func() {
526+
result, err = suite.fifo.DequeueOrWaitForNextElementContext(ctx)
527+
done <- struct{}{}
528+
}()
529+
530+
select {
531+
// wait for the dequeue to finish
532+
case <-done:
533+
suite.True(err == context.Canceled, "Canceling the context passed to fifo.DequeueOrWaitForNextElementContext() must cancel the dequeue wait", err)
534+
suite.Nil(result)
535+
536+
// the following comes first if more time than expected happened while waiting for the dequeued element
537+
case <-time.After(2 * time.Second):
538+
suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled")
539+
}
540+
}

readme.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,38 @@ func main() {
158158

159159
```
160160

161+
### Wait until an element gets enqueued with timeout
162+
[Live code - playground](https://play.golang.org/p/E3xdHcW5nJy)
163+
164+
```go
165+
package main
166+
167+
import (
168+
"context"
169+
"fmt"
170+
"time"
171+
172+
"github.com/enriquebris/goconcurrentqueue"
173+
)
174+
175+
func main() {
176+
var (
177+
fifo = goconcurrentqueue.NewFIFO()
178+
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
179+
)
180+
defer cancel()
181+
182+
fmt.Println("1 - Waiting for next enqueued element")
183+
_, err := fifo.DequeueOrWaitForNextElementContext(ctx)
184+
185+
if err != nil {
186+
fmt.Printf("2 - Failed waiting for new element: %v\n", err)
187+
return
188+
}
189+
}
190+
191+
```
192+
161193
### Dependency Inversion Principle using concurrent-safe queues
162194

163195
*High level modules should not depend on low level modules. Both should depend on abstractions.* Robert C. Martin

0 commit comments

Comments
 (0)