Skip to content

Commit b5b42fa

Browse files
author
Mike Atlas
committed
add queue.Peek() functionality
1 parent 93a32f6 commit b5b42fa

File tree

3 files changed

+53
-0
lines changed

3 files changed

+53
-0
lines changed

queue/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@ var (
2525

2626
// ErrTimeout is returned when an applicable queue operation times out.
2727
ErrTimeout = errors.New(`queue: poll timed out`)
28+
29+
// ErrUnknown is returned if the error state was unusual and unexpected.
30+
ErrUnknown = errors.New(`queue: unknown error`)
2831
)

queue/queue.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,16 @@ func (items *items) get(number int64) []interface{} {
9595
return returnItems
9696
}
9797

98+
func (items *items) peek() (interface{}, bool) {
99+
length := len(*items)
100+
101+
if length == 0 {
102+
return nil, false
103+
}
104+
105+
return (*items)[length-1], true
106+
}
107+
98108
func (items *items) getUntil(checker func(item interface{}) bool) []interface{} {
99109
length := len(*items)
100110

@@ -239,6 +249,24 @@ func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error)
239249
return items, nil
240250
}
241251

252+
// Peek returns a the first item in the queue by value
253+
// without modifying the queue.
254+
func (q *Queue) Peek() (interface{}, error) {
255+
q.lock.Lock()
256+
defer q.lock.Unlock()
257+
258+
if q.disposed {
259+
return nil, ErrDisposed
260+
}
261+
262+
peekItem, ok := q.items.peek()
263+
if ok {
264+
return peekItem, nil
265+
}
266+
267+
return nil, ErrUnknown
268+
}
269+
242270
// TakeUntil takes a function and returns a list of items that
243271
// match the checker until the checker returns false. This does not
244272
// wait if there are no items in the queue.

queue/queue_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,28 @@ func BenchmarkChannel(b *testing.B) {
329329
wg.Wait()
330330
}
331331

332+
func TestPeek(t *testing.T) {
333+
q := New(10)
334+
q.Put(`a`)
335+
q.Put(`b`)
336+
q.Put(`c`)
337+
result, err := q.Peek()
338+
expected := `c`
339+
340+
assert.Nil(t, err)
341+
assert.Equal(t, expected, result)
342+
assert.Equal(t, q.Len(), int64(3))
343+
}
344+
345+
func TestPeekOnDisposedQueue(t *testing.T) {
346+
q := New(10)
347+
q.Dispose()
348+
result, err := q.Peek()
349+
350+
assert.Nil(t, result)
351+
assert.IsType(t, ErrDisposed, err)
352+
}
353+
332354
func TestTakeUntil(t *testing.T) {
333355
q := New(10)
334356
q.Put(`a`, `b`, `c`)

0 commit comments

Comments
 (0)