Skip to content

Commit a75db37

Browse files
authored
improve close errors and add tests (#6)
1 parent f8885f0 commit a75db37

File tree

2 files changed

+79
-8
lines changed

2 files changed

+79
-8
lines changed

dsqueue.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ var log = logging.Logger("dsqueue")
3636
// then queued items can remain in memory. When the queue is closed, any
3737
// remaining items in memory are written to the datastore.
3838
type DSQueue struct {
39-
close context.CancelFunc
39+
cancel context.CancelFunc
4040
closed chan error
4141
closeOnce sync.Once
4242
dequeue chan []byte
@@ -54,7 +54,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
5454
ctx, cancel := context.WithCancel(context.Background())
5555

5656
q := &DSQueue{
57-
close: cancel,
57+
cancel: cancel,
5858
closed: make(chan error, 1),
5959
dequeue: make(chan []byte),
6060
ds: namespace.Wrap(ds, datastore.NewKey("/dsq-"+name)),
@@ -84,7 +84,7 @@ func (q *DSQueue) Close() error {
8484
select {
8585
case <-q.closed:
8686
case <-timeoutCh:
87-
q.close() // force immediate shutdown
87+
q.cancel() // force immediate shutdown
8888
err = <-q.closed
8989
}
9090
close(q.dequeue) // no more output from this queue
@@ -150,21 +150,30 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
150150

151151
defer func() {
152152
if item != "" {
153+
// Write the item directly, instead of pushing it to the front of
154+
// inbuf, in order to retain it's original kay, and therefore the
155+
// order in the datastore, which may not be empty.
153156
if err := q.ds.Put(ctx, k, nil); err != nil {
154-
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
157+
if !errors.Is(err, context.Canceled) {
158+
log.Errorw("failed to write item to datastore", "err", err, "qname", q.name)
159+
}
160+
q.closed <- fmt.Errorf("%d items not written to datastore", 1+inBuf.Len())
161+
return
155162
}
156163
}
157164
if inBuf.Len() != 0 {
158165
err := q.commitInput(ctx, counter, &inBuf)
159-
if err != nil && !errors.Is(err, context.Canceled) {
160-
log.Errorw("error writing items to datastore", "err", err, "qname", q.name)
166+
if err != nil {
167+
if !errors.Is(err, context.Canceled) {
168+
log.Errorw("error writing items to datastore", "err", err, "qname", q.name)
169+
}
161170
if inBuf.Len() != 0 {
162171
q.closed <- fmt.Errorf("%d items not written to datastore", inBuf.Len())
163172
}
164173
}
165174
}
166175
if err := q.ds.Sync(ctx, datastore.NewKey("")); err != nil {
167-
q.closed <- fmt.Errorf("cannot sync datastore: %w", err)
176+
log.Errorw("failed to sync datastore", "err", err, "qname", q.name)
168177
}
169178
}()
170179

@@ -378,6 +387,10 @@ func (q *DSQueue) getQueueHead(ctx context.Context) (*query.Entry, error) {
378387
}
379388

380389
func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.Deque[string]) error {
390+
if ctx.Err() != nil {
391+
return ctx.Err()
392+
}
393+
381394
b, err := q.ds.Batch(ctx)
382395
if err != nil {
383396
return fmt.Errorf("failed to create batch: %w", err)

dsqueue_test.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func TestInitialization(t *testing.T) {
132132

133133
func TestIdleFlush(t *testing.T) {
134134
ds := sync.MutexWrap(datastore.NewMapDatastore())
135-
queue := dsqueue.New(ds, dsqName, dsqueue.WithIdleWriteTime(time.Millisecond))
135+
queue := dsqueue.New(ds, dsqName, dsqueue.WithBufferSize(-1), dsqueue.WithIdleWriteTime(time.Millisecond))
136136
defer queue.Close()
137137

138138
cids := random.Cids(10)
@@ -246,6 +246,16 @@ func TestDeduplicateCids(t *testing.T) {
246246
queue.Enqueue(cids[4].Bytes())
247247

248248
assertOrdered(cids, queue, t)
249+
250+
// Test with dedup cache disabled.
251+
queue = dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(-1))
252+
defer queue.Close()
253+
254+
cids = append(cids, cids[0], cids[0], cids[1])
255+
for _, c := range cids {
256+
queue.Enqueue(c.Bytes())
257+
}
258+
assertOrdered(cids, queue, t)
249259
}
250260

251261
func TestClear(t *testing.T) {
@@ -292,3 +302,51 @@ func TestClear(t *testing.T) {
292302
case <-time.After(10 * time.Millisecond):
293303
}
294304
}
305+
306+
func TestCloseTimeout(t *testing.T) {
307+
ds := sync.MutexWrap(datastore.NewMapDatastore())
308+
sds := &slowds{
309+
Batching: ds,
310+
delay: time.Second,
311+
}
312+
queue := dsqueue.New(sds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithCloseTimeout(time.Microsecond))
313+
defer queue.Close()
314+
315+
cids := random.Cids(5)
316+
for _, c := range cids {
317+
queue.Enqueue(c.Bytes())
318+
}
319+
320+
err := queue.Close()
321+
if err == nil {
322+
t.Fatal("expected error")
323+
}
324+
const expectErr = "5 items not written to datastore"
325+
if err.Error() != expectErr {
326+
t.Fatalf("did not get expected err %q, got %q", expectErr, err.Error())
327+
}
328+
329+
// Test with no close timeout.
330+
queue = dsqueue.New(sds, dsqName, dsqueue.WithBufferSize(5), dsqueue.WithCloseTimeout(-1))
331+
defer queue.Close()
332+
333+
for _, c := range cids {
334+
queue.Enqueue(c.Bytes())
335+
}
336+
if err = queue.Close(); err != nil {
337+
t.Fatal(err)
338+
}
339+
}
340+
341+
type slowds struct {
342+
datastore.Batching
343+
delay time.Duration
344+
}
345+
346+
func (sds *slowds) Put(ctx context.Context, key datastore.Key, value []byte) error {
347+
time.Sleep(sds.delay)
348+
if ctx.Err() != nil {
349+
return ctx.Err()
350+
}
351+
return sds.Batching.Put(ctx, key, value)
352+
}

0 commit comments

Comments
 (0)