Skip to content

Commit a41c947

Browse files
authored
Merge pull request rdleal#2 from Bychin/feat-add-blocking-pop-call
feat: add blocking pop call
2 parents 2ca53c2 + a1f3f66 commit a41c947

File tree

2 files changed

+220
-9
lines changed

2 files changed

+220
-9
lines changed

kpq/keyed_priority_queue.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package kpq
99

1010
import (
11+
"context"
1112
"fmt"
1213
"sync"
1314
)
@@ -60,6 +61,12 @@ func newKeyNotFoundError[K comparable](k K) error {
6061
// CmpFunc is a generic function type used for ordering the priority queue.
6162
type CmpFunc[V any] func(x, y V) bool
6263

64+
// pair is a pair of key and value that might be put or read from the queue.
65+
type pair[K comparable, V any] struct {
66+
k K
67+
v V
68+
}
69+
6370
// KeyedPriorityQueue represents a generic keyed priority queue,
6471
// where K is the key type and V is the priority value type.
6572
//
@@ -71,6 +78,8 @@ type KeyedPriorityQueue[K comparable, V any] struct {
7178
im map[K]int // inverse map of pm; note that for a given key k, pm[im[k]] == k
7279
vals map[K]V // generic priority values of key k
7380
cmp CmpFunc[V]
81+
82+
fastTrack chan pair[K, V] // used for blocking pop calls
7483
}
7584

7685
// NewKeyedPriorityQueue returns a new keyed priority queue
@@ -82,10 +91,11 @@ func NewKeyedPriorityQueue[K comparable, V any](cmp CmpFunc[V]) *KeyedPriorityQu
8291
panic("keyed priority queue: comparison function cannot be nil")
8392
}
8493
return &KeyedPriorityQueue[K, V]{
85-
pm: make([]K, 0),
86-
im: make(map[K]int),
87-
vals: make(map[K]V),
88-
cmp: cmp,
94+
pm: make([]K, 0),
95+
im: make(map[K]int),
96+
vals: make(map[K]V),
97+
cmp: cmp,
98+
fastTrack: make(chan pair[K, V]),
8999
}
90100
}
91101

@@ -104,6 +114,12 @@ func (pq *KeyedPriorityQueue[K, V]) Push(k K, v V) error {
104114
}
105115

106116
func (pq *KeyedPriorityQueue[K, V]) push(k K, v V) {
117+
select {
118+
case pq.fastTrack <- pair[K, V]{k: k, v: v}:
119+
return
120+
default:
121+
// pass
122+
}
107123
n := len(pq.pm)
108124
pq.pm = append(pq.pm, k)
109125
pq.im[k] = n
@@ -122,6 +138,35 @@ func (pq *KeyedPriorityQueue[K, V]) Pop() (K, V, bool) {
122138
var v V
123139
return k, v, false
124140
}
141+
142+
k, v := pq.pop()
143+
return k, v, true
144+
}
145+
146+
// BlockingPop removes and returns the highest priority key and value from the priority queue.
147+
// In case queue is empty, it blocks until next Push happens or ctx is closed.
148+
func (pq *KeyedPriorityQueue[K, V]) BlockingPop(ctx context.Context) (K, V, bool) {
149+
pq.mu.Lock()
150+
151+
if len(pq.pm) == 0 {
152+
pq.mu.Unlock()
153+
154+
select {
155+
case <-ctx.Done():
156+
var k K
157+
var v V
158+
return k, v, false
159+
case pair := <-pq.fastTrack:
160+
return pair.k, pair.v, true
161+
}
162+
}
163+
164+
defer pq.mu.Unlock()
165+
k, v := pq.pop()
166+
return k, v, true
167+
}
168+
169+
func (pq *KeyedPriorityQueue[K, V]) pop() (K, V) {
125170
n := len(pq.pm) - 1
126171
k := pq.pm[0]
127172
v := pq.vals[k]
@@ -130,7 +175,7 @@ func (pq *KeyedPriorityQueue[K, V]) Pop() (K, V, bool) {
130175
pq.pm = pq.pm[:n]
131176
delete(pq.im, k)
132177
delete(pq.vals, k)
133-
return k, v, true
178+
return k, v
134179
}
135180

136181
// Set inserts a new entry in the priority queue with the given key and value,

kpq/keyed_priority_queue_test.go

Lines changed: 170 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package kpq
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"testing"
8+
"time"
79
)
810

911
func TestNewKeyedPriorityQueue_NilCmp(t *testing.T) {
1012
defer func() {
1113
if err := recover(); err == nil {
12-
t.Error("want NewKeyedPriorityQueue to panic when receiving a nil comparison cunction")
14+
t.Error("want NewKeyedPriorityQueue to panic when receiving a nil comparison function")
1315
}
1416
}()
1517

@@ -217,7 +219,7 @@ func TestKeyedPriorityQueue_Pop(t *testing.T) {
217219
t.Run(fmt.Sprintf("%s_%d", tc.wantKey, tc.wantValue), func(t *testing.T) {
218220
gotKey, gotValue, ok := pq.Pop()
219221
if !ok {
220-
t.Fatal("pq.Pop(): got unexpected empty prioriy queue")
222+
t.Fatal("pq.Pop(): got unexpected empty priority queue")
221223
}
222224

223225
if gotKey != tc.wantKey {
@@ -255,7 +257,171 @@ func TestKeyedPriorityQueue_Pop(t *testing.T) {
255257

256258
_, _, ok := pq.Pop()
257259
if ok {
258-
t.Errorf("pq.Pop(): got unexpected non-empty priorit queue")
260+
t.Errorf("pq.Pop(): got unexpected non-empty priority queue")
261+
}
262+
})
263+
}
264+
265+
func TestKeyedPriorityQueue_BlockingPop(t *testing.T) {
266+
t.Run("Block before pushing into queue", func(t *testing.T) {
267+
pq := NewKeyedPriorityQueue[string](func(x, y int) bool {
268+
return x < y
269+
})
270+
271+
items := []struct {
272+
key string
273+
val int
274+
}{
275+
{key: "fourth", val: 10},
276+
{key: "second", val: 8},
277+
{key: "third", val: 9},
278+
{key: "first", val: 6},
279+
{key: "last", val: 20},
280+
}
281+
282+
go func() {
283+
time.Sleep(10 * time.Millisecond) // let reader block via BlockingPop()
284+
285+
for _, item := range items {
286+
err := pq.Push(item.key, item.val)
287+
if err != nil {
288+
panic(fmt.Sprintf("Push(%v, %v): got unexpected error %v", item.key, item.val, err))
289+
}
290+
}
291+
}()
292+
293+
testCases := []struct {
294+
wantKey string
295+
wantValue int
296+
wantPeekKey string
297+
wantPeekValue int
298+
wantLen int
299+
}{
300+
{
301+
wantKey: "fourth", // this element was put first into queue, when reader was already blocked
302+
wantValue: 10,
303+
wantPeekKey: "first", // this is an already sorted element in the queue
304+
wantPeekValue: 6,
305+
wantLen: 4,
306+
},
307+
{
308+
wantKey: "first",
309+
wantValue: 6,
310+
wantPeekKey: "second",
311+
wantPeekValue: 8,
312+
wantLen: 3,
313+
},
314+
}
315+
316+
for _, tc := range testCases {
317+
t.Run(fmt.Sprintf("%s_%d", tc.wantKey, tc.wantValue), func(t *testing.T) {
318+
gotKey, gotValue, _ := pq.BlockingPop(context.Background())
319+
320+
if gotKey != tc.wantKey {
321+
t.Errorf("pq.BlockingPop(): got key %q; want %q", gotKey, tc.wantKey)
322+
}
323+
324+
if gotValue != tc.wantValue {
325+
t.Errorf("pq.BlockingPop(): got value %d; want %d", gotValue, tc.wantValue)
326+
}
327+
328+
time.Sleep(10 * time.Millisecond) // give queue a chance to process all the rest Push'es
329+
330+
gotPeekKey, gotPeekValue, ok := pq.Peek()
331+
if !ok {
332+
t.Fatal("got no min key and value in the priority queue")
333+
}
334+
335+
if gotPeekKey != tc.wantPeekKey {
336+
t.Errorf("pq.Peek(): got key %q; want %q", gotPeekKey, tc.wantPeekKey)
337+
}
338+
339+
if gotPeekValue != tc.wantPeekValue {
340+
t.Errorf("pq.Peek(): got value %d; want %d", gotPeekValue, tc.wantPeekValue)
341+
}
342+
343+
if got := pq.Len(); got != tc.wantLen {
344+
t.Errorf("pq.Len(): got %d; want %d", got, tc.wantLen)
345+
}
346+
})
347+
}
348+
})
349+
350+
t.Run("Do not actually block as queue is not empty", func(t *testing.T) {
351+
pq := NewKeyedPriorityQueue[string](func(x, y int) bool {
352+
return x < y
353+
})
354+
355+
items := []struct {
356+
key string
357+
val int
358+
}{
359+
{key: "fourth", val: 10},
360+
{key: "second", val: 8},
361+
{key: "third", val: 9},
362+
{key: "first", val: 6},
363+
{key: "last", val: 20},
364+
}
365+
366+
for _, item := range items {
367+
err := pq.Push(item.key, item.val)
368+
if err != nil {
369+
panic(fmt.Sprintf("Push(%v, %v): got unexpected error %v", item.key, item.val, err))
370+
}
371+
}
372+
373+
testCases := []struct {
374+
wantKey string
375+
wantValue int
376+
wantPeekKey string
377+
wantPeekValue int
378+
wantLen int
379+
}{
380+
{
381+
wantKey: "first",
382+
wantValue: 6,
383+
wantPeekKey: "second",
384+
wantPeekValue: 8,
385+
wantLen: 4,
386+
},
387+
{
388+
wantKey: "second",
389+
wantValue: 8,
390+
wantPeekKey: "third",
391+
wantPeekValue: 9,
392+
wantLen: 3,
393+
},
394+
}
395+
396+
for _, tc := range testCases {
397+
t.Run(fmt.Sprintf("%s_%d", tc.wantKey, tc.wantValue), func(t *testing.T) {
398+
gotKey, gotValue, _ := pq.BlockingPop(context.Background())
399+
400+
if gotKey != tc.wantKey {
401+
t.Errorf("pq.Pop(): got key %q; want %q", gotKey, tc.wantKey)
402+
}
403+
404+
if gotValue != tc.wantValue {
405+
t.Errorf("pq.Pop(): got value %d; want %d", gotValue, tc.wantValue)
406+
}
407+
408+
gotPeekKey, gotPeekValue, ok := pq.Peek()
409+
if !ok {
410+
t.Fatal("got no min key and value in the priority queue")
411+
}
412+
413+
if gotPeekKey != tc.wantPeekKey {
414+
t.Errorf("pq.Peek(): got key %q; want %q", gotPeekKey, tc.wantPeekKey)
415+
}
416+
417+
if gotPeekValue != tc.wantPeekValue {
418+
t.Errorf("pq.Peek(): got value %d; want %d", gotPeekValue, tc.wantPeekValue)
419+
}
420+
421+
if got := pq.Len(); got != tc.wantLen {
422+
t.Errorf("pq.Len(): got %d; want %d", got, tc.wantLen)
423+
}
424+
})
259425
}
260426
})
261427
}
@@ -385,7 +551,7 @@ func TestKeyedPriorityQueue_PeekKey_EmptyQueue(t *testing.T) {
385551
}
386552
}
387553

388-
func TestKeyedPriorityQeue_Contains(t *testing.T) {
554+
func TestKeyedPriorityQueue_Contains(t *testing.T) {
389555
pq := NewKeyedPriorityQueue[string](func(x, y int) bool { return x < y })
390556

391557
k := "user"

0 commit comments

Comments
 (0)