Skip to content

Commit 7caa770

Browse files
wait on empty queue
1 parent 743cd49 commit 7caa770

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

provider/buffered/provider.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type SweepingProvider struct {
3636
closeOnce sync.Once
3737
done chan struct{}
3838
closed chan struct{}
39+
40+
newItems chan struct{}
3941
provider internal.Provider
4042
queue *dsqueue.DSQueue
4143
batchSize int
@@ -50,6 +52,7 @@ func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *Sweepin
5052
done: make(chan struct{}),
5153
closed: make(chan struct{}),
5254

55+
newItems: make(chan struct{}, 1),
5356
provider: prov,
5457
queue: dsqueue.New(ds, cfg.dsName,
5558
dsqueue.WithDedupCacheSize(0), // disable deduplication
@@ -133,18 +136,32 @@ func executeOperation(f func(...mh.Multihash) error, keys []mh.Multihash) {
133136
// It runs in a separate goroutine and continues until the provider is closed.
134137
func (s *SweepingProvider) worker() {
135138
defer close(s.done)
139+
var emptyQueue bool
136140
for {
137-
select {
138-
case <-s.closed:
139-
return
140-
default:
141+
if emptyQueue {
142+
select {
143+
case <-s.closed:
144+
return
145+
case <-s.newItems:
146+
}
147+
emptyQueue = false
148+
} else {
149+
select {
150+
case <-s.closed:
151+
return
152+
default:
153+
}
141154
}
142155

143156
res, err := s.queue.GetN(s.batchSize)
144157
if err != nil {
145158
logger.Warnf("BufferedSweepingProvider unable to dequeue: %v", err)
146159
continue
147160
}
161+
if len(res) < s.batchSize {
162+
// Queue was fully drained.
163+
emptyQueue = true
164+
}
148165
ops, err := getOperations(res)
149166
if err != nil {
150167
logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err)
@@ -174,6 +191,10 @@ func (s *SweepingProvider) enqueue(op byte, keys ...mh.Multihash) error {
174191
return err
175192
}
176193
}
194+
select {
195+
case s.newItems <- struct{}{}:
196+
default:
197+
}
177198
return nil
178199
}
179200

0 commit comments

Comments
 (0)