7
7
"fmt"
8
8
"io"
9
9
"strings"
10
- "sync"
11
10
"time"
12
11
13
12
lru "github.com/hashicorp/golang-lru/simplelru"
@@ -67,9 +66,8 @@ type ProviderManager struct {
67
66
68
67
cleanupInterval time.Duration
69
68
70
- ctx context.Context
71
69
cancel context.CancelFunc
72
- wg sync. WaitGroup
70
+ closed chan struct {}
73
71
}
74
72
75
73
var _ ProviderStore = (* ProviderManager )(nil )
@@ -118,30 +116,31 @@ type getProv struct {
118
116
119
117
// NewProviderManager constructor
120
118
func NewProviderManager (ctx context.Context , local peer.ID , ps peerstore.Peerstore , dstore ds.Batching , opts ... Option ) (* ProviderManager , error ) {
121
- pm := new (ProviderManager )
122
- pm .self = local
123
- pm .getprovs = make (chan * getProv )
124
- pm .newprovs = make (chan * addProv )
125
- pm .pstore = ps
126
- pm .dstore = autobatch .NewAutoBatching (dstore , batchBufferSize )
127
119
cache , err := lru .NewLRU (lruCacheSize , nil )
128
120
if err != nil {
129
121
return nil , err
130
122
}
131
- pm .cache = cache
132
- pm .cleanupInterval = defaultCleanupInterval
123
+ pm := & ProviderManager {
124
+ self : local ,
125
+ getprovs : make (chan * getProv ),
126
+ newprovs : make (chan * addProv ),
127
+ closed : make (chan struct {}),
128
+ pstore : ps ,
129
+ dstore : autobatch .NewAutoBatching (dstore , batchBufferSize ),
130
+ cache : cache ,
131
+ cleanupInterval : defaultCleanupInterval ,
132
+ }
133
133
if err := pm .applyOptions (opts ... ); err != nil {
134
134
return nil , err
135
135
}
136
- pm . ctx , pm .cancel = context .WithCancel (ctx )
137
- pm .run ()
136
+ ctx , pm .cancel = context .WithCancel (ctx )
137
+ pm .run (ctx )
138
138
return pm , nil
139
139
}
140
140
141
- func (pm * ProviderManager ) run () {
142
- pm .wg .Add (1 )
141
+ func (pm * ProviderManager ) run (ctx context.Context ) {
143
142
go func () {
144
- defer pm .wg . Done ( )
143
+ defer close ( pm .closed )
145
144
146
145
var gcQuery dsq.Results
147
146
gcTimer := time .NewTimer (pm .cleanupInterval )
@@ -164,17 +163,17 @@ func (pm *ProviderManager) run() {
164
163
case np := <- pm .newprovs :
165
164
err := pm .addProv (np .ctx , np .key , np .val )
166
165
if err != nil {
167
- log .Error ("error adding new providers : " , err )
166
+ log .Error ("error adding new provider : " , err )
168
167
continue
169
168
}
170
169
if gcSkip != nil {
171
- // we have an gc , tell it to skip this provider
170
+ // gc in progress , tell it to skip this provider
172
171
// as we've updated it since the GC started.
173
172
gcSkip [mkProvKeyFor (np .key , np .val )] = struct {}{}
174
173
}
175
174
case gp := <- pm .getprovs :
176
175
provs , err := pm .getProvidersForKey (gp .ctx , gp .key )
177
- if err != nil && err != ds .ErrNotFound {
176
+ if err != nil && ! errors . Is ( err , ds .ErrNotFound ) {
178
177
log .Error ("error reading providers: " , err )
179
178
}
180
179
@@ -210,7 +209,7 @@ func (pm *ProviderManager) run() {
210
209
fallthrough
211
210
case gcTime .Sub (t ) > ProvideValidity :
212
211
// or expired
213
- err = pm .dstore .Delete (pm . ctx , ds .RawKey (res .Key ))
212
+ err = pm .dstore .Delete (ctx , ds .RawKey (res .Key ))
214
213
if err != nil && err != ds .ErrNotFound {
215
214
log .Error ("failed to remove provider record from disk: " , err )
216
215
}
@@ -224,7 +223,7 @@ func (pm *ProviderManager) run() {
224
223
pm .cache .Purge ()
225
224
226
225
// Now, kick off a GC of the datastore.
227
- q , err := pm .dstore .Query (pm . ctx , dsq.Query {
226
+ q , err := pm .dstore .Query (ctx , dsq.Query {
228
227
Prefix : ProvidersKeyPrefix ,
229
228
})
230
229
if err != nil {
@@ -234,7 +233,7 @@ func (pm *ProviderManager) run() {
234
233
gcQuery = q
235
234
gcQueryRes = q .Next ()
236
235
gcSkip = make (map [string ]struct {})
237
- case <- pm . ctx .Done ():
236
+ case <- ctx .Done ():
238
237
return
239
238
}
240
239
}
@@ -243,7 +242,7 @@ func (pm *ProviderManager) run() {
243
242
244
243
func (pm * ProviderManager ) Close () error {
245
244
pm .cancel ()
246
- pm .wg . Wait ()
245
+ <- pm .closed
247
246
return nil
248
247
}
249
248
@@ -377,7 +376,7 @@ func loadProviderSet(ctx context.Context, dstore ds.Datastore, k []byte) (*provi
377
376
case now .Sub (t ) > ProvideValidity :
378
377
// or just expired
379
378
err = dstore .Delete (ctx , ds .RawKey (e .Key ))
380
- if err != nil && err != ds .ErrNotFound {
379
+ if err != nil && errors . Is ( err , ds .ErrNotFound ) {
381
380
log .Error ("failed to remove provider record from disk: " , err )
382
381
}
383
382
continue
@@ -389,7 +388,7 @@ func loadProviderSet(ctx context.Context, dstore ds.Datastore, k []byte) (*provi
389
388
if err != nil {
390
389
log .Error ("base32 decoding error: " , err )
391
390
err = dstore .Delete (ctx , ds .RawKey (e .Key ))
392
- if err != nil && err != ds .ErrNotFound {
391
+ if err != nil && errors . Is ( err , ds .ErrNotFound ) {
393
392
log .Error ("failed to remove provider record from disk: " , err )
394
393
}
395
394
continue
0 commit comments