Skip to content

Commit 5c88cf8

Browse files
authored
Merge pull request #760 from starius/sweepbatcher-rm-defaultBatchConfTarget
sweepbatcher: load from DB preserves confTarget
2 parents 22dd2e8 + 87fb185 commit 5c88cf8

File tree

6 files changed

+176
-29
lines changed

6 files changed

+176
-29
lines changed

loopout_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func testCustomSweepConfTarget(t *testing.T) {
296296

297297
errChan := make(chan error, 2)
298298

299-
batcherStore := sweepbatcher.NewStoreMock()
299+
batcherStore := sweepbatcher.NewStoreMock(cfg.store)
300300

301301
batcher := sweepbatcher.NewBatcher(
302302
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
@@ -529,7 +529,7 @@ func testPreimagePush(t *testing.T) {
529529

530530
errChan := make(chan error, 2)
531531

532-
batcherStore := sweepbatcher.NewStoreMock()
532+
batcherStore := sweepbatcher.NewStoreMock(cfg.store)
533533

534534
batcher := sweepbatcher.NewBatcher(
535535
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
@@ -950,7 +950,7 @@ func TestLoopOutMuSig2Sweep(t *testing.T) {
950950

951951
errChan := make(chan error, 2)
952952

953-
batcherStore := sweepbatcher.NewStoreMock()
953+
batcherStore := sweepbatcher.NewStoreMock(cfg.store)
954954

955955
batcher := sweepbatcher.NewBatcher(
956956
lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,

sweepbatcher/store_mock.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sweepbatcher
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"sort"
78

89
"github.com/btcsuite/btcd/btcutil"
@@ -11,15 +12,17 @@ import (
1112

1213
// StoreMock implements a mock client swap store.
1314
type StoreMock struct {
14-
batches map[int32]dbBatch
15-
sweeps map[lntypes.Hash]dbSweep
15+
batches map[int32]dbBatch
16+
sweeps map[lntypes.Hash]dbSweep
17+
swapStore LoopOutFetcher
1618
}
1719

1820
// NewStoreMock instantiates a new mock store.
19-
func NewStoreMock() *StoreMock {
21+
func NewStoreMock(swapStore LoopOutFetcher) *StoreMock {
2022
return &StoreMock{
21-
batches: make(map[int32]dbBatch),
22-
sweeps: make(map[lntypes.Hash]dbSweep),
23+
batches: make(map[int32]dbBatch),
24+
sweeps: make(map[lntypes.Hash]dbSweep),
25+
swapStore: swapStore,
2326
}
2427
}
2528

@@ -90,9 +93,21 @@ func (s *StoreMock) FetchBatchSweeps(ctx context.Context,
9093
result := []*dbSweep{}
9194
for _, sweep := range s.sweeps {
9295
sweep := sweep
93-
if sweep.BatchID == id {
94-
result = append(result, &sweep)
96+
if sweep.BatchID != id {
97+
continue
9598
}
99+
100+
// Load swap from loopdb.
101+
swap, err := s.swapStore.FetchLoopOutSwap(
102+
ctx, sweep.SwapHash,
103+
)
104+
if err != nil {
105+
return nil, fmt.Errorf("failed to fetch swap "+
106+
"for SwapHash=%v", sweep.SwapHash)
107+
}
108+
sweep.LoopOut = swap
109+
110+
result = append(result, &sweep)
96111
}
97112

98113
sort.Slice(result, func(i, j int) bool {

sweepbatcher/sweep_batch.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ const (
3535
// fee rate is increased when an rbf is attempted.
3636
defaultFeeRateStep = chainfee.SatPerKWeight(100)
3737

38-
// defaultBatchConfTarget is the default confirmation target of the
39-
// batch transaction.
40-
defaultBatchConfTarget = 12
41-
4238
// batchConfHeight is the default confirmation height of the batch
4339
// transaction.
4440
batchConfHeight = 3
@@ -316,7 +312,22 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
316312
}
317313

318314
// NewBatchFromDB creates a new batch that already existed in storage.
319-
func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
315+
func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
316+
// Make sure the batch is not empty.
317+
if len(bk.sweeps) == 0 {
318+
// This should never happen, as this precondition is already
319+
// ensured in spinUpBatchFromDB.
320+
return nil, fmt.Errorf("empty batch is not allowed")
321+
}
322+
323+
// Assign batchConfTarget to primary sweep's confTarget.
324+
for _, sweep := range bk.sweeps {
325+
if sweep.swapHash == bk.primaryID {
326+
cfg.batchConfTarget = sweep.confTarget
327+
break
328+
}
329+
}
330+
320331
return &batch{
321332
id: bk.id,
322333
state: bk.state,
@@ -343,7 +354,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
343354
store: bk.store,
344355
log: bk.log,
345356
cfg: &cfg,
346-
}
357+
}, nil
347358
}
348359

349360
// addSweep tries to add a sweep to the batch. If this is the first sweep being
@@ -1044,6 +1055,10 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
10441055
// If the feeRate is unset then we never published before, so we
10451056
// retrieve the fee estimate from our wallet.
10461057
if b.rbfCache.FeeRate == 0 {
1058+
if b.cfg.batchConfTarget == 0 {
1059+
b.log.Warnf("updateRbfRate called with zero " +
1060+
"batchConfTarget")
1061+
}
10471062
b.log.Infof("initializing rbf fee rate for conf target=%v",
10481063
b.cfg.batchConfTarget)
10491064
rate, err := b.wallet.EstimateFeeRate(

sweepbatcher/sweep_batcher.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,6 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
378378
func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
379379
cfg := batchConfig{
380380
maxTimeoutDistance: defaultMaxTimeoutDistance,
381-
batchConfTarget: defaultBatchConfTarget,
382381
}
383382

384383
switch b.chainParams {
@@ -488,10 +487,12 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
488487

489488
cfg := batchConfig{
490489
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
491-
batchConfTarget: defaultBatchConfTarget,
492490
}
493491

494-
newBatch := NewBatchFromDB(cfg, batchKit)
492+
newBatch, err := NewBatchFromDB(cfg, batchKit)
493+
if err != nil {
494+
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
495+
}
495496

496497
// We add the batch to our map of batches and start it.
497498
b.batches[batch.id] = newBatch

sweepbatcher/sweep_batcher_test.go

Lines changed: 125 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func TestSweepBatcherBatchCreation(t *testing.T) {
6464

6565
store := loopdb.NewStoreMock(t)
6666

67-
batcherStore := NewStoreMock()
67+
batcherStore := NewStoreMock(store)
6868

6969
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
7070
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
@@ -218,7 +218,7 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) {
218218

219219
store := loopdb.NewStoreMock(t)
220220

221-
batcherStore := NewStoreMock()
221+
batcherStore := NewStoreMock(store)
222222

223223
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
224224
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
@@ -355,7 +355,7 @@ func TestSweepBatcherSweepReentry(t *testing.T) {
355355

356356
store := loopdb.NewStoreMock(t)
357357

358-
batcherStore := NewStoreMock()
358+
batcherStore := NewStoreMock(store)
359359

360360
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
361361
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
@@ -562,7 +562,7 @@ func TestSweepBatcherNonWalletAddr(t *testing.T) {
562562

563563
store := loopdb.NewStoreMock(t)
564564

565-
batcherStore := NewStoreMock()
565+
batcherStore := NewStoreMock(store)
566566

567567
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
568568
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
@@ -727,7 +727,7 @@ func TestSweepBatcherComposite(t *testing.T) {
727727

728728
store := loopdb.NewStoreMock(t)
729729

730-
batcherStore := NewStoreMock()
730+
batcherStore := NewStoreMock(store)
731731

732732
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
733733
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
@@ -1044,7 +1044,7 @@ func TestRestoringEmptyBatch(t *testing.T) {
10441044

10451045
store := loopdb.NewStoreMock(t)
10461046

1047-
batcherStore := NewStoreMock()
1047+
batcherStore := NewStoreMock(store)
10481048
_, err := batcherStore.InsertSweepBatch(ctx, &dbBatch{})
10491049
require.NoError(t, err)
10501050

@@ -1109,7 +1109,7 @@ func TestRestoringEmptyBatch(t *testing.T) {
11091109
require.NoError(t, err)
11101110
require.Len(t, batches, 1)
11111111

1112-
// Now make it quit by canceling the context.
1112+
// Now make the batcher quit by canceling the context.
11131113
cancel()
11141114
wg.Wait()
11151115

@@ -1158,7 +1158,7 @@ func TestHandleSweepTwice(t *testing.T) {
11581158

11591159
store := newLoopStoreMock()
11601160

1161-
batcherStore := NewStoreMock()
1161+
batcherStore := NewStoreMock(store)
11621162

11631163
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
11641164
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
@@ -1297,9 +1297,125 @@ func TestHandleSweepTwice(t *testing.T) {
12971297
require.Equal(t, 1, len(batcher.batches[0].sweeps))
12981298
require.Equal(t, 1, len(batcher.batches[1].sweeps))
12991299

1300-
// Now make it quit by canceling the context.
1300+
// Now make the batcher quit by canceling the context.
13011301
cancel()
13021302
wg.Wait()
13031303

13041304
checkBatcherError(t, runErr)
13051305
}
1306+
1307+
// TestRestoringPreservesConfTarget tests that after the batch is written to DB
1308+
// and loaded back, its batchConfTarget value is preserved.
1309+
func TestRestoringPreservesConfTarget(t *testing.T) {
1310+
defer test.Guard(t)()
1311+
1312+
lnd := test.NewMockLnd()
1313+
ctx, cancel := context.WithCancel(context.Background())
1314+
1315+
store := loopdb.NewStoreMock(t)
1316+
1317+
batcherStore := NewStoreMock(store)
1318+
1319+
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1320+
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
1321+
1322+
var wg sync.WaitGroup
1323+
wg.Add(1)
1324+
1325+
var runErr error
1326+
go func() {
1327+
defer wg.Done()
1328+
runErr = batcher.Run(ctx)
1329+
}()
1330+
1331+
// Wait for the batcher to be initialized.
1332+
<-batcher.initDone
1333+
1334+
// Create a sweep request.
1335+
sweepReq := SweepRequest{
1336+
SwapHash: lntypes.Hash{1, 1, 1},
1337+
Value: 111,
1338+
Outpoint: wire.OutPoint{
1339+
Hash: chainhash.Hash{1, 1},
1340+
Index: 1,
1341+
},
1342+
Notifier: &dummyNotifier,
1343+
}
1344+
1345+
swap := &loopdb.LoopOutContract{
1346+
SwapContract: loopdb.SwapContract{
1347+
CltvExpiry: 111,
1348+
AmountRequested: 111,
1349+
},
1350+
1351+
SwapInvoice: swapInvoice,
1352+
SweepConfTarget: 123,
1353+
}
1354+
1355+
err := store.CreateLoopOut(ctx, sweepReq.SwapHash, swap)
1356+
require.NoError(t, err)
1357+
store.AssertLoopOutStored()
1358+
1359+
// Deliver sweep request to batcher.
1360+
require.NoError(t, batcher.AddSweep(&sweepReq))
1361+
1362+
// Since a batch was created we check that it registered for its primary
1363+
// sweep's spend.
1364+
<-lnd.RegisterSpendChannel
1365+
1366+
// Once batcher receives sweep request it will eventually spin up a
1367+
// batch.
1368+
require.Eventually(t, func() bool {
1369+
// Make sure that the sweep was stored and we have exactly one
1370+
// active batch, with one sweep and proper batchConfTarget.
1371+
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
1372+
len(batcher.batches) == 1 &&
1373+
len(batcher.batches[0].sweeps) == 1 &&
1374+
batcher.batches[0].cfg.batchConfTarget == 123
1375+
}, test.Timeout, eventuallyCheckFrequency)
1376+
1377+
// Make sure we have stored the batch.
1378+
batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
1379+
require.NoError(t, err)
1380+
require.Len(t, batches, 1)
1381+
1382+
// Now make the batcher quit by canceling the context.
1383+
cancel()
1384+
wg.Wait()
1385+
1386+
// Make sure the batcher exited without an error.
1387+
checkBatcherError(t, runErr)
1388+
1389+
// Now launch it again.
1390+
batcher = NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1391+
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
1392+
ctx, cancel = context.WithCancel(context.Background())
1393+
wg.Add(1)
1394+
go func() {
1395+
defer wg.Done()
1396+
runErr = batcher.Run(ctx)
1397+
}()
1398+
1399+
// Wait for the batcher to be initialized.
1400+
<-batcher.initDone
1401+
1402+
// Wait for batch to load.
1403+
require.Eventually(t, func() bool {
1404+
return batcherStore.AssertSweepStored(sweepReq.SwapHash) &&
1405+
len(batcher.batches) == 1 &&
1406+
len(batcher.batches[0].sweeps) == 1
1407+
}, test.Timeout, eventuallyCheckFrequency)
1408+
1409+
// Make sure batchConfTarget was preserved.
1410+
require.Equal(t, 123, int(batcher.batches[0].cfg.batchConfTarget))
1411+
1412+
// Expect registration for spend notification.
1413+
<-lnd.RegisterSpendChannel
1414+
1415+
// Now make the batcher quit by canceling the context.
1416+
cancel()
1417+
wg.Wait()
1418+
1419+
// Make sure the batcher exited without an error.
1420+
checkBatcherError(t, runErr)
1421+
}

testcontext_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func newSwapClient(config *clientConfig) *Client {
7777

7878
lndServices := config.LndServices
7979

80-
batcherStore := sweepbatcher.NewStoreMock()
80+
batcherStore := sweepbatcher.NewStoreMock(config.Store)
8181

8282
batcher := sweepbatcher.NewBatcher(
8383
config.LndServices.WalletKit, config.LndServices.ChainNotifier,

0 commit comments

Comments
 (0)