Skip to content

Commit 12b0307

Browse files
authored
add GetN method to retrieve batches of available items (#11)
* add GetN method to retrieve batches of available items GetN retrieves up to n items that are currently available in the queue. If there are no items currently available, then none are returned and GetN does not wait for any. GetN is used to poll the DSQueue for items and return batches of those items. This is the most efficient way of fetching currently available items. GetN and Out can both be used to read items from the DSQueue, but they should not be used concurrently as items will be returned by one or the other indeterminately. * improve tests
1 parent 8618ae8 commit 12b0307

File tree

2 files changed

+188
-28
lines changed

2 files changed

+188
-28
lines changed

dsqueue.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type DSQueue struct {
4444
enqueue chan string
4545
clear chan chan<- int
4646
closeTimeout time.Duration
47+
getn chan getRequest
4748
name string
4849
}
4950

@@ -61,6 +62,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
6162
enqueue: make(chan string),
6263
clear: make(chan chan<- int),
6364
closeTimeout: cfg.closeTimeout,
65+
getn: make(chan getRequest),
6466
name: name,
6567
}
6668

@@ -107,6 +109,36 @@ func (q *DSQueue) Put(item []byte) (err error) {
107109
return
108110
}
109111

112+
type getRequest struct {
113+
n int
114+
rsp chan getResponse
115+
}
116+
117+
type getResponse struct {
118+
items [][]byte
119+
err error
120+
}
121+
122+
// GetN retrieves up to n items that are currently available in the queue. If
123+
// there are no items currently available, then none are returned and GetN does
124+
// not wait for any.
125+
//
126+
// GetN is used to poll the DSQueue for items and return batches of those
127+
// items. This is the most efficient way of fetching currently available items.
128+
//
129+
// GetN and Out can both be used to read items from the DSQueue, but they
130+
// should not be used concurrently as items will be returned by one or the
131+
// other indeterminately.
132+
func (q *DSQueue) GetN(n int) ([][]byte, error) {
133+
rsp := make(chan getResponse)
134+
q.getn <- getRequest{
135+
n: n,
136+
rsp: rsp,
137+
}
138+
getRsp := <-rsp
139+
return getRsp.items, getRsp.err
140+
}
141+
110142
// Out returns a channel for reading entries from the queue.
111143
func (q *DSQueue) Out() <-chan []byte {
112144
return q.dequeue
@@ -264,9 +296,43 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
264296
if bufferSize != 0 && inBuf.Len() >= bufferSize {
265297
commit = true
266298
}
299+
case getRequest := <-q.getn:
300+
n := getRequest.n
301+
rspChan := getRequest.rsp
302+
var outItems [][]byte
303+
304+
if item != "" {
305+
outItems = append(outItems, []byte(item))
306+
307+
if !dsEmpty {
308+
outItems, err = q.readDatastore(ctx, n-len(outItems), outItems)
309+
if err != nil {
310+
rspChan <- getResponse{
311+
err: err,
312+
}
313+
continue
314+
}
315+
}
316+
317+
item = ""
318+
idle = false
319+
}
320+
if len(outItems) < n {
321+
for itm := range inBuf.IterPopFront() {
322+
outItems = append(outItems, []byte(itm))
323+
if len(outItems) == n {
324+
break
325+
}
326+
}
327+
}
328+
rspChan <- getResponse{
329+
items: outItems,
330+
}
331+
267332
case dequeue <- []byte(item):
268333
item = ""
269334
idle = false
335+
270336
case <-batchTimer.C:
271337
if idle {
272338
if inBuf.Len() != 0 {
@@ -414,3 +480,67 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.
414480

415481
return nil
416482
}
483+
484+
// readDatastore reads at most n items from the data store queue, in order, and
485+
// appends them to items slice. Items are batch-deleted from the datastore as
486+
// they are read. The modified items slice is returned.
487+
func (q *DSQueue) readDatastore(ctx context.Context, n int, items [][]byte) ([][]byte, error) {
488+
qry := query.Query{
489+
KeysOnly: true,
490+
Orders: []query.Order{query.OrderByKey{}},
491+
Limit: n,
492+
}
493+
results, err := q.ds.Query(ctx, qry)
494+
if err != nil {
495+
return nil, err
496+
}
497+
defer results.Close()
498+
499+
batch, err := q.ds.Batch(ctx)
500+
if err != nil {
501+
return nil, fmt.Errorf("cannot create datastore batch: %w", err)
502+
}
503+
var delCount int
504+
505+
for result := range results.Next() {
506+
if ctx.Err() != nil {
507+
return nil, ctx.Err()
508+
}
509+
if result.Error != nil {
510+
return nil, result.Error
511+
}
512+
513+
if err = batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
514+
return nil, fmt.Errorf("error deleting queue item: %w", err)
515+
}
516+
delCount++
517+
518+
if delCount >= DefaultBufferSize {
519+
delCount = 0
520+
if err = batch.Commit(ctx); err != nil {
521+
return nil, fmt.Errorf("cannot commit datastore updates: %w", err)
522+
}
523+
}
524+
525+
parts := strings.SplitN(strings.TrimPrefix(result.Key, "/"), "/", 2)
526+
if len(parts) != 2 {
527+
log.Errorw("malformed queued item, removing it from queue", "err", err, "key", result.Key, "qname", q.name)
528+
continue
529+
}
530+
itemBin, err := base64.RawURLEncoding.DecodeString(parts[1])
531+
if err != nil {
532+
log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", result.Key, "qname", q.name)
533+
continue
534+
}
535+
items = append(items, itemBin)
536+
}
537+
538+
if err = batch.Commit(ctx); err != nil {
539+
return nil, fmt.Errorf("cannot commit datastore updated: %w", err)
540+
}
541+
if err = q.ds.Sync(ctx, datastore.NewKey("")); err != nil {
542+
return nil, fmt.Errorf("cannot sync datastore: %w", err)
543+
}
544+
545+
return items, nil
546+
}

dsqueue_test.go

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
const dsqName = "testq"
2020

21-
func assertOrdered(cids []cid.Cid, q *dsqueue.DSQueue, t *testing.T) {
21+
func assertOrdered(t *testing.T, cids []cid.Cid, q *dsqueue.DSQueue) {
2222
t.Helper()
2323

2424
var count int
@@ -60,41 +60,33 @@ func TestBasicOperation(t *testing.T) {
6060
case <-time.After(time.Millisecond):
6161
}
6262

63+
items := []string{"apple", "banana", "cherry"}
64+
6365
out := make(chan []string)
6466
go func() {
6567
var outStrs []string
66-
for {
67-
select {
68-
case dq, open := <-queue.Out():
69-
if !open {
70-
out <- outStrs
71-
return
72-
}
73-
dqItem := string(dq)
74-
t.Log("got:", dqItem)
75-
outStrs = append(outStrs, dqItem)
68+
for dq := range queue.Out() {
69+
dqItem := string(dq)
70+
outStrs = append(outStrs, dqItem)
71+
if len(outStrs) == len(items) {
72+
break
7673
}
7774
}
75+
out <- outStrs
7876
}()
7977

80-
items := []string{"apple", "banana", "cherry"}
8178
for _, item := range items {
8279
queue.Put([]byte(item))
8380
}
8481

85-
time.Sleep(time.Second)
86-
err := queue.Close()
87-
if err != nil {
88-
t.Fatal(err)
89-
}
90-
9182
qout := <-out
9283

9384
if len(qout) != len(items) {
94-
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(items), len(qout))
85+
t.Fatalf("dequeued wrong number of items, expected %d, got %d", len(items), len(qout))
9586
}
9687

97-
if err = queue.Close(); err != nil {
88+
err := queue.Close()
89+
if err != nil {
9890
t.Fatal(err)
9991
}
10092

@@ -104,6 +96,45 @@ func TestBasicOperation(t *testing.T) {
10496
}
10597
}
10698

99+
func TestGetN(t *testing.T) {
100+
ds := sync.MutexWrap(datastore.NewMapDatastore())
101+
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithDedupCacheSize(0))
102+
defer queue.Close()
103+
104+
cids := random.Cids(29)
105+
for _, c := range cids {
106+
queue.Put(c.Bytes())
107+
}
108+
109+
outItems, err := queue.GetN(50)
110+
if err != nil {
111+
t.Fatal(err)
112+
}
113+
114+
if len(outItems) != len(cids) {
115+
t.Fatalf("dequeued wrond number of items, expected %d, got %d", len(cids), len(outItems))
116+
}
117+
118+
for i := range outItems {
119+
outCid, err := cid.Parse(outItems[i])
120+
if err != nil {
121+
t.Fatal(err)
122+
}
123+
if outCid != cids[i] {
124+
t.Fatal("retrieved items out of order")
125+
}
126+
}
127+
128+
outItems, err = queue.GetN(10)
129+
if err != nil {
130+
t.Fatal(err)
131+
}
132+
133+
if len(outItems) != 0 {
134+
t.Fatal("shoul not get anymore items from queue")
135+
}
136+
}
137+
107138
func TestMangledData(t *testing.T) {
108139
ds := sync.MutexWrap(datastore.NewMapDatastore())
109140

@@ -125,8 +156,7 @@ func TestMangledData(t *testing.T) {
125156
}
126157

127158
// expect to only see the valid cids we entered
128-
expected := cids
129-
assertOrdered(expected, queue, t)
159+
assertOrdered(t, cids, queue)
130160
}
131161

132162
func TestInitialization(t *testing.T) {
@@ -139,7 +169,7 @@ func TestInitialization(t *testing.T) {
139169
queue.Put(c.Bytes())
140170
}
141171

142-
assertOrdered(cids[:5], queue, t)
172+
assertOrdered(t, cids[:5], queue)
143173

144174
err := queue.Close()
145175
if err != nil {
@@ -150,7 +180,7 @@ func TestInitialization(t *testing.T) {
150180
queue = dsqueue.New(ds, dsqName)
151181
defer queue.Close()
152182

153-
assertOrdered(cids[5:], queue, t)
183+
assertOrdered(t, cids[5:], queue)
154184
}
155185

156186
func TestIdleFlush(t *testing.T) {
@@ -230,7 +260,7 @@ func TestPersistManyCids(t *testing.T) {
230260
queue = dsqueue.New(ds, dsqName)
231261
defer queue.Close()
232262

233-
assertOrdered(cids, queue, t)
263+
assertOrdered(t, cids, queue)
234264
}
235265

236266
func TestPersistOneCid(t *testing.T) {
@@ -250,7 +280,7 @@ func TestPersistOneCid(t *testing.T) {
250280
queue = dsqueue.New(ds, dsqName)
251281
defer queue.Close()
252282

253-
assertOrdered(cids, queue, t)
283+
assertOrdered(t, cids, queue)
254284
}
255285

256286
func TestDeduplicateCids(t *testing.T) {
@@ -268,7 +298,7 @@ func TestDeduplicateCids(t *testing.T) {
268298
queue.Put(cids[0].Bytes())
269299
queue.Put(cids[4].Bytes())
270300

271-
assertOrdered(cids, queue, t)
301+
assertOrdered(t, cids, queue)
272302

273303
// Test with dedup cache disabled.
274304
queue = dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(-1))
@@ -278,7 +308,7 @@ func TestDeduplicateCids(t *testing.T) {
278308
for _, c := range cids {
279309
queue.Put(c.Bytes())
280310
}
281-
assertOrdered(cids, queue, t)
311+
assertOrdered(t, cids, queue)
282312
}
283313

284314
func TestClear(t *testing.T) {

0 commit comments

Comments
 (0)