Skip to content

Commit 134a96a

Browse files
committed
feat: add blocking pop call
1 parent 2ca53c2 commit 134a96a

File tree

2 files changed

+164
-5
lines changed

2 files changed

+164
-5
lines changed

kpq/keyed_priority_queue.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ func newKeyNotFoundError[K comparable](k K) error {
6060
// CmpFunc is a generic function type used for ordering the priority queue.
6161
type CmpFunc[V any] func(x, y V) bool
6262

63+
// TODO comment
64+
type pair[K comparable, V any] struct {
65+
k K
66+
v V
67+
}
68+
6369
// KeyedPriorityQueue represents a generic keyed priority queue,
6470
// where K is the key type and V is the priority value type.
6571
//
@@ -71,6 +77,8 @@ type KeyedPriorityQueue[K comparable, V any] struct {
7177
im map[K]int // inverse map of pm; note that for a given key k, pm[im[k]] == k
7278
vals map[K]V // generic priority values of key k
7379
cmp CmpFunc[V]
80+
81+
fastTrack chan pair[K, V] // used for blocking pop calls
7482
}
7583

7684
// NewKeyedPriorityQueue returns a new keyed priority queue
@@ -82,10 +90,11 @@ func NewKeyedPriorityQueue[K comparable, V any](cmp CmpFunc[V]) *KeyedPriorityQu
8290
panic("keyed priority queue: comparison function cannot be nil")
8391
}
8492
return &KeyedPriorityQueue[K, V]{
85-
pm: make([]K, 0),
86-
im: make(map[K]int),
87-
vals: make(map[K]V),
88-
cmp: cmp,
93+
pm: make([]K, 0),
94+
im: make(map[K]int),
95+
vals: make(map[K]V),
96+
cmp: cmp,
97+
fastTrack: make(chan pair[K, V]),
8998
}
9099
}
91100

@@ -104,6 +113,12 @@ func (pq *KeyedPriorityQueue[K, V]) Push(k K, v V) error {
104113
}
105114

106115
func (pq *KeyedPriorityQueue[K, V]) push(k K, v V) {
116+
select {
117+
case pq.fastTrack <- pair[K, V]{k: k, v: v}:
118+
return
119+
default:
120+
// pass
121+
}
107122
n := len(pq.pm)
108123
pq.pm = append(pq.pm, k)
109124
pq.im[k] = n
@@ -122,6 +137,29 @@ func (pq *KeyedPriorityQueue[K, V]) Pop() (K, V, bool) {
122137
var v V
123138
return k, v, false
124139
}
140+
141+
k, v := pq.pop()
142+
return k, v, true
143+
}
144+
145+
// TODO BlockingPop
146+
// Pop removes and returns the highest priority key and value from the priority queue.
147+
// It returns false as its last return value if the priority queue is empty; otherwise, true.
148+
func (pq *KeyedPriorityQueue[K, V]) BlockingPop() (K, V) {
149+
pq.mu.Lock()
150+
151+
if len(pq.pm) == 0 {
152+
pq.mu.Unlock()
153+
154+
pair := <-pq.fastTrack
155+
return pair.k, pair.v
156+
}
157+
158+
defer pq.mu.Unlock()
159+
return pq.pop()
160+
}
161+
162+
func (pq *KeyedPriorityQueue[K, V]) pop() (K, V) {
125163
n := len(pq.pm) - 1
126164
k := pq.pm[0]
127165
v := pq.vals[k]
@@ -130,7 +168,7 @@ func (pq *KeyedPriorityQueue[K, V]) Pop() (K, V, bool) {
130168
pq.pm = pq.pm[:n]
131169
delete(pq.im, k)
132170
delete(pq.vals, k)
133-
return k, v, true
171+
return k, v
134172
}
135173

136174
// Set inserts a new entry in the priority queue with the given key and value,

kpq/keyed_priority_queue_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"testing"
7+
"time"
78
)
89

910
func TestNewKeyedPriorityQueue_NilCmp(t *testing.T) {
@@ -260,6 +261,126 @@ func TestKeyedPriorityQueue_Pop(t *testing.T) {
260261
})
261262
}
262263

264+
func TestKeyedPriorityQueue_BlockingPop(t *testing.T) {
265+
t.Run("Keys", func(t *testing.T) {
266+
pq := NewKeyedPriorityQueue[string](func(x, y int) bool {
267+
return x < y
268+
})
269+
270+
items := []struct {
271+
key string
272+
val int
273+
}{
274+
{key: "fourth", val: 10},
275+
{key: "second", val: 8},
276+
{key: "third", val: 9},
277+
{key: "first", val: 6},
278+
{key: "last", val: 20},
279+
}
280+
281+
go func() {
282+
time.Sleep(10 * time.Millisecond)
283+
for _, item := range items {
284+
err := pq.Push(item.key, item.val)
285+
if err != nil {
286+
panic(fmt.Sprintf("Push(%v, %v): got unexpected error %v", item.key, item.val, err))
287+
}
288+
}
289+
}()
290+
291+
testCases := []struct {
292+
wantKey string
293+
wantValue int
294+
wantPeekKey string
295+
wantPeekValue int
296+
wantLen int
297+
}{
298+
{
299+
wantKey: "fourth", //"first",
300+
wantValue: 10, // 6,
301+
wantPeekKey: "first", //"second",
302+
wantPeekValue: 6, // 8,
303+
wantLen: 4,
304+
},
305+
{
306+
wantKey: "first", //"second",
307+
wantValue: 6, // 8,
308+
wantPeekKey: "second", //"third",
309+
wantPeekValue: 8, // 9,
310+
wantLen: 3,
311+
},
312+
}
313+
314+
for _, tc := range testCases {
315+
t.Run(fmt.Sprintf("%s_%d", tc.wantKey, tc.wantValue), func(t *testing.T) {
316+
gotKey, gotValue := pq.BlockingPop()
317+
318+
if gotKey != tc.wantKey {
319+
t.Errorf("pq.BlockingPop(): got key %q; want %q", gotKey, tc.wantKey)
320+
}
321+
322+
if gotValue != tc.wantValue {
323+
t.Errorf("pq.BlockingPop(): got value %d; want %d", gotValue, tc.wantValue)
324+
}
325+
326+
time.Sleep(10 * time.Millisecond)
327+
328+
gotPeekKey, gotPeekValue, ok := pq.Peek()
329+
if !ok {
330+
t.Fatal("got no min key and value in the priority queue")
331+
}
332+
333+
if gotPeekKey != tc.wantPeekKey {
334+
t.Errorf("pq.Peek(): got key %q; want %q", gotPeekKey, tc.wantPeekKey)
335+
}
336+
337+
if gotPeekValue != tc.wantPeekValue {
338+
t.Errorf("pq.Peek(): got value %d; want %d", gotPeekValue, tc.wantPeekValue)
339+
}
340+
341+
if got := pq.Len(); got != tc.wantLen {
342+
t.Errorf("pq.Len(): got %d; want %d", got, tc.wantLen)
343+
}
344+
})
345+
}
346+
})
347+
348+
/*t.Run("EmptyPQ", func(t *testing.T) {
349+
pq := NewKeyedPriorityQueue[string](func(x, y int) bool {
350+
return x < y
351+
})
352+
353+
items := []struct {
354+
key string
355+
val int
356+
}{
357+
{key: "fourth", val: 10},
358+
{key: "second", val: 8},
359+
{key: "third", val: 9},
360+
{key: "first", val: 6},
361+
{key: "last", val: 20},
362+
}
363+
364+
go func() {
365+
time.Sleep(10 * time.Millisecond)
366+
for _, item := range items {
367+
err := pq.Push(item.key, item.val)
368+
if err != nil {
369+
panic(fmt.Sprintf("Push(%v, %v): got unexpected error %v", item.key, item.val, err))
370+
}
371+
}
372+
}()
373+
374+
gotKey, gotValue := pq.BlockingPop()
375+
if gotKey != items[0].key {
376+
t.Errorf("pq.BlockingPop(): got key %q; want %q", gotKey, items[0].key)
377+
}
378+
if gotValue != items[0].val {
379+
t.Errorf("pq.BlockingPop(): got value %d; want %d", gotValue, items[0].val)
380+
}
381+
})*/
382+
}
383+
263384
func TestKeyedPriorityQueue_Remove(t *testing.T) {
264385
pq := NewKeyedPriorityQueue[string](func(x, y int) bool {
265386
return x < y

0 commit comments

Comments
 (0)