Skip to content

Commit e406b58

Browse files
committed
Add Poll method to RingBuffer
Adds a Poll method like the one in Queue to do Gets with a timeout.
1 parent 01ff43b commit e406b58

File tree

2 files changed

+71
-3
lines changed

2 files changed

+71
-3
lines changed

queue/ring.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package queue
1919
import (
2020
"runtime"
2121
"sync/atomic"
22+
"time"
2223
)
2324

2425
// roundUp takes a uint64 greater than 0 and rounds it up to the next
@@ -128,9 +129,23 @@ L:
128129
// to the queue or Dispose is called on the queue. An error will be returned
129130
// if the queue is disposed.
130131
func (rb *RingBuffer) Get() (interface{}, error) {
131-
var n *node
132-
pos := atomic.LoadUint64(&rb.dequeue)
133-
i := 0
132+
return rb.Poll(0)
133+
}
134+
135+
// Poll will return the next item in the queue. This call will block
136+
// if the queue is empty. This call will unblock when an item is added
137+
// to the queue, Dispose is called on the queue, or the timeout is reached. An
138+
// error will be returned if the queue is disposed or a timeout occurs.
139+
func (rb *RingBuffer) Poll(timeout time.Duration) (interface{}, error) {
140+
var (
141+
n *node
142+
pos = atomic.LoadUint64(&rb.dequeue)
143+
i = 0
144+
start time.Time
145+
)
146+
if timeout > 0 {
147+
start = time.Now()
148+
}
134149
L:
135150
for {
136151
if atomic.LoadUint64(&rb.disposed) == 1 {
@@ -150,6 +165,10 @@ L:
150165
pos = atomic.LoadUint64(&rb.dequeue)
151166
}
152167

168+
if timeout > 0 && time.Since(start) >= timeout {
169+
return nil, ErrTimeout
170+
}
171+
153172
if i == 10000 {
154173
runtime.Gosched() // free up the cpu before the next iteration
155174
i = 0

queue/ring_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"sync"
2121
"sync/atomic"
2222
"testing"
23+
"time"
2324

2425
"github.com/stretchr/testify/assert"
2526
)
@@ -176,6 +177,54 @@ func TestRingGetEmpty(t *testing.T) {
176177
wg.Wait()
177178
}
178179

180+
func TestRingPollEmpty(t *testing.T) {
181+
rb := NewRingBuffer(3)
182+
183+
_, err := rb.Poll(1)
184+
assert.Equal(t, ErrTimeout, err)
185+
}
186+
187+
func TestRingPoll(t *testing.T) {
188+
rb := NewRingBuffer(10)
189+
190+
// should be able to Poll() before anything is present, without breaking future Puts
191+
rb.Poll(time.Millisecond)
192+
193+
rb.Put(`test`)
194+
result, err := rb.Poll(0)
195+
if !assert.Nil(t, err) {
196+
return
197+
}
198+
199+
assert.Equal(t, `test`, result)
200+
assert.Equal(t, uint64(0), rb.Len())
201+
202+
rb.Put(`1`)
203+
rb.Put(`2`)
204+
205+
result, err = rb.Poll(time.Millisecond)
206+
if !assert.Nil(t, err) {
207+
return
208+
}
209+
210+
assert.Equal(t, `1`, result)
211+
assert.Equal(t, uint64(1), rb.Len())
212+
213+
result, err = rb.Poll(time.Millisecond)
214+
if !assert.Nil(t, err) {
215+
return
216+
}
217+
218+
assert.Equal(t, `2`, result)
219+
220+
before := time.Now()
221+
_, err = rb.Poll(5 * time.Millisecond)
222+
// This delta is normally 1-3 ms but running tests in CI with -race causes
223+
// this to run much slower. For now, just bump up the threshold.
224+
assert.InDelta(t, 5, time.Since(before).Seconds()*1000, 10)
225+
assert.Equal(t, ErrTimeout, err)
226+
}
227+
179228
func TestRingLen(t *testing.T) {
180229
rb := NewRingBuffer(4)
181230
assert.Equal(t, uint64(0), rb.Len())

0 commit comments

Comments
 (0)