Skip to content

Commit 339b0f0

Browse files
wait on empty queue
1 parent 743cd49 commit 339b0f0

File tree

1 file changed

+26
-4
lines changed

1 file changed

+26
-4
lines changed

provider/buffered/provider.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type SweepingProvider struct {
3636
closeOnce sync.Once
3737
done chan struct{}
3838
closed chan struct{}
39+
40+
newItems chan struct{}
3941
provider internal.Provider
4042
queue *dsqueue.DSQueue
4143
batchSize int
@@ -50,6 +52,7 @@ func New(prov internal.Provider, ds datastore.Batching, opts ...Option) *Sweepin
5052
done: make(chan struct{}),
5153
closed: make(chan struct{}),
5254

55+
newItems: make(chan struct{}, 1),
5356
provider: prov,
5457
queue: dsqueue.New(ds, cfg.dsName,
5558
dsqueue.WithDedupCacheSize(0), // disable deduplication
@@ -133,18 +136,33 @@ func executeOperation(f func(...mh.Multihash) error, keys []mh.Multihash) {
133136
// It runs in a separate goroutine and continues until the provider is closed.
134137
func (s *SweepingProvider) worker() {
135138
defer close(s.done)
139+
var emptyQueue bool
136140
for {
137-
select {
138-
case <-s.closed:
139-
return
140-
default:
141+
if emptyQueue {
142+
select {
143+
case <-s.closed:
144+
return
145+
case <-s.newItems:
146+
}
147+
emptyQueue = false
148+
} else {
149+
select {
150+
case <-s.closed:
151+
return
152+
case <-s.newItems:
153+
default:
154+
}
141155
}
142156

143157
res, err := s.queue.GetN(s.batchSize)
144158
if err != nil {
145159
logger.Warnf("BufferedSweepingProvider unable to dequeue: %v", err)
146160
continue
147161
}
162+
if len(res) < s.batchSize {
163+
// Queue was fully drained.
164+
emptyQueue = true
165+
}
148166
ops, err := getOperations(res)
149167
if err != nil {
150168
logger.Warnf("BufferedSweepingProvider unable to parse dequeued item: %v", err)
@@ -174,6 +192,10 @@ func (s *SweepingProvider) enqueue(op byte, keys ...mh.Multihash) error {
174192
return err
175193
}
176194
}
195+
select {
196+
case s.newItems <- struct{}{}:
197+
default:
198+
}
177199
return nil
178200
}
179201

0 commit comments

Comments
 (0)