Skip to content

Commit 43c1149

Browse files
authored
replace provider queue with go-dsqueue (#1033)
* replace provider queue with go-dsqueue Replace the old provider queue with go-dsqueue. The code for clearing the old provider queue datastore remains for cleaning up any data persisted by the old provider queue.
1 parent 03aa83e commit 43c1149

File tree

5 files changed

+40
-524
lines changed

5 files changed

+40
-524
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/ipfs/go-cidutil v0.1.0
2323
github.com/ipfs/go-datastore v0.8.3
2424
github.com/ipfs/go-detect-race v0.0.1
25+
github.com/ipfs/go-dsqueue v0.0.4
2526
github.com/ipfs/go-ipfs-delay v0.0.1
2627
github.com/ipfs/go-ipfs-redirects-file v0.1.2
2728
github.com/ipfs/go-ipld-format v0.6.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ github.com/ipfs/go-datastore v0.8.3 h1:z391GsQyGKUIUof2tPoaZVeDknbt7fNHs6Gqjcw5J
149149
github.com/ipfs/go-datastore v0.8.3/go.mod h1:raxQ/CreIy9L6MxT71ItfMX12/ASN6EhXJoUFjICQ2M=
150150
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
151151
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
152+
github.com/ipfs/go-dsqueue v0.0.4 h1:tesq26hKRYPG72Tu9kZKsbsLWp1KBfAxWNQlMyU17tk=
153+
github.com/ipfs/go-dsqueue v0.0.4/go.mod h1:K68ng9BVl+gLr8fqCJKaoXnXqo6MzQ6nV0MhZZFEwg4=
152154
github.com/ipfs/go-ipfs-blockstore v1.3.1 h1:cEI9ci7V0sRNivqaOr0elDsamxXFxJMMMy7PTTDQNsQ=
153155
github.com/ipfs/go-ipfs-blockstore v1.3.1/go.mod h1:KgtZyc9fq+P2xJUiCAzbRdhhqJHvsw8u2Dlqy2MyRTE=
154156
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=

provider/internal/queue/queue.go

Lines changed: 12 additions & 334 deletions
Original file line numberDiff line numberDiff line change
@@ -1,313 +1,35 @@
1+
// Package queue only remains to provide functionality to remove items from
2+
// the old provider queue datastore.
3+
//
4+
// TODO: remove thos package after kubo v.39.0
15
package queue
26

37
import (
48
"context"
5-
"encoding/base64"
6-
"errors"
79
"fmt"
8-
"sync"
9-
"time"
1010

11-
"github.com/gammazero/deque"
12-
lru "github.com/hashicorp/golang-lru/v2"
13-
cid "github.com/ipfs/go-cid"
1411
datastore "github.com/ipfs/go-datastore"
1512
namespace "github.com/ipfs/go-datastore/namespace"
1613
query "github.com/ipfs/go-datastore/query"
17-
logging "github.com/ipfs/go-log/v2"
18-
)
19-
20-
var log = logging.Logger("provider")
21-
22-
const (
23-
// batchSize is the limit on number of CIDs kept in memory at which there
24-
// are all written to the datastore.
25-
batchSize = 16 * 1024
26-
// dedupCacheSize is the size of the LRU cache used to deduplicate CIDs in
27-
// the queue.
28-
dedupCacheSize = 2 * 1024
29-
// idleWriteTime is the amout of time to check if the queue has been idle
30-
// (no input or output). If the queue has been idle since the last check,
31-
// then write all buffered CIDs to the datastore.
32-
idleWriteTime = time.Minute
33-
// shutdownTimeout is the duration that Close waits to finish writing CIDs
34-
// to the datastore.
35-
shutdownTimeout = 10 * time.Second
3614
)
3715

38-
// Queue provides a FIFO interface to the datastore for storing cids.
39-
//
40-
// CIDs in the process of being provided when a crash or shutdown occurs may be
41-
// in the queue when the node is brought back online depending on whether they
42-
// were fully written to the underlying datastore.
43-
//
44-
// Input to the queue is buffered in memory. The contents of the buffer are
45-
// written to the datastore when the input buffer contains batchSize items, or
46-
// when idleWriteTime has elapsed since the previous batch write or dequeue. CIDs to
47-
// dequeue are read, in order, from the input buffer if there are none in the
48-
// datastore. Otherwise they are read from the datastore.
49-
//
50-
// If queued items are read from the input buffer before it reaches its limit,
51-
// then queued items can remain in memory. When the queue is closed, any
52-
// remaining items in memory are written to the datastore.
53-
type Queue struct {
54-
close context.CancelFunc
55-
closed chan error
56-
closeOnce sync.Once
57-
dequeue chan cid.Cid
58-
ds datastore.Batching
59-
enqueue chan cid.Cid
60-
clear chan chan<- int
61-
}
62-
63-
// New creates a queue for cids.
64-
func New(ds datastore.Batching) *Queue {
65-
ctx, cancel := context.WithCancel(context.Background())
66-
67-
q := &Queue{
68-
close: cancel,
69-
closed: make(chan error, 1),
70-
dequeue: make(chan cid.Cid),
71-
ds: namespace.Wrap(ds, datastore.NewKey("/queue")),
72-
enqueue: make(chan cid.Cid),
73-
clear: make(chan chan<- int),
74-
}
75-
76-
go q.worker(ctx)
77-
78-
return q
79-
}
80-
81-
// Close stops the queue.
82-
func (q *Queue) Close() error {
83-
var err error
84-
q.closeOnce.Do(func() {
85-
// Close input queue and wait for worker to finish reading it.
86-
close(q.enqueue)
87-
select {
88-
case <-q.closed:
89-
case <-time.After(shutdownTimeout):
90-
q.close() // force immediate shutdown
91-
err = <-q.closed
92-
}
93-
close(q.dequeue) // no more output from this queue
94-
})
95-
return err
96-
}
97-
98-
// Enqueue puts a cid in the queue.
99-
func (q *Queue) Enqueue(c cid.Cid) (err error) {
100-
if c == cid.Undef {
101-
return
102-
}
103-
defer func() {
104-
if r := recover(); r != nil {
105-
err = errors.New("failed to enqueue CID: shutting down")
106-
}
107-
}()
108-
q.enqueue <- c
109-
return
110-
}
111-
112-
// Dequeue returns a channel that for reading entries from the queue,
113-
func (q *Queue) Dequeue() <-chan cid.Cid {
114-
return q.dequeue
115-
}
116-
117-
// Clear clears all queued records from memory and the datastore. Returns the
118-
// number of CIDs removed from the queue.
119-
func (q *Queue) Clear() int {
120-
rsp := make(chan int)
121-
q.clear <- rsp
122-
return <-rsp
123-
}
124-
125-
func makeCidString(c cid.Cid) string {
126-
data := c.Bytes()
127-
if len(data) > 4 {
128-
data = data[len(data)-4:]
129-
}
130-
return base64.RawURLEncoding.EncodeToString(data)
131-
}
132-
133-
func makeKey(c cid.Cid, counter uint64) datastore.Key {
134-
return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, makeCidString(c)))
135-
}
136-
137-
// worker run dequeues and enqueues when available.
138-
func (q *Queue) worker(ctx context.Context) {
139-
defer close(q.closed)
140-
141-
var (
142-
c cid.Cid
143-
counter uint64
144-
inBuf deque.Deque[cid.Cid]
145-
)
146-
147-
const baseCap = 1024
148-
inBuf.SetBaseCap(baseCap)
149-
k := datastore.Key{}
150-
dedupCache, _ := lru.New[cid.Cid, struct{}](dedupCacheSize)
151-
152-
defer func() {
153-
if c != cid.Undef {
154-
if err := q.ds.Put(ctx, k, c.Bytes()); err != nil {
155-
log.Errorw("provider queue: failed to write cid to datastore", "err", err)
156-
}
157-
counter++
158-
}
159-
if inBuf.Len() != 0 {
160-
err := q.commitInput(ctx, counter, &inBuf)
161-
if err != nil && !errors.Is(err, context.Canceled) {
162-
log.Error(err)
163-
if inBuf.Len() != 0 {
164-
q.closed <- fmt.Errorf("provider queue: %d cids not written to datastore", inBuf.Len())
165-
}
166-
}
167-
}
168-
}()
169-
170-
var (
171-
commit bool
172-
dsEmpty bool
173-
err error
174-
idle bool
175-
)
176-
177-
readInBuf := q.enqueue
178-
179-
batchTimer := time.NewTimer(idleWriteTime)
180-
defer batchTimer.Stop()
181-
182-
for {
183-
if c == cid.Undef {
184-
if !dsEmpty {
185-
head, err := q.getQueueHead(ctx)
186-
if err != nil {
187-
log.Errorw("provider queue: error querying for head of queue, stopping provider", "err", err)
188-
return
189-
}
190-
if head != nil {
191-
k = datastore.NewKey(head.Key)
192-
if err = q.ds.Delete(ctx, k); err != nil {
193-
log.Errorw("provider queue: error deleting queue entry, stopping provider", "err", err, "key", head.Key)
194-
return
195-
}
196-
c, err = cid.Parse(head.Value)
197-
if err != nil {
198-
log.Warnw("provider queue: error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
199-
continue
200-
}
201-
} else {
202-
dsEmpty = true
203-
}
204-
}
205-
if dsEmpty && inBuf.Len() != 0 {
206-
// There were no queued CIDs in the datastore, so read one from
207-
// the input buffer.
208-
c = inBuf.PopFront()
209-
k = makeKey(c, counter)
210-
}
211-
}
16+
// ClearDatastore clears any entried from the previous queue from the datastore.
17+
func ClearDatastore(ds datastore.Batching) (int, error) {
18+
const batchSize = 4096
21219

213-
// If c != cid.Undef set dequeue and attempt write.
214-
var dequeue chan cid.Cid
215-
if c != cid.Undef {
216-
dequeue = q.dequeue
217-
}
218-
219-
select {
220-
case toQueue, ok := <-readInBuf:
221-
if !ok {
222-
return
223-
}
224-
if found, _ := dedupCache.ContainsOrAdd(toQueue, struct{}{}); found {
225-
// update recentness in LRU cache
226-
dedupCache.Add(toQueue, struct{}{})
227-
continue
228-
}
229-
idle = false
230-
231-
if c == cid.Undef {
232-
// Use this CID as the next output since there was nothing in
233-
// the datastore or buffer previously.
234-
c = toQueue
235-
k = makeKey(c, counter)
236-
continue
237-
}
238-
239-
inBuf.PushBack(toQueue)
240-
if inBuf.Len() >= batchSize {
241-
commit = true
242-
}
243-
case dequeue <- c:
244-
c = cid.Undef
245-
idle = false
246-
case <-batchTimer.C:
247-
if idle {
248-
if inBuf.Len() != 0 {
249-
commit = true
250-
} else {
251-
if inBuf.Cap() > baseCap {
252-
inBuf = deque.Deque[cid.Cid]{}
253-
inBuf.SetBaseCap(baseCap)
254-
}
255-
}
256-
}
257-
idle = true
258-
batchTimer.Reset(idleWriteTime)
259-
260-
case <-ctx.Done():
261-
return
262-
263-
case rsp := <-q.clear:
264-
var rmMemCount int
265-
if c != cid.Undef {
266-
rmMemCount = 1
267-
}
268-
c = cid.Undef
269-
k = datastore.Key{}
270-
idle = false
271-
rmMemCount += inBuf.Len()
272-
inBuf.Clear()
273-
dedupCache.Purge()
274-
rmDSCount, err := q.clearDatastore(ctx)
275-
if err != nil {
276-
log.Errorw("provider queue: cannot clear datastore", "err", err)
277-
} else {
278-
dsEmpty = true
279-
}
280-
log.Infow("cleared provider queue", "fromMemory", rmMemCount, "fromDatastore", rmDSCount)
281-
rsp <- rmMemCount + rmDSCount
282-
}
283-
284-
if commit {
285-
commit = false
286-
n := inBuf.Len()
287-
err = q.commitInput(ctx, counter, &inBuf)
288-
if err != nil {
289-
if !errors.Is(err, context.Canceled) {
290-
log.Errorw("provider queue: error writing CIDs to datastore, stopping provider", "err", err)
291-
}
292-
return
293-
}
294-
counter += uint64(n)
295-
dsEmpty = false
296-
}
297-
}
298-
}
20+
ds = namespace.Wrap(ds, datastore.NewKey("/queue"))
21+
ctx := context.Background()
29922

300-
func (q *Queue) clearDatastore(ctx context.Context) (int, error) {
30123
qry := query.Query{
30224
KeysOnly: true,
30325
}
304-
results, err := q.ds.Query(ctx, qry)
26+
results, err := ds.Query(ctx, qry)
30527
if err != nil {
30628
return 0, fmt.Errorf("cannot query datastore: %w", err)
30729
}
30830
defer results.Close()
30931

310-
batch, err := q.ds.Batch(ctx)
32+
batch, err := ds.Batch(ctx)
31133
if err != nil {
31234
return 0, fmt.Errorf("cannot create datastore batch: %w", err)
31335
}
@@ -336,53 +58,9 @@ func (q *Queue) clearDatastore(ctx context.Context) (int, error) {
33658
if err = batch.Commit(ctx); err != nil {
33759
return 0, fmt.Errorf("cannot commit datastore updated: %w", err)
33860
}
339-
if err = q.ds.Sync(ctx, datastore.NewKey("")); err != nil {
61+
if err = ds.Sync(ctx, datastore.NewKey("")); err != nil {
34062
return 0, fmt.Errorf("cannot sync datastore: %w", err)
34163
}
34264

34365
return rmCount, nil
34466
}
345-
346-
func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) {
347-
qry := query.Query{
348-
Orders: []query.Order{query.OrderByKey{}},
349-
Limit: 1,
350-
}
351-
results, err := q.ds.Query(ctx, qry)
352-
if err != nil {
353-
return nil, err
354-
}
355-
defer results.Close()
356-
r, ok := results.NextSync()
357-
if !ok {
358-
return nil, nil
359-
}
360-
361-
return &r.Entry, r.Error
362-
}
363-
364-
func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deque[cid.Cid]) error {
365-
b, err := q.ds.Batch(ctx)
366-
if err != nil {
367-
return fmt.Errorf("failed to create batch: %w", err)
368-
}
369-
370-
cstr := makeCidString(cids.Front())
371-
for i := range cids.Len() {
372-
c := cids.At(i)
373-
key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr))
374-
if err = b.Put(ctx, key, c.Bytes()); err != nil {
375-
log.Errorw("provider queue: failed to add cid to batch", "err", err)
376-
continue
377-
}
378-
counter++
379-
}
380-
381-
cids.Clear()
382-
383-
if err = b.Commit(ctx); err != nil {
384-
return fmt.Errorf("failed to commit batch to datastore: %w", err)
385-
}
386-
387-
return nil
388-
}

0 commit comments

Comments
 (0)