Skip to content

Commit 0c7a173

Browse files
committed
missioncontrolstore: remove duplication of in-memory data
This removes duplication of in-memory data during the periodic flushing stage of the mission control store. The existing code entirely duplicates the in-memory cache of the store, which is very wasteful when only a few additional results are being rotated into the store. This has a significant performance penalty specially for wallets that remain online for a long time with a low volume of payments. The worst case scenario are wallets that see at most 1 new payment a second, where the entire in-memory cache is recreated every second. This commit improves the situation by determining what will be the actual changes that need to be committed before initiating the db transaction and only keeping track of these to update the in-memory cache if the db tx is successful.
1 parent 9059a65 commit 0c7a173

File tree

1 file changed

+75
-33
lines changed

1 file changed

+75
-33
lines changed

routing/missioncontrol_store.go

Lines changed: 75 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -359,68 +359,98 @@ func (b *missionControlStore) storeResults() error {
359359
b.queueCond.L.Unlock()
360360

361361
var (
362-
keys *list.List
363-
keysMap map[string]struct{}
362+
newKeys map[string]struct{}
363+
delKeys []string
364364
storeCount int
365365
pruneCount int
366366
)
367367

368+
// Create a deduped list of new entries.
369+
newKeys = make(map[string]struct{}, l.Len())
370+
for e := l.Front(); e != nil; e = e.Next() {
371+
pr, ok := e.Value.(*paymentResult)
372+
if !ok {
373+
return fmt.Errorf("wrong type %T (not *paymentResult)",
374+
e.Value)
375+
}
376+
key := string(getResultKey(pr))
377+
if _, ok := b.keysMap[key]; ok {
378+
l.Remove(e)
379+
continue
380+
}
381+
if _, ok := newKeys[key]; ok {
382+
l.Remove(e)
383+
continue
384+
}
385+
newKeys[key] = struct{}{}
386+
}
387+
388+
// Create a list of entries to delete.
389+
toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
390+
if b.maxRecords > 0 && toDelete > 0 {
391+
delKeys = make([]string, 0, toDelete)
392+
393+
// Delete as many as needed from old keys.
394+
for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
395+
key, ok := e.Value.(string)
396+
if !ok {
397+
return fmt.Errorf("wrong type %T (not string)",
398+
e.Value)
399+
}
400+
delKeys = append(delKeys, key)
401+
e = e.Next()
402+
}
403+
404+
// If more deletions are needed, simply do not add from the
405+
// list of new keys.
406+
for e := l.Front(); len(delKeys) < toDelete && e != nil; {
407+
toDelete--
408+
pr, ok := e.Value.(*paymentResult)
409+
if !ok {
410+
return fmt.Errorf("wrong type %T (not "+
411+
"*paymentResult )", e.Value)
412+
}
413+
key := string(getResultKey(pr))
414+
delete(newKeys, key)
415+
l.Remove(e)
416+
e = l.Front()
417+
}
418+
}
419+
368420
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
369421
bucket := tx.ReadWriteBucket(resultsKey)
370422

371423
for e := l.Front(); e != nil; e = e.Next() {
372-
pr := e.Value.(*paymentResult)
424+
pr, ok := e.Value.(*paymentResult)
425+
if !ok {
426+
return fmt.Errorf("wrong type %T (not "+
427+
"*paymentResult)", e.Value)
428+
}
429+
373430
// Serialize result into key and value byte slices.
374431
k, v, err := serializeResult(pr)
375432
if err != nil {
376433
return err
377434
}
378435

379-
// The store is assumed to be idempotent. It could be
380-
// that the same result is added twice and in that case
381-
// we don't need to put the value again.
382-
if _, ok := keysMap[string(k)]; ok {
383-
continue
384-
}
385-
386436
// Put into results bucket.
387437
if err := bucket.Put(k, v); err != nil {
388438
return err
389439
}
390440

391-
keys.PushBack(string(k))
392-
keysMap[string(k)] = struct{}{}
393441
storeCount++
394442
}
395443

396444
// Prune oldest entries.
397-
for {
398-
if b.maxRecords == 0 || keys.Len() <= b.maxRecords {
399-
break
400-
}
401-
402-
front := keys.Front()
403-
key := front.Value.(string)
404-
445+
for _, key := range delKeys {
405446
if err := bucket.Delete([]byte(key)); err != nil {
406447
return err
407448
}
408-
409-
keys.Remove(front)
410-
delete(keysMap, key)
411449
pruneCount++
412450
}
413451

414452
return nil
415453
}, func() {
416-
keys = list.New()
417-
keys.PushBackList(b.keys)
418-
419-
keysMap = make(map[string]struct{})
420-
for k := range b.keysMap {
421-
keysMap[k] = struct{}{}
422-
}
423-
424454
storeCount, pruneCount = 0, 0
425455
})
426456

@@ -431,8 +461,20 @@ func (b *missionControlStore) storeResults() error {
431461
log.Debugf("Stored mission control results: %d added, %d deleted",
432462
storeCount, pruneCount)
433463

434-
b.keys = keys
435-
b.keysMap = keysMap
464+
// DB Update was successful, update the in-memory cache.
465+
for _, key := range delKeys {
466+
delete(b.keysMap, key)
467+
b.keys.Remove(b.keys.Front())
468+
}
469+
for e := l.Front(); e != nil; e = e.Next() {
470+
pr, ok := e.Value.(*paymentResult)
471+
if !ok {
472+
return fmt.Errorf("wrong type %T (not *paymentResult)",
473+
e.Value)
474+
}
475+
key := string(getResultKey(pr))
476+
b.keys.PushBack(key)
477+
}
436478

437479
return nil
438480
}

0 commit comments

Comments
 (0)