Skip to content

Commit c1db78b

Browse files
committed
sweepbatcher: replace batch logger atomically
This is needed to fix crashes in unit tests under -race.
1 parent 273f50d commit c1db78b

File tree

2 files changed

+60
-46
lines changed

2 files changed

+60
-46
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 58 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"math"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/btcsuite/btcd/blockchain"
@@ -284,8 +285,8 @@ type batch struct {
284285
// cfg is the configuration for this batch.
285286
cfg *batchConfig
286287

287-
// log is the logger for this batch.
288-
log btclog.Logger
288+
// log_ is the logger for this batch.
289+
log_ atomic.Pointer[btclog.Logger]
289290

290291
wg sync.WaitGroup
291292
}
@@ -387,7 +388,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
387388
}
388389
}
389390

390-
return &batch{
391+
b := &batch{
391392
id: bk.id,
392393
state: bk.state,
393394
primarySweepID: bk.primaryID,
@@ -412,9 +413,22 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
412413
publishErrorHandler: bk.publishErrorHandler,
413414
purger: bk.purger,
414415
store: bk.store,
415-
log: bk.log,
416416
cfg: &cfg,
417-
}, nil
417+
}
418+
419+
b.setLog(bk.log)
420+
421+
return b, nil
422+
}
423+
424+
// log returns current logger.
425+
func (b *batch) log() btclog.Logger {
426+
return *b.log_.Load()
427+
}
428+
429+
// setLog atomically replaces the logger.
430+
func (b *batch) setLog(logger btclog.Logger) {
431+
b.log_.Store(&logger)
418432
}
419433

420434
// addSweep tries to add a sweep to the batch. If this is the first sweep being
@@ -430,7 +444,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
430444
// If the provided sweep is nil, we can't proceed with any checks, so
431445
// we just return early.
432446
if sweep == nil {
433-
b.log.Infof("the sweep is nil")
447+
b.log().Infof("the sweep is nil")
434448

435449
return false, nil
436450
}
@@ -473,7 +487,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
473487
// the batch, do not add another sweep to prevent the tx from becoming
474488
// non-standard.
475489
if len(b.sweeps) >= MaxSweepsPerBatch {
476-
b.log.Infof("the batch has already too many sweeps (%d >= %d)",
490+
b.log().Infof("the batch has already too many sweeps %d >= %d",
477491
len(b.sweeps), MaxSweepsPerBatch)
478492

479493
return false, nil
@@ -483,7 +497,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
483497
// arrive here after the batch got closed because of a spend. In this
484498
// case we cannot add the sweep to this batch.
485499
if b.state != Open {
486-
b.log.Infof("the batch state (%v) is not open", b.state)
500+
b.log().Infof("the batch state (%v) is not open", b.state)
487501

488502
return false, nil
489503
}
@@ -493,14 +507,14 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
493507
// we cannot add this sweep to the batch.
494508
for _, s := range b.sweeps {
495509
if s.isExternalAddr {
496-
b.log.Infof("the batch already has a sweep (%x) with "+
510+
b.log().Infof("the batch already has a sweep %x with "+
497511
"an external address", s.swapHash[:6])
498512

499513
return false, nil
500514
}
501515

502516
if sweep.isExternalAddr {
503-
b.log.Infof("the batch is not empty and new sweep (%x)"+
517+
b.log().Infof("the batch is not empty and new sweep %x"+
504518
" has an external address", sweep.swapHash[:6])
505519

506520
return false, nil
@@ -515,7 +529,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
515529
int32(math.Abs(float64(sweep.timeout - s.timeout)))
516530

517531
if timeoutDistance > b.cfg.maxTimeoutDistance {
518-
b.log.Infof("too long timeout distance between the "+
532+
b.log().Infof("too long timeout distance between the "+
519533
"batch and sweep %x: %d > %d",
520534
sweep.swapHash[:6], timeoutDistance,
521535
b.cfg.maxTimeoutDistance)
@@ -544,7 +558,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
544558
}
545559

546560
// Add the sweep to the batch's sweeps.
547-
b.log.Infof("adding sweep %x", sweep.swapHash[:6])
561+
b.log().Infof("adding sweep %x", sweep.swapHash[:6])
548562
b.sweeps[sweep.swapHash] = *sweep
549563

550564
// Update FeeRate. Max(sweep.minFeeRate) for all the sweeps of
@@ -572,7 +586,7 @@ func (b *batch) sweepExists(hash lntypes.Hash) bool {
572586

573587
// Wait waits for the batch to gracefully stop.
574588
func (b *batch) Wait() {
575-
b.log.Infof("Stopping")
589+
b.log().Infof("Stopping")
576590
<-b.finished
577591
}
578592

@@ -613,7 +627,7 @@ func (b *batch) Run(ctx context.Context) error {
613627
// Set currentHeight here, because it may be needed in monitorSpend.
614628
select {
615629
case b.currentHeight = <-blockChan:
616-
b.log.Debugf("initial height for the batch is %v",
630+
b.log().Debugf("initial height for the batch is %v",
617631
b.currentHeight)
618632

619633
case <-runCtx.Done():
@@ -652,7 +666,7 @@ func (b *batch) Run(ctx context.Context) error {
652666
// completes.
653667
timerChan := clock.TickAfter(b.cfg.batchPublishDelay)
654668

655-
b.log.Infof("started, primary %x, total sweeps %v",
669+
b.log().Infof("started, primary %x, total sweeps %v",
656670
b.primarySweepID[0:6], len(b.sweeps))
657671

658672
for {
@@ -662,15 +676,15 @@ func (b *batch) Run(ctx context.Context) error {
662676

663677
// blockChan provides immediately the current tip.
664678
case height := <-blockChan:
665-
b.log.Debugf("received block %v", height)
679+
b.log().Debugf("received block %v", height)
666680

667681
// Set the timer to publish the batch transaction after
668682
// the configured delay.
669683
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
670684
b.currentHeight = height
671685

672686
case <-initialDelayChan:
673-
b.log.Debugf("initial delay of duration %v has ended",
687+
b.log().Debugf("initial delay of duration %v has ended",
674688
b.cfg.initialDelay)
675689

676690
// Set the timer to publish the batch transaction after
@@ -680,8 +694,8 @@ func (b *batch) Run(ctx context.Context) error {
680694
case <-timerChan:
681695
// Check that batch is still open.
682696
if b.state != Open {
683-
b.log.Debugf("Skipping publishing, because the"+
684-
" batch is not open (%v).", b.state)
697+
b.log().Debugf("Skipping publishing, because "+
698+
"the batch is not open (%v).", b.state)
685699
continue
686700
}
687701

@@ -695,7 +709,7 @@ func (b *batch) Run(ctx context.Context) error {
695709
// initialDelayChan has just fired, this check passes.
696710
now := clock.Now()
697711
if skipBefore.After(now) {
698-
b.log.Debugf(stillWaitingMsg, skipBefore, now)
712+
b.log().Debugf(stillWaitingMsg, skipBefore, now)
699713
continue
700714
}
701715

@@ -715,8 +729,8 @@ func (b *batch) Run(ctx context.Context) error {
715729

716730
case <-b.reorgChan:
717731
b.state = Open
718-
b.log.Warnf("reorg detected, batch is able to accept " +
719-
"new sweeps")
732+
b.log().Warnf("reorg detected, batch is able to " +
733+
"accept new sweeps")
720734

721735
err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
722736
if err != nil {
@@ -755,7 +769,7 @@ func (b *batch) timeout() int32 {
755769
func (b *batch) isUrgent(skipBefore time.Time) bool {
756770
timeout := b.timeout()
757771
if timeout <= 0 {
758-
b.log.Warnf("Method timeout() returned %v. Number of"+
772+
b.log().Warnf("Method timeout() returned %v. Number of"+
759773
" sweeps: %d. It may be an empty batch.",
760774
timeout, len(b.sweeps))
761775
return false
@@ -779,7 +793,7 @@ func (b *batch) isUrgent(skipBefore time.Time) bool {
779793
return false
780794
}
781795

782-
b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
796+
b.log().Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
783797
"remainingWaiting is %v)", timeBank, remainingWaiting)
784798

785799
// Signal to the caller to cancel initialDelay.
@@ -795,7 +809,7 @@ func (b *batch) publish(ctx context.Context) error {
795809
)
796810

797811
if len(b.sweeps) == 0 {
798-
b.log.Debugf("skipping publish: no sweeps in the batch")
812+
b.log().Debugf("skipping publish: no sweeps in the batch")
799813

800814
return nil
801815
}
@@ -808,7 +822,7 @@ func (b *batch) publish(ctx context.Context) error {
808822

809823
// logPublishError is a function which logs publish errors.
810824
logPublishError := func(errMsg string, err error) {
811-
b.publishErrorHandler(err, errMsg, b.log)
825+
b.publishErrorHandler(err, errMsg, b.log())
812826
}
813827

814828
fee, err, signSuccess = b.publishMixedBatch(ctx)
@@ -830,9 +844,9 @@ func (b *batch) publish(ctx context.Context) error {
830844
}
831845
}
832846

833-
b.log.Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee)
847+
b.log().Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee)
834848
for _, sweep := range b.sweeps {
835-
b.log.Infof("published sweep %x, value: %v",
849+
b.log().Infof("published sweep %x, value: %v",
836850
sweep.swapHash[:6], sweep.value)
837851
}
838852

@@ -1026,8 +1040,8 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
10261040
coopInputs int
10271041
)
10281042
for attempt := 1; ; attempt++ {
1029-
b.log.Infof("Attempt %d of collecting cooperative signatures.",
1030-
attempt)
1043+
b.log().Infof("Attempt %d of collecting cooperative "+
1044+
"signatures.", attempt)
10311045

10321046
// Construct unsigned batch transaction.
10331047
var err error
@@ -1062,7 +1076,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
10621076
ctx, i, sweep, tx, prevOutsMap, psbtBytes,
10631077
)
10641078
if err != nil {
1065-
b.log.Infof("cooperative signing failed for "+
1079+
b.log().Infof("cooperative signing failed for "+
10661080
"sweep %x: %v", sweep.swapHash[:6], err)
10671081

10681082
// Set coopFailed flag for this sweep in all the
@@ -1201,7 +1215,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
12011215
}
12021216
}
12031217
txHash := tx.TxHash()
1204-
b.log.Infof("attempting to publish batch tx=%v with feerate=%v, "+
1218+
b.log().Infof("attempting to publish batch tx=%v with feerate=%v, "+
12051219
"weight=%v, feeForWeight=%v, fee=%v, sweeps=%d, "+
12061220
"%d cooperative: (%s) and %d non-cooperative (%s), destAddr=%s",
12071221
txHash, b.rbfCache.FeeRate, weight, feeForWeight, fee,
@@ -1215,7 +1229,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
12151229
blockchain.GetTransactionWeight(btcutil.NewTx(tx)),
12161230
)
12171231
if realWeight != weight {
1218-
b.log.Warnf("actual weight of tx %v is %v, estimated as %d",
1232+
b.log().Warnf("actual weight of tx %v is %v, estimated as %d",
12191233
txHash, realWeight, weight)
12201234
}
12211235

@@ -1239,11 +1253,11 @@ func (b *batch) debugLogTx(msg string, tx *wire.MsgTx) {
12391253
// Serialize the transaction and convert to hex string.
12401254
buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
12411255
if err := tx.Serialize(buf); err != nil {
1242-
b.log.Errorf("failed to serialize tx for debug log: %v", err)
1256+
b.log().Errorf("failed to serialize tx for debug log: %v", err)
12431257
return
12441258
}
12451259

1246-
b.log.Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes()))
1260+
b.log().Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes()))
12471261
}
12481262

12491263
// musig2sign signs one sweep using musig2.
@@ -1405,14 +1419,14 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
14051419
if b.rbfCache.FeeRate == 0 {
14061420
// We set minFeeRate in each sweep, so fee rate is expected to
14071421
// be initiated here.
1408-
b.log.Warnf("rbfCache.FeeRate is 0, which must not happen.")
1422+
b.log().Warnf("rbfCache.FeeRate is 0, which must not happen.")
14091423

14101424
if b.cfg.batchConfTarget == 0 {
1411-
b.log.Warnf("updateRbfRate called with zero " +
1425+
b.log().Warnf("updateRbfRate called with zero " +
14121426
"batchConfTarget")
14131427
}
14141428

1415-
b.log.Infof("initializing rbf fee rate for conf target=%v",
1429+
b.log().Infof("initializing rbf fee rate for conf target=%v",
14161430
b.cfg.batchConfTarget)
14171431
rate, err := b.wallet.EstimateFeeRate(
14181432
ctx, b.cfg.batchConfTarget,
@@ -1461,7 +1475,7 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
14611475
defer cancel()
14621476
defer b.wg.Done()
14631477

1464-
b.log.Infof("monitoring spend for outpoint %s",
1478+
b.log().Infof("monitoring spend for outpoint %s",
14651479
primarySweep.outpoint.String())
14661480

14671481
for {
@@ -1584,7 +1598,7 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
15841598
if len(spendTx.TxOut) > 0 {
15851599
b.batchPkScript = spendTx.TxOut[0].PkScript
15861600
} else {
1587-
b.log.Warnf("transaction %v has no outputs", txHash)
1601+
b.log().Warnf("transaction %v has no outputs", txHash)
15881602
}
15891603

15901604
// As a previous version of the batch transaction may get confirmed,
@@ -1666,13 +1680,13 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
16661680

16671681
err := b.purger(&sweep)
16681682
if err != nil {
1669-
b.log.Errorf("unable to purge sweep %x: %v",
1683+
b.log().Errorf("unable to purge sweep %x: %v",
16701684
sweep.SwapHash[:6], err)
16711685
}
16721686
}
16731687
}()
16741688

1675-
b.log.Infof("spent, total sweeps: %v, purged sweeps: %v",
1689+
b.log().Infof("spent, total sweeps: %v, purged sweeps: %v",
16761690
len(notifyList), len(purgeList))
16771691

16781692
err := b.monitorConfirmations(ctx)
@@ -1690,7 +1704,7 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
16901704
// handleConf handles a confirmation notification. This is the final step of the
16911705
// batch. Here we signal to the batcher that this batch was completed.
16921706
func (b *batch) handleConf(ctx context.Context) error {
1693-
b.log.Infof("confirmed in txid %s", b.batchTxid)
1707+
b.log().Infof("confirmed in txid %s", b.batchTxid)
16941708
b.state = Confirmed
16951709

16961710
return b.store.ConfirmBatch(ctx, b.id)
@@ -1769,7 +1783,7 @@ func (b *batch) insertAndAcquireID(ctx context.Context) (int32, error) {
17691783
}
17701784

17711785
b.id = id
1772-
b.log = batchPrefixLogger(fmt.Sprintf("%d", b.id))
1786+
b.setLog(batchPrefixLogger(fmt.Sprintf("%d", b.id)))
17731787

17741788
return id, nil
17751789
}

sweepbatcher/sweep_batcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -940,7 +940,7 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) {
940940
}
941941
require.NotNil(t, batch1)
942942
testLogger := &wrappedLogger{Logger: batch1.log}
943-
batch1.log = testLogger
943+
batch1.setLog(testLogger)
944944

945945
// Advance the clock to publishDelay. It will trigger the publishDelay
946946
// timer, but won't result in publishing, because of initialDelay.
@@ -1234,7 +1234,7 @@ func testDelays(t *testing.T, store testStore, batcherStore testBatcherStore) {
12341234
}
12351235
require.NotNil(t, batch2)
12361236
testLogger2 := &wrappedLogger{Logger: batch2.log}
1237-
batch2.log = testLogger2
1237+
batch2.setLog(testLogger2)
12381238

12391239
// Add another sweep which is urgent. It will go to the same batch
12401240
// to make sure minimum timeout is calculated properly.

0 commit comments

Comments
 (0)