@@ -38,12 +38,15 @@ const (
3838// Also changes to mission control parameters can be applied to historical data.
3939// Finally, it enables importing raw data from an external source.
4040type missionControlStore struct {
41- done chan struct {}
42- wg sync.WaitGroup
43- db kvdb.Backend
44- queueMx sync.Mutex
41+ done chan struct {}
42+ wg sync.WaitGroup
43+ db kvdb.Backend
44+
45+ // queueCond is signalled when items are put into the queue.
46+ queueCond * sync.Cond
4547
4648 // queue stores all pending payment results not yet added to the store.
49+ // Access is protected by the queueCond.L mutex.
4750 queue * list.List
4851
4952 // keys holds the stored MC store item keys in the order of storage.
@@ -96,9 +99,12 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int,
9699 return nil , err
97100 }
98101
102+ log .Infof ("Loaded %d mission control entries" , len (keysMap ))
103+
99104 return & missionControlStore {
100105 done : make (chan struct {}),
101106 db : db ,
107+ queueCond : sync .NewCond (& sync.Mutex {}),
102108 queue : list .New (),
103109 keys : keys ,
104110 keysMap : keysMap ,
@@ -109,8 +115,8 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int,
109115
110116// clear removes all results from the db.
111117func (b * missionControlStore ) clear () error {
112- b .queueMx .Lock ()
113- defer b .queueMx .Unlock ()
118+ b .queueCond . L .Lock ()
119+ defer b .queueCond . L .Unlock ()
114120
115121 err := kvdb .Update (b .db , func (tx kvdb.RwTx ) error {
116122 if err := tx .DeleteTopLevelBucket (resultsKey ); err != nil {
@@ -265,14 +271,19 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
265271
266272// AddResult adds a new result to the db.
267273func (b * missionControlStore ) AddResult (rp * paymentResult ) {
268- b .queueMx .Lock ()
269- defer b .queueMx .Unlock ()
274+ b .queueCond .L .Lock ()
270275 b .queue .PushBack (rp )
276+ b .queueCond .L .Unlock ()
277+
278+ b .queueCond .Signal ()
271279}
272280
273281// stop stops the store ticker goroutine.
274282func (b * missionControlStore ) stop () {
275283 close (b .done )
284+
285+ b .queueCond .Signal ()
286+
276287 b .wg .Wait ()
277288}
278289
@@ -281,19 +292,51 @@ func (b *missionControlStore) run() {
281292 b .wg .Add (1 )
282293
283294 go func () {
284- ticker := time .NewTicker (b .flushInterval )
285- defer ticker .Stop ()
286295 defer b .wg .Done ()
287296
297+ timer := time .NewTimer (b .flushInterval )
298+
299+ // Immediately stop the timer. It will be started once new
300+ // items are added to the store. As the doc for time.Timer
301+ // states, every call to Stop() done on a timer that is not
302+ // known to have been fired needs to be checked and the timer's
303+ // channel needs to be drained appropriately. This could happen
304+ // if the flushInterval is very small (e.g. 1 nanosecond).
305+ if ! timer .Stop () {
306+ <- timer .C
307+ }
308+
288309 for {
310+ // Wait for the queue to not be empty.
311+ b .queueCond .L .Lock ()
312+ for b .queue .Front () == nil {
313+ b .queueCond .Wait ()
314+
315+ select {
316+ case <- b .done :
317+ b .queueCond .L .Unlock ()
318+
319+ return
320+ default :
321+ }
322+ }
323+ b .queueCond .L .Unlock ()
324+
325+ // Restart the timer.
326+ timer .Reset (b .flushInterval )
327+
289328 select {
290- case <- ticker .C :
329+ case <- timer .C :
291330 if err := b .storeResults (); err != nil {
292331 log .Errorf ("Failed to update mission " +
293332 "control store: %v" , err )
294333 }
295334
296335 case <- b .done :
336+ // Release the timer's resources.
337+ if ! timer .Stop () {
338+ <- timer .C
339+ }
297340 return
298341 }
299342 }
@@ -302,77 +345,136 @@ func (b *missionControlStore) run() {
302345
303346// storeResults stores all accumulated results.
304347func (b * missionControlStore ) storeResults () error {
305- b .queueMx .Lock ()
348+ // We copy a reference to the queue and clear the original queue to be
349+ // able to release the lock.
350+ b .queueCond .L .Lock ()
306351 l := b .queue
352+
353+ if l .Len () == 0 {
354+ b .queueCond .L .Unlock ()
355+
356+ return nil
357+ }
307358 b .queue = list .New ()
308- b .queueMx .Unlock ()
359+ b .queueCond . L .Unlock ()
309360
310361 var (
311- keys * list.List
312- keysMap map [string ]struct {}
362+ newKeys map [string ]struct {}
363+ delKeys []string
364+ storeCount int
365+ pruneCount int
313366 )
314367
368+ // Create a deduped list of new entries.
369+ newKeys = make (map [string ]struct {}, l .Len ())
370+ for e := l .Front (); e != nil ; e = e .Next () {
371+ pr , ok := e .Value .(* paymentResult )
372+ if ! ok {
373+ return fmt .Errorf ("wrong type %T (not *paymentResult)" ,
374+ e .Value )
375+ }
376+ key := string (getResultKey (pr ))
377+ if _ , ok := b .keysMap [key ]; ok {
378+ l .Remove (e )
379+ continue
380+ }
381+ if _ , ok := newKeys [key ]; ok {
382+ l .Remove (e )
383+ continue
384+ }
385+ newKeys [key ] = struct {}{}
386+ }
387+
388+ // Create a list of entries to delete.
389+ toDelete := b .keys .Len () + len (newKeys ) - b .maxRecords
390+ if b .maxRecords > 0 && toDelete > 0 {
391+ delKeys = make ([]string , 0 , toDelete )
392+
393+ // Delete as many as needed from old keys.
394+ for e := b .keys .Front (); len (delKeys ) < toDelete && e != nil ; {
395+ key , ok := e .Value .(string )
396+ if ! ok {
397+ return fmt .Errorf ("wrong type %T (not string)" ,
398+ e .Value )
399+ }
400+ delKeys = append (delKeys , key )
401+ e = e .Next ()
402+ }
403+
404+ // If more deletions are needed, simply do not add from the
405+ // list of new keys.
406+ for e := l .Front (); len (delKeys ) < toDelete && e != nil ; {
407+ toDelete --
408+ pr , ok := e .Value .(* paymentResult )
409+ if ! ok {
410+ return fmt .Errorf ("wrong type %T (not " +
411+ "*paymentResult )" , e .Value )
412+ }
413+ key := string (getResultKey (pr ))
414+ delete (newKeys , key )
415+ l .Remove (e )
416+ e = l .Front ()
417+ }
418+ }
419+
315420 err := kvdb .Update (b .db , func (tx kvdb.RwTx ) error {
316421 bucket := tx .ReadWriteBucket (resultsKey )
317422
318423 for e := l .Front (); e != nil ; e = e .Next () {
319- pr := e .Value .(* paymentResult )
424+ pr , ok := e .Value .(* paymentResult )
425+ if ! ok {
426+ return fmt .Errorf ("wrong type %T (not " +
427+ "*paymentResult)" , e .Value )
428+ }
429+
320430 // Serialize result into key and value byte slices.
321431 k , v , err := serializeResult (pr )
322432 if err != nil {
323433 return err
324434 }
325435
326- // The store is assumed to be idempotent. It could be
327- // that the same result is added twice and in that case
328- // we don't need to put the value again.
329- if _ , ok := keysMap [string (k )]; ok {
330- continue
331- }
332-
333436 // Put into results bucket.
334437 if err := bucket .Put (k , v ); err != nil {
335438 return err
336439 }
337440
338- keys .PushBack (string (k ))
339- keysMap [string (k )] = struct {}{}
441+ storeCount ++
340442 }
341443
342444 // Prune oldest entries.
343- for {
344- if b .maxRecords == 0 || keys .Len () <= b .maxRecords {
345- break
346- }
347-
348- front := keys .Front ()
349- key := front .Value .(string )
350-
445+ for _ , key := range delKeys {
351446 if err := bucket .Delete ([]byte (key )); err != nil {
352447 return err
353448 }
354-
355- keys .Remove (front )
356- delete (keysMap , key )
449+ pruneCount ++
357450 }
358451
359452 return nil
360453 }, func () {
361- keys = list .New ()
362- keys .PushBackList (b .keys )
363-
364- keysMap = make (map [string ]struct {})
365- for k := range b .keysMap {
366- keysMap [k ] = struct {}{}
367- }
454+ storeCount , pruneCount = 0 , 0
368455 })
369456
370457 if err != nil {
371458 return err
372459 }
373460
374- b .keys = keys
375- b .keysMap = keysMap
461+ log .Debugf ("Stored mission control results: %d added, %d deleted" ,
462+ storeCount , pruneCount )
463+
464+ // DB Update was successful, update the in-memory cache.
465+ for _ , key := range delKeys {
466+ delete (b .keysMap , key )
467+ b .keys .Remove (b .keys .Front ())
468+ }
469+ for e := l .Front (); e != nil ; e = e .Next () {
470+ pr , ok := e .Value .(* paymentResult )
471+ if ! ok {
472+ return fmt .Errorf ("wrong type %T (not *paymentResult)" ,
473+ e .Value )
474+ }
475+ key := string (getResultKey (pr ))
476+ b .keys .PushBack (key )
477+ }
376478
377479 return nil
378480}
0 commit comments