Skip to content

Commit 0c85de3

Browse files
authored
Log queue name (#4)
* put queue name into log messages * add tests
1 parent 845e969 commit 0c85de3

File tree

3 files changed

+90
-32
lines changed

3 files changed

+90
-32
lines changed

dsqueue.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111

1212
"github.com/gammazero/deque"
1313
lru "github.com/hashicorp/golang-lru/v2"
14-
datastore "github.com/ipfs/go-datastore"
15-
namespace "github.com/ipfs/go-datastore/namespace"
16-
query "github.com/ipfs/go-datastore/query"
14+
"github.com/ipfs/go-datastore"
15+
"github.com/ipfs/go-datastore/namespace"
16+
"github.com/ipfs/go-datastore/query"
1717
logging "github.com/ipfs/go-log/v2"
1818
)
1919

@@ -44,6 +44,7 @@ type DSQueue struct {
4444
enqueue chan string
4545
clear chan chan<- int
4646
closeTimeout time.Duration
47+
name string
4748
}
4849

4950
// New creates a queue for strings.
@@ -60,6 +61,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
6061
enqueue: make(chan string),
6162
clear: make(chan chan<- int),
6263
closeTimeout: cfg.closeTimeout,
64+
name: name,
6365
}
6466

6567
go q.worker(ctx, cfg.bufferSize, cfg.dedupCacheSize, cfg.idleWriteTime)
@@ -97,7 +99,7 @@ func (q *DSQueue) Enqueue(item []byte) (err error) {
9799
}
98100
defer func() {
99101
if r := recover(); r != nil {
100-
err = errors.New("failed to enqueue item: shutting down")
102+
err = fmt.Errorf("failed to enqueue item: %s", r)
101103
}
102104
}()
103105

@@ -118,6 +120,11 @@ func (q *DSQueue) Clear() int {
118120
return <-rsp
119121
}
120122

123+
// Name returns the name of this DSQueue instance.
124+
func (q *DSQueue) Name() string {
125+
return q.name
126+
}
127+
121128
func makeKey(item string, counter uint64) datastore.Key {
122129
b64Item := base64.RawURLEncoding.EncodeToString([]byte(item))
123130
return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, b64Item))
@@ -144,16 +151,16 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
144151
defer func() {
145152
if item != "" {
146153
if err := q.ds.Put(ctx, k, nil); err != nil {
147-
log.Errorw("dsqueue: failed to write item to datastore", "err", err)
154+
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
148155
}
149156
counter++
150157
}
151158
if inBuf.Len() != 0 {
152159
err := q.commitInput(ctx, counter, &inBuf)
153160
if err != nil && !errors.Is(err, context.Canceled) {
154-
log.Error(err)
161+
log.Errorw("error writing items to datastore", "err", err, "qname", q.name)
155162
if inBuf.Len() != 0 {
156-
q.closed <- fmt.Errorf("dsqueue: %d items not written to datastore", inBuf.Len())
163+
q.closed <- fmt.Errorf("%d items not written to datastore", inBuf.Len())
157164
}
158165
}
159166
}
@@ -183,23 +190,23 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
183190
if !dsEmpty {
184191
head, err := q.getQueueHead(ctx)
185192
if err != nil {
186-
log.Errorw("dsqueue: error querying for head of queue, stopping dsqueue", "err", err)
193+
log.Errorw("error querying for head of queue, stopping dsqueue", "err", err, "qname", q.name)
187194
return
188195
}
189196
if head != nil {
190197
k = datastore.NewKey(head.Key)
191198
if err = q.ds.Delete(ctx, k); err != nil {
192-
log.Errorw("dsqueue: error deleting queue entry, stopping dsqueue", "err", err, "key", head.Key)
199+
log.Errorw("error deleting queue entry, stopping dsqueue", "err", err, "key", head.Key, "qname", q.name)
193200
return
194201
}
195202
parts := strings.SplitN(strings.TrimPrefix(head.Key, "/"), "/", 2)
196203
if len(parts) != 2 {
197-
log.Errorw("dsqueue: malformed queued item, removing it from queue", "err", err, "key", head.Key)
204+
log.Errorw("malformed queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name)
198205
continue
199206
}
200207
itemBin, err := base64.RawURLEncoding.DecodeString(parts[1])
201208
if err != nil {
202-
log.Errorw("dsqueue: error decoding queued item, removing it from queue", "err", err, "key", head.Key)
209+
log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name)
203210
continue
204211
}
205212
item = string(itemBin)
@@ -281,11 +288,11 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
281288
dedupCache.Purge()
282289
rmDSCount, err := q.clearDatastore(ctx)
283290
if err != nil {
284-
log.Errorw("dsqueue: cannot clear datastore", "err", err)
291+
log.Errorw("cannot clear datastore", "err", err, "qname", q.name)
285292
} else {
286293
dsEmpty = true
287294
}
288-
log.Infow("cleared dsqueue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount)
295+
log.Infow("cleared dsqueue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount, "qname", q.name)
289296
rsp <- rmMemCount + rmDSCount
290297
}
291298

@@ -295,7 +302,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
295302
err = q.commitInput(ctx, counter, &inBuf)
296303
if err != nil {
297304
if !errors.Is(err, context.Canceled) {
298-
log.Errorw("dsqueue: error writing items to datastore, stopping dsqueue", "err", err)
305+
log.Errorw("error writing items to datastore, stopping dsqueue", "err", err, "qname", q.name)
299306
}
300307
return
301308
}
@@ -380,7 +387,7 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.
380387
item := items.At(i)
381388
key := makeKey(item, counter)
382389
if err = b.Put(ctx, key, nil); err != nil {
383-
log.Errorw("dsqueue: failed to add item to batch", "err", err)
390+
log.Errorw("failed to add item to batch", "err", err, "qname", q.name)
384391
continue
385392
}
386393
counter++
@@ -389,7 +396,7 @@ func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.
389396
items.Clear()
390397

391398
if err = b.Commit(ctx); err != nil {
392-
return fmt.Errorf("dsqueue: failed to commit batch to datastore: %w", err)
399+
return fmt.Errorf("failed to commit batch to datastore: %w", err)
393400
}
394401

395402
return nil

dsqueue_test.go

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ipfs/go-cid"
1111
"github.com/ipfs/go-datastore"
1212
"github.com/ipfs/go-datastore/namespace"
13+
"github.com/ipfs/go-datastore/query"
1314
"github.com/ipfs/go-datastore/sync"
1415
"github.com/ipfs/go-dsqueue"
1516
"github.com/ipfs/go-test/random"
@@ -48,6 +49,17 @@ func TestBasicOperation(t *testing.T) {
4849
queue := dsqueue.New(ds, dsqName)
4950
defer queue.Close()
5051

52+
if queue.Name() != dsqName {
53+
t.Fatal("wrong queue name")
54+
}
55+
56+
queue.Enqueue(nil)
57+
select {
58+
case <-queue.Dequeue():
59+
t.Fatal("nothing should be in queue")
60+
case <-time.After(time.Millisecond):
61+
}
62+
5163
cids := random.Cids(10)
5264
for _, c := range cids {
5365
queue.Enqueue(c.Bytes())
@@ -62,6 +74,11 @@ func TestBasicOperation(t *testing.T) {
6274
if err = queue.Close(); err != nil {
6375
t.Fatal(err)
6476
}
77+
78+
err = queue.Enqueue(cids[0].Bytes())
79+
if err == nil {
80+
t.Fatal("expected error calling Enqueue after Close")
81+
}
6582
}
6683

6784
func TestMangledData(t *testing.T) {
@@ -113,28 +130,62 @@ func TestInitialization(t *testing.T) {
113130
assertOrdered(cids[5:], queue, t)
114131
}
115132

116-
func TestInitializationWithManyCids(t *testing.T) {
133+
func TestIdleFlush(t *testing.T) {
117134
ds := sync.MutexWrap(datastore.NewMapDatastore())
118-
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(25))
135+
queue := dsqueue.New(ds, dsqName, dsqueue.WithIdleWriteTime(time.Millisecond))
119136
defer queue.Close()
120137

121-
cids := random.Cids(100)
138+
cids := random.Cids(10)
122139
for _, c := range cids {
123140
queue.Enqueue(c.Bytes())
124141
}
125142

126-
/*
127-
err := queue.Close()
128-
if err != nil {
129-
t.Fatal(err)
130-
}
143+
dsn := namespace.Wrap(ds, datastore.NewKey("/dsq-"+dsqName))
144+
time.Sleep(10 * time.Millisecond)
145+
146+
ctx := context.Background()
147+
n, err := countItems(ctx, dsn)
148+
if err != nil {
149+
t.Fatal(err)
150+
}
151+
if n != 0 {
152+
t.Fatal("expected nothing in datastore")
153+
}
131154

132-
// make a new queue, same data
133-
queue = dsqueue.New(ds, dsqName)
134-
defer queue.Close()
135-
*/
155+
time.Sleep(2 * time.Second)
136156

137-
assertOrdered(cids, queue, t)
157+
n, err = countItems(ctx, dsn)
158+
if err != nil {
159+
t.Fatal(err)
160+
}
161+
expect := len(cids) - 1
162+
if n != expect {
163+
t.Fatalf("should have flushed %d cids to datastore, got %d", expect, n)
164+
}
165+
}
166+
167+
func countItems(ctx context.Context, ds datastore.Datastore) (int, error) {
168+
qry := query.Query{
169+
KeysOnly: true,
170+
}
171+
results, err := ds.Query(ctx, qry)
172+
if err != nil {
173+
return 0, fmt.Errorf("cannot query datastore: %w", err)
174+
}
175+
defer results.Close()
176+
177+
var count int
178+
for result := range results.Next() {
179+
if ctx.Err() != nil {
180+
return 0, ctx.Err()
181+
}
182+
if result.Error != nil {
183+
return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
184+
}
185+
count++
186+
}
187+
188+
return count, nil
138189
}
139190

140191
func TestPersistManyCids(t *testing.T) {

option.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ func WithDedupCacheSize(n int) Option {
6464
// WithIdleWriteTime sets the amout of time that the queue must be idle (no
6565
// input or output) before all buffered input items are written to the
6666
// datastore. A value of zero means that buffered input items are not
67-
// automatically flushed to the datastore. This value must be greater than one
68-
// second.
67+
// automatically flushed to the datastore. A non-zero value must be greater
68+
// than one second.
6969
func WithIdleWriteTime(d time.Duration) Option {
7070
return func(c *config) {
71-
if d < time.Second {
71+
if d != 0 && d < time.Second {
7272
d = time.Second
7373
}
7474
c.idleWriteTime = d

0 commit comments

Comments
 (0)