Skip to content

Commit 2dd9046

Browse files
committed
routing: introduce missionControlDB abstraction
So that `missionControlStore` can be unaware of the backing DB structure it is writing to. In an upcoming commit when we change mission control to write to namespaced buckets instead, we then only need to update the `namespacedDB` implementation.
1 parent 75eaaf7 commit 2dd9046

File tree

3 files changed

+105
-28
lines changed

3 files changed

+105
-28
lines changed

routing/missioncontrol.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/btcsuite/btcd/btcutil"
10+
"github.com/btcsuite/btcwallet/walletdb"
1011
"github.com/lightningnetwork/lnd/channeldb"
1112
"github.com/lightningnetwork/lnd/fn"
1213
"github.com/lightningnetwork/lnd/kvdb"
@@ -226,7 +227,8 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
226227
}
227228

228229
store, err := newMissionControlStore(
229-
db, cfg.MaxMcHistory, cfg.McFlushInterval,
230+
newNamespacedDB(db), cfg.MaxMcHistory,
231+
cfg.McFlushInterval,
230232
)
231233
if err != nil {
232234
return nil, err
@@ -545,3 +547,74 @@ func (m *MissionControl) applyPaymentResult(
545547

546548
return i.finalFailureReason
547549
}
550+
551+
// namespacedDB is an implementation of the missionControlDB that gives a user
552+
// of the interface access to the top level mission control bucket. In a
553+
// follow-up commit (accompanied by a migration), this will change to giving
554+
// the user of the interface access to a namespaced sub-bucket instead.
555+
type namespacedDB struct {
556+
topLevelBucketKey []byte
557+
db kvdb.Backend
558+
}
559+
560+
// A compile-time check to ensure that namespacedDB implements missionControlDB.
561+
var _ missionControlDB = (*namespacedDB)(nil)
562+
563+
// newNamespacedDB creates a new instance of missionControlDB where the DB will
564+
// have access to the top level bucket.
565+
func newNamespacedDB(db kvdb.Backend) missionControlDB {
566+
return &namespacedDB{
567+
db: db,
568+
topLevelBucketKey: resultsKey,
569+
}
570+
}
571+
572+
// update can be used to perform reads and writes on the given bucket.
573+
//
574+
// NOTE: this is part of the missionControlDB interface.
575+
func (n *namespacedDB) update(f func(bkt walletdb.ReadWriteBucket) error,
576+
reset func()) error {
577+
578+
return n.db.Update(func(tx kvdb.RwTx) error {
579+
mcStoreBkt, err := tx.CreateTopLevelBucket(n.topLevelBucketKey)
580+
if err != nil {
581+
return fmt.Errorf("cannot create top level mission "+
582+
"control bucket: %w", err)
583+
}
584+
585+
return f(mcStoreBkt)
586+
}, reset)
587+
}
588+
589+
// view can be used to perform reads on the given bucket.
590+
//
591+
// NOTE: this is part of the missionControlDB interface.
592+
func (n *namespacedDB) view(f func(bkt walletdb.ReadBucket) error,
593+
reset func()) error {
594+
595+
return n.db.View(func(tx kvdb.RTx) error {
596+
mcStoreBkt := tx.ReadBucket(n.topLevelBucketKey)
597+
if mcStoreBkt == nil {
598+
return fmt.Errorf("top level mission control bucket " +
599+
"not found")
600+
}
601+
602+
return f(mcStoreBkt)
603+
}, reset)
604+
}
605+
606+
// purge will delete all the contents in the namespace.
607+
//
608+
// NOTE: this is part of the missionControlDB interface.
609+
func (n *namespacedDB) purge() error {
610+
return n.db.Update(func(tx kvdb.RwTx) error {
611+
err := tx.DeleteTopLevelBucket(n.topLevelBucketKey)
612+
if err != nil {
613+
return err
614+
}
615+
616+
_, err = tx.CreateTopLevelBucket(n.topLevelBucketKey)
617+
618+
return err
619+
}, func() {})
620+
}

routing/missioncontrol_store.go

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@ const (
3232
unknownFailureSourceIdx = -1
3333
)
3434

35+
// missionControlDB is an interface that defines the database methods that a
36+
// single missionControlStore has access to. It allows the missionControlStore
37+
// to be unaware of the overall DB structure and restricts its access to the DB
38+
// by only providing it the bucket that it needs to care about.
39+
type missionControlDB interface {
40+
// update can be used to perform reads and writes on the given bucket.
41+
update(f func(bkt kvdb.RwBucket) error, reset func()) error
42+
43+
// view can be used to perform reads on the given bucket.
44+
view(f func(bkt kvdb.RBucket) error, reset func()) error
45+
46+
// purge will delete all the contents in this store.
47+
purge() error
48+
}
49+
3550
// missionControlStore is a bolt db based implementation of a mission control
3651
// store. It stores the raw payment attempt data from which the internal mission
3752
// controls state can be rederived on startup. This allows the mission control
@@ -41,7 +56,7 @@ const (
4156
type missionControlStore struct {
4257
done chan struct{}
4358
wg sync.WaitGroup
44-
db kvdb.Backend
59+
db missionControlDB
4560

4661
// queueCond is signalled when items are put into the queue.
4762
queueCond *sync.Cond
@@ -67,7 +82,7 @@ type missionControlStore struct {
6782
flushInterval time.Duration
6883
}
6984

70-
func newMissionControlStore(db kvdb.Backend, maxRecords int,
85+
func newMissionControlStore(db missionControlDB, maxRecords int,
7186
flushInterval time.Duration) (*missionControlStore, error) {
7287

7388
var (
@@ -76,13 +91,7 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int,
7691
)
7792

7893
// Create buckets if not yet existing.
79-
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
80-
resultsBucket, err := tx.CreateTopLevelBucket(resultsKey)
81-
if err != nil {
82-
return fmt.Errorf("cannot create results bucket: %w",
83-
err)
84-
}
85-
94+
err := db.update(func(resultsBucket kvdb.RwBucket) error {
8695
// Collect all keys to be able to quickly calculate the
8796
// difference when updating the DB state.
8897
c := resultsBucket.ReadCursor()
@@ -119,29 +128,20 @@ func (b *missionControlStore) clear() error {
119128
b.queueCond.L.Lock()
120129
defer b.queueCond.L.Unlock()
121130

122-
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
123-
if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
124-
return err
125-
}
126-
127-
_, err := tx.CreateTopLevelBucket(resultsKey)
128-
return err
129-
}, func() {})
130-
131-
if err != nil {
131+
if err := b.db.purge(); err != nil {
132132
return err
133133
}
134134

135135
b.queue = list.New()
136+
136137
return nil
137138
}
138139

139140
// fetchAll returns all results currently stored in the database.
140141
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
141142
var results []*paymentResult
142143

143-
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
144-
resultBucket := tx.ReadBucket(resultsKey)
144+
err := b.db.view(func(resultBucket kvdb.RBucket) error {
145145
results = make([]*paymentResult, 0)
146146

147147
return resultBucket.ForEach(func(k, v []byte) error {
@@ -511,9 +511,7 @@ func (b *missionControlStore) storeResults() error {
511511
}
512512
}
513513

514-
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
515-
bucket := tx.ReadWriteBucket(resultsKey)
516-
514+
err := b.db.update(func(bucket kvdb.RwBucket) error {
517515
for e := l.Front(); e != nil; e = e.Next() {
518516
pr, ok := e.Value.(*paymentResult)
519517
if !ok {

routing/missioncontrol_store_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ func newMCStoreTestHarness(t testing.TB, maxRecords int,
6262
require.NoError(t, db.Close())
6363
})
6464

65-
store, err := newMissionControlStore(db, maxRecords, flushInterval)
65+
store, err := newMissionControlStore(
66+
newNamespacedDB(db), maxRecords, flushInterval,
67+
)
6668
require.NoError(t, err)
6769

6870
return mcStoreTestHarness{db: db, store: store}
@@ -115,7 +117,9 @@ func TestMissionControlStore(t *testing.T) {
115117
require.Equal(t, &result2, results[1])
116118

117119
// Recreate store to test pruning.
118-
store, err = newMissionControlStore(db, testMaxRecords, time.Second)
120+
store, err = newMissionControlStore(
121+
newNamespacedDB(db), testMaxRecords, time.Second,
122+
)
119123
require.NoError(t, err)
120124

121125
// Add a newer result which failed due to mpp timeout.
@@ -213,7 +217,9 @@ func TestMissionControlStoreFlushing(t *testing.T) {
213217
store.stop()
214218

215219
// Recreate store.
216-
store, err := newMissionControlStore(db, testMaxRecords, flushInterval)
220+
store, err := newMissionControlStore(
221+
newNamespacedDB(db), testMaxRecords, flushInterval,
222+
)
217223
require.NoError(t, err)
218224
store.run()
219225
defer store.stop()

0 commit comments

Comments
 (0)