Skip to content

Commit 8b4e2fb

Browse files
committed
sweepbatcher: replace batch logger atomically
This is needed to fix crashes in unit tests under -race.
1 parent 8166a3e commit 8b4e2fb

File tree

2 files changed

+83
-50
lines changed

2 files changed

+83
-50
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 79 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"math"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/btcsuite/btcd/blockchain"
@@ -284,8 +285,8 @@ type batch struct {
284285
// cfg is the configuration for this batch.
285286
cfg *batchConfig
286287

287-
// log is the logger for this batch.
288-
log btclog.Logger
288+
// log_ is the logger for this batch.
289+
log_ atomic.Pointer[btclog.Logger]
289290

290291
wg sync.WaitGroup
291292
}
@@ -387,7 +388,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
387388
}
388389
}
389390

390-
return &batch{
391+
b := &batch{
391392
id: bk.id,
392393
state: bk.state,
393394
primarySweepID: bk.primaryID,
@@ -412,9 +413,42 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
412413
publishErrorHandler: bk.publishErrorHandler,
413414
purger: bk.purger,
414415
store: bk.store,
415-
log: bk.log,
416416
cfg: &cfg,
417-
}, nil
417+
}
418+
419+
b.setLog(bk.log)
420+
421+
return b, nil
422+
}
423+
424+
// log returns current logger.
425+
func (b *batch) log() btclog.Logger {
426+
return *b.log_.Load()
427+
}
428+
429+
// setLog atomically replaces the logger.
430+
func (b *batch) setLog(logger btclog.Logger) {
431+
b.log_.Store(&logger)
432+
}
433+
434+
// Debugf logs a message with level DEBUG.
435+
func (b *batch) Debugf(format string, params ...interface{}) {
436+
b.log().Debugf(format, params...)
437+
}
438+
439+
// Infof logs a message with level INFO.
440+
func (b *batch) Infof(format string, params ...interface{}) {
441+
b.log().Infof(format, params...)
442+
}
443+
444+
// Warnf logs a message with level WARN.
445+
func (b *batch) Warnf(format string, params ...interface{}) {
446+
b.log().Warnf(format, params...)
447+
}
448+
449+
// Errorf logs a message with level ERROR.
450+
func (b *batch) Errorf(format string, params ...interface{}) {
451+
b.log().Errorf(format, params...)
418452
}
419453

420454
// addSweep tries to add a sweep to the batch. If this is the first sweep being
@@ -430,7 +464,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
430464
// If the provided sweep is nil, we can't proceed with any checks, so
431465
// we just return early.
432466
if sweep == nil {
433-
b.log.Infof("the sweep is nil")
467+
b.Infof("the sweep is nil")
434468

435469
return false, nil
436470
}
@@ -473,7 +507,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
473507
// the batch, do not add another sweep to prevent the tx from becoming
474508
// non-standard.
475509
if len(b.sweeps) >= MaxSweepsPerBatch {
476-
b.log.Infof("the batch has already too many sweeps (%d >= %d)",
510+
b.Infof("the batch has already too many sweeps %d >= %d",
477511
len(b.sweeps), MaxSweepsPerBatch)
478512

479513
return false, nil
@@ -483,7 +517,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
483517
// arrive here after the batch got closed because of a spend. In this
484518
// case we cannot add the sweep to this batch.
485519
if b.state != Open {
486-
b.log.Infof("the batch state (%v) is not open", b.state)
520+
b.Infof("the batch state (%v) is not open", b.state)
487521

488522
return false, nil
489523
}
@@ -493,15 +527,15 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
493527
// we cannot add this sweep to the batch.
494528
for _, s := range b.sweeps {
495529
if s.isExternalAddr {
496-
b.log.Infof("the batch already has a sweep (%x) with "+
530+
b.Infof("the batch already has a sweep %x with "+
497531
"an external address", s.swapHash[:6])
498532

499533
return false, nil
500534
}
501535

502536
if sweep.isExternalAddr {
503-
b.log.Infof("the batch is not empty and new sweep (%x)"+
504-
" has an external address", sweep.swapHash[:6])
537+
b.Infof("the batch is not empty and new sweep %x "+
538+
"has an external address", sweep.swapHash[:6])
505539

506540
return false, nil
507541
}
@@ -515,7 +549,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
515549
int32(math.Abs(float64(sweep.timeout - s.timeout)))
516550

517551
if timeoutDistance > b.cfg.maxTimeoutDistance {
518-
b.log.Infof("too long timeout distance between the "+
552+
b.Infof("too long timeout distance between the "+
519553
"batch and sweep %x: %d > %d",
520554
sweep.swapHash[:6], timeoutDistance,
521555
b.cfg.maxTimeoutDistance)
@@ -544,7 +578,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
544578
}
545579

546580
// Add the sweep to the batch's sweeps.
547-
b.log.Infof("adding sweep %x", sweep.swapHash[:6])
581+
b.Infof("adding sweep %x", sweep.swapHash[:6])
548582
b.sweeps[sweep.swapHash] = *sweep
549583

550584
// Update FeeRate. Max(sweep.minFeeRate) for all the sweeps of
@@ -572,7 +606,7 @@ func (b *batch) sweepExists(hash lntypes.Hash) bool {
572606

573607
// Wait waits for the batch to gracefully stop.
574608
func (b *batch) Wait() {
575-
b.log.Infof("Stopping")
609+
b.Infof("Stopping")
576610
<-b.finished
577611
}
578612

@@ -613,8 +647,7 @@ func (b *batch) Run(ctx context.Context) error {
613647
// Set currentHeight here, because it may be needed in monitorSpend.
614648
select {
615649
case b.currentHeight = <-blockChan:
616-
b.log.Debugf("initial height for the batch is %v",
617-
b.currentHeight)
650+
b.Debugf("initial height for the batch is %v", b.currentHeight)
618651

619652
case <-runCtx.Done():
620653
return runCtx.Err()
@@ -652,7 +685,7 @@ func (b *batch) Run(ctx context.Context) error {
652685
// completes.
653686
timerChan := clock.TickAfter(b.cfg.batchPublishDelay)
654687

655-
b.log.Infof("started, primary %x, total sweeps %v",
688+
b.Infof("started, primary %x, total sweeps %v",
656689
b.primarySweepID[0:6], len(b.sweeps))
657690

658691
for {
@@ -662,15 +695,15 @@ func (b *batch) Run(ctx context.Context) error {
662695

663696
// blockChan provides immediately the current tip.
664697
case height := <-blockChan:
665-
b.log.Debugf("received block %v", height)
698+
b.Debugf("received block %v", height)
666699

667700
// Set the timer to publish the batch transaction after
668701
// the configured delay.
669702
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
670703
b.currentHeight = height
671704

672705
case <-initialDelayChan:
673-
b.log.Debugf("initial delay of duration %v has ended",
706+
b.Debugf("initial delay of duration %v has ended",
674707
b.cfg.initialDelay)
675708

676709
// Set the timer to publish the batch transaction after
@@ -680,8 +713,8 @@ func (b *batch) Run(ctx context.Context) error {
680713
case <-timerChan:
681714
// Check that batch is still open.
682715
if b.state != Open {
683-
b.log.Debugf("Skipping publishing, because the"+
684-
" batch is not open (%v).", b.state)
716+
b.Debugf("Skipping publishing, because "+
717+
"the batch is not open (%v).", b.state)
685718
continue
686719
}
687720

@@ -695,7 +728,7 @@ func (b *batch) Run(ctx context.Context) error {
695728
// initialDelayChan has just fired, this check passes.
696729
now := clock.Now()
697730
if skipBefore.After(now) {
698-
b.log.Debugf(stillWaitingMsg, skipBefore, now)
731+
b.Debugf(stillWaitingMsg, skipBefore, now)
699732
continue
700733
}
701734

@@ -715,8 +748,8 @@ func (b *batch) Run(ctx context.Context) error {
715748

716749
case <-b.reorgChan:
717750
b.state = Open
718-
b.log.Warnf("reorg detected, batch is able to accept " +
719-
"new sweeps")
751+
b.Warnf("reorg detected, batch is able to " +
752+
"accept new sweeps")
720753

721754
err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
722755
if err != nil {
@@ -755,8 +788,8 @@ func (b *batch) timeout() int32 {
755788
func (b *batch) isUrgent(skipBefore time.Time) bool {
756789
timeout := b.timeout()
757790
if timeout <= 0 {
758-
b.log.Warnf("Method timeout() returned %v. Number of"+
759-
" sweeps: %d. It may be an empty batch.",
791+
b.Warnf("Method timeout() returned %v. Number of "+
792+
"sweeps: %d. It may be an empty batch.",
760793
timeout, len(b.sweeps))
761794
return false
762795
}
@@ -779,7 +812,7 @@ func (b *batch) isUrgent(skipBefore time.Time) bool {
779812
return false
780813
}
781814

782-
b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
815+
b.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
783816
"remainingWaiting is %v)", timeBank, remainingWaiting)
784817

785818
// Signal to the caller to cancel initialDelay.
@@ -795,7 +828,7 @@ func (b *batch) publish(ctx context.Context) error {
795828
)
796829

797830
if len(b.sweeps) == 0 {
798-
b.log.Debugf("skipping publish: no sweeps in the batch")
831+
b.Debugf("skipping publish: no sweeps in the batch")
799832

800833
return nil
801834
}
@@ -808,7 +841,7 @@ func (b *batch) publish(ctx context.Context) error {
808841

809842
// logPublishError is a function which logs publish errors.
810843
logPublishError := func(errMsg string, err error) {
811-
b.publishErrorHandler(err, errMsg, b.log)
844+
b.publishErrorHandler(err, errMsg, b.log())
812845
}
813846

814847
fee, err, signSuccess = b.publishMixedBatch(ctx)
@@ -830,9 +863,9 @@ func (b *batch) publish(ctx context.Context) error {
830863
}
831864
}
832865

833-
b.log.Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee)
866+
b.Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee)
834867
for _, sweep := range b.sweeps {
835-
b.log.Infof("published sweep %x, value: %v",
868+
b.Infof("published sweep %x, value: %v",
836869
sweep.swapHash[:6], sweep.value)
837870
}
838871

@@ -1026,7 +1059,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
10261059
coopInputs int
10271060
)
10281061
for attempt := 1; ; attempt++ {
1029-
b.log.Infof("Attempt %d of collecting cooperative signatures.",
1062+
b.Infof("Attempt %d of collecting cooperative signatures.",
10301063
attempt)
10311064

10321065
// Construct unsigned batch transaction.
@@ -1062,7 +1095,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
10621095
ctx, i, sweep, tx, prevOutsMap, psbtBytes,
10631096
)
10641097
if err != nil {
1065-
b.log.Infof("cooperative signing failed for "+
1098+
b.Infof("cooperative signing failed for "+
10661099
"sweep %x: %v", sweep.swapHash[:6], err)
10671100

10681101
// Set coopFailed flag for this sweep in all the
@@ -1201,7 +1234,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
12011234
}
12021235
}
12031236
txHash := tx.TxHash()
1204-
b.log.Infof("attempting to publish batch tx=%v with feerate=%v, "+
1237+
b.Infof("attempting to publish batch tx=%v with feerate=%v, "+
12051238
"weight=%v, feeForWeight=%v, fee=%v, sweeps=%d, "+
12061239
"%d cooperative: (%s) and %d non-cooperative (%s), destAddr=%s",
12071240
txHash, b.rbfCache.FeeRate, weight, feeForWeight, fee,
@@ -1215,7 +1248,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
12151248
blockchain.GetTransactionWeight(btcutil.NewTx(tx)),
12161249
)
12171250
if realWeight != weight {
1218-
b.log.Warnf("actual weight of tx %v is %v, estimated as %d",
1251+
b.Warnf("actual weight of tx %v is %v, estimated as %d",
12191252
txHash, realWeight, weight)
12201253
}
12211254

@@ -1239,11 +1272,11 @@ func (b *batch) debugLogTx(msg string, tx *wire.MsgTx) {
12391272
// Serialize the transaction and convert to hex string.
12401273
buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
12411274
if err := tx.Serialize(buf); err != nil {
1242-
b.log.Errorf("failed to serialize tx for debug log: %v", err)
1275+
b.Errorf("failed to serialize tx for debug log: %v", err)
12431276
return
12441277
}
12451278

1246-
b.log.Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes()))
1279+
b.Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes()))
12471280
}
12481281

12491282
// musig2sign signs one sweep using musig2.
@@ -1405,14 +1438,14 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
14051438
if b.rbfCache.FeeRate == 0 {
14061439
// We set minFeeRate in each sweep, so fee rate is expected to
14071440
// be initiated here.
1408-
b.log.Warnf("rbfCache.FeeRate is 0, which must not happen.")
1441+
b.Warnf("rbfCache.FeeRate is 0, which must not happen.")
14091442

14101443
if b.cfg.batchConfTarget == 0 {
1411-
b.log.Warnf("updateRbfRate called with zero " +
1444+
b.Warnf("updateRbfRate called with zero " +
14121445
"batchConfTarget")
14131446
}
14141447

1415-
b.log.Infof("initializing rbf fee rate for conf target=%v",
1448+
b.Infof("initializing rbf fee rate for conf target=%v",
14161449
b.cfg.batchConfTarget)
14171450
rate, err := b.wallet.EstimateFeeRate(
14181451
ctx, b.cfg.batchConfTarget,
@@ -1461,7 +1494,7 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
14611494
defer cancel()
14621495
defer b.wg.Done()
14631496

1464-
b.log.Infof("monitoring spend for outpoint %s",
1497+
b.Infof("monitoring spend for outpoint %s",
14651498
primarySweep.outpoint.String())
14661499

14671500
for {
@@ -1584,7 +1617,7 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
15841617
if len(spendTx.TxOut) > 0 {
15851618
b.batchPkScript = spendTx.TxOut[0].PkScript
15861619
} else {
1587-
b.log.Warnf("transaction %v has no outputs", txHash)
1620+
b.Warnf("transaction %v has no outputs", txHash)
15881621
}
15891622

15901623
// As a previous version of the batch transaction may get confirmed,
@@ -1666,13 +1699,13 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
16661699

16671700
err := b.purger(&sweep)
16681701
if err != nil {
1669-
b.log.Errorf("unable to purge sweep %x: %v",
1702+
b.Errorf("unable to purge sweep %x: %v",
16701703
sweep.SwapHash[:6], err)
16711704
}
16721705
}
16731706
}()
16741707

1675-
b.log.Infof("spent, total sweeps: %v, purged sweeps: %v",
1708+
b.Infof("spent, total sweeps: %v, purged sweeps: %v",
16761709
len(notifyList), len(purgeList))
16771710

16781711
err := b.monitorConfirmations(ctx)
@@ -1690,7 +1723,7 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
16901723
// handleConf handles a confirmation notification. This is the final step of the
16911724
// batch. Here we signal to the batcher that this batch was completed.
16921725
func (b *batch) handleConf(ctx context.Context) error {
1693-
b.log.Infof("confirmed in txid %s", b.batchTxid)
1726+
b.Infof("confirmed in txid %s", b.batchTxid)
16941727
b.state = Confirmed
16951728

16961729
return b.store.ConfirmBatch(ctx, b.id)
@@ -1769,7 +1802,7 @@ func (b *batch) insertAndAcquireID(ctx context.Context) (int32, error) {
17691802
}
17701803

17711804
b.id = id
1772-
b.log = batchPrefixLogger(fmt.Sprintf("%d", b.id))
1805+
b.setLog(batchPrefixLogger(fmt.Sprintf("%d", b.id)))
17731806

17741807
return id, nil
17751808
}

0 commit comments

Comments
 (0)