diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 2e815899e..a1cb339de 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -214,10 +214,6 @@ type batch struct { // currentHeight is the current block height. currentHeight int32 - // blockEpochChan is the channel over which block epoch notifications - // are received. - blockEpochChan chan int32 - // spendChan is the channel over which spend notifications are received. spendChan chan *chainntnfs.SpendDetail @@ -362,7 +358,6 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch { id: -1, state: Open, sweeps: make(map[lntypes.Hash]sweep), - blockEpochChan: make(chan int32), spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), reorgChan: make(chan struct{}, 1), @@ -407,7 +402,6 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) { state: bk.state, primarySweepID: bk.primaryID, sweeps: bk.sweeps, - blockEpochChan: make(chan int32), spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), reorgChan: make(chan struct{}, 1), @@ -626,6 +620,16 @@ func (b *batch) Run(ctx context.Context) error { return err } + // Set currentHeight here, because it may be needed in monitorSpend. + select { + case b.currentHeight = <-blockChan: + b.log.Debugf("initial height for the batch is %v", + b.currentHeight) + + case <-runCtx.Done(): + return runCtx.Err() + } + // If a primary sweep exists we immediately start monitoring for its // spend. if b.primarySweepID != lntypes.ZeroHash { @@ -642,9 +646,9 @@ func (b *batch) Run(ctx context.Context) error { skipBefore := clock.Now().Add(b.cfg.initialDelay) // initialDelayChan is a timer which fires upon initial delay end. - // If initialDelay is 0, it does not fire to prevent race with - // blockChan which also fires immediately with current tip. Such a race - // may result in double publishing if batchPublishDelay is also 0. + // If initialDelay is set to 0, it will not trigger to avoid setting up + // timerChan twice, which could lead to double publishing if + // batchPublishDelay is also 0. var initialDelayChan <-chan time.Time if b.cfg.initialDelay > 0 { initialDelayChan = clock.TickAfter(b.cfg.initialDelay) @@ -653,9 +657,10 @@ func (b *batch) Run(ctx context.Context) error { // We use a timer in order to not publish new transactions at the same // time as the block epoch notification. This is done to prevent // unnecessary transaction publishments when a spend is detected on that - // block. This timer starts after new block arrives or initialDelay + // block. This timer starts after new block arrives (including the + // current tip which we read from blockChan above) or when initialDelay // completes. - var timerChan <-chan time.Time + timerChan := clock.TickAfter(b.cfg.batchPublishDelay) b.log.Infof("started, primary %x, total sweeps %v", b.primarySweepID[0:6], len(b.sweeps)) @@ -1872,7 +1877,10 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int, totalSweptAmt btcutil.Amount) (btcutil.Amount, btcutil.Amount) { - totalFee := int64(totalSweptAmt) - spendTx.TxOut[0].Value + totalFee := int64(totalSweptAmt) + if len(spendTx.TxOut) > 0 { + totalFee -= spendTx.TxOut[0].Value + } feePortionPerSweep := totalFee / int64(numSweeps) roundingDiff := totalFee - (int64(numSweeps) * feePortionPerSweep) @@ -1900,7 +1908,11 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { notifyList = make([]sweep, 0, len(b.sweeps)) ) b.batchTxid = &txHash - b.batchPkScript = spendTx.TxOut[0].PkScript + if len(spendTx.TxOut) > 0 { + b.batchPkScript = spendTx.TxOut[0].PkScript + } else { + b.log.Warnf("transaction %v has no outputs", txHash) + } // As a previous version of the batch transaction may get confirmed, // which does not contain the latest sweeps, we need to detect the