@@ -232,6 +232,10 @@ type batch struct {
232232 // spendChan is the channel over which spend notifications are received.
233233 spendChan chan * chainntnfs.SpendDetail
234234
235+ // spendErrChan is the channel over which spend notifier errors are
236+ // received.
237+ spendErrChan chan error
238+
235239 // confChan is the channel over which confirmation notifications are
236240 // received.
237241 confChan chan * chainntnfs.TxConfirmation
@@ -378,9 +382,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
378382 id : - 1 ,
379383 state : Open ,
380384 sweeps : make (map [wire.OutPoint ]sweep ),
381- spendChan : make (chan * chainntnfs.SpendDetail ),
382385 confChan : make (chan * chainntnfs.TxConfirmation , 1 ),
383- reorgChan : make (chan struct {}, 1 ),
384386 testReqs : make (chan * testRequest ),
385387 errChan : make (chan error , 1 ),
386388 callEnter : make (chan struct {}),
@@ -423,9 +425,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
423425 state : bk .state ,
424426 primarySweepID : bk .primaryID ,
425427 sweeps : bk .sweeps ,
426- spendChan : make (chan * chainntnfs.SpendDetail ),
427428 confChan : make (chan * chainntnfs.TxConfirmation , 1 ),
428- reorgChan : make (chan struct {}, 1 ),
429429 testReqs : make (chan * testRequest ),
430430 errChan : make (chan error , 1 ),
431431 callEnter : make (chan struct {}),
@@ -979,23 +979,26 @@ func (b *batch) Run(ctx context.Context) error {
979979 return fmt .Errorf ("handleSpend error: %w" , err )
980980 }
981981
982+ case err := <- b .spendErrChan :
983+ b .writeToSpendErrChan (ctx , err )
984+
985+ return fmt .Errorf ("spend notifier failed: %w" , err )
986+
982987 case conf := <- b .confChan :
983988 if err := b .handleConf (runCtx , conf ); err != nil {
984989 return fmt .Errorf ("handleConf error: %w" , err )
985990 }
986991
987992 return nil
988993
994+ // A re-org has been detected. We set the batch state back to
995+ // open since our batch transaction is no longer present in any
996+ // block. We can accept more sweeps and try to publish.
989997 case <- b .reorgChan :
990998 b .state = Open
991999 b .Warnf ("reorg detected, batch is able to " +
9921000 "accept new sweeps" )
9931001
994- err := b .monitorSpend (ctx , b .sweeps [b .primarySweepID ])
995- if err != nil {
996- return fmt .Errorf ("monitorSpend error: %w" , err )
997- }
998-
9991002 case testReq := <- b .testReqs :
10001003 testReq .handler ()
10011004 close (testReq .quit )
@@ -1812,44 +1815,31 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
18121815// of the batch transaction gets confirmed, due to the uncertainty of RBF
18131816// replacements and network propagation, we can always detect the transaction.
18141817func (b * batch ) monitorSpend (ctx context.Context , primarySweep sweep ) error {
1815- spendCtx , cancel := context .WithCancel (ctx )
1818+ if b .spendChan != nil || b .spendErrChan != nil || b .reorgChan != nil {
1819+ return fmt .Errorf ("an attempt to run monitorSpend multiple " +
1820+ "times per batch" )
1821+ }
18161822
1817- spendChan , spendErr , err := b .chainNotifier .RegisterSpendNtfn (
1818- spendCtx , & primarySweep .outpoint , primarySweep .htlc .PkScript ,
1823+ reorgChan := make (chan struct {}, 1 )
1824+
1825+ spendChan , spendErrChan , err := b .chainNotifier .RegisterSpendNtfn (
1826+ ctx , & primarySweep .outpoint , primarySweep .htlc .PkScript ,
18191827 primarySweep .initiationHeight ,
1828+ lndclient .WithReOrgChan (reorgChan ),
18201829 )
18211830 if err != nil {
1822- cancel ()
1823-
1824- return err
1831+ return fmt .Errorf ("failed to register spend notifier for " +
1832+ "primary sweep %v, pkscript %x, height %d: %w" ,
1833+ primarySweep .outpoint , primarySweep .htlc .PkScript ,
1834+ primarySweep .initiationHeight , err )
18251835 }
18261836
1827- b .wg .Add (1 )
1828- go func () {
1829- defer cancel ()
1830- defer b .wg .Done ()
1837+ b .Infof ("monitoring spend for outpoint %s" ,
1838+ primarySweep .outpoint .String ())
18311839
1832- b .Infof ("monitoring spend for outpoint %s" ,
1833- primarySweep .outpoint .String ())
1834-
1835- select {
1836- case spend := <- spendChan :
1837- select {
1838- case b .spendChan <- spend :
1839-
1840- case <- ctx .Done ():
1841- }
1842-
1843- case err := <- spendErr :
1844- b .writeToSpendErrChan (ctx , err )
1845-
1846- b .writeToErrChan (
1847- fmt .Errorf ("spend error: %w" , err ),
1848- )
1849-
1850- case <- ctx .Done ():
1851- }
1852- }()
1840+ b .spendChan = spendChan
1841+ b .spendErrChan = spendErrChan
1842+ b .reorgChan = reorgChan
18531843
18541844 return nil
18551845}
@@ -1862,14 +1852,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18621852 return fmt .Errorf ("can't find primarySweep" )
18631853 }
18641854
1865- reorgChan := make (chan struct {})
1866-
18671855 confCtx , cancel := context .WithCancel (ctx )
18681856
18691857 confChan , errChan , err := b .chainNotifier .RegisterConfirmationsNtfn (
18701858 confCtx , b .batchTxid , b .batchPkScript , batchConfHeight ,
18711859 primarySweep .initiationHeight ,
1872- lndclient .WithReOrgChan (reorgChan ),
18731860 )
18741861 if err != nil {
18751862 cancel ()
@@ -1895,18 +1882,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18951882 b .writeToErrChan (fmt .Errorf ("confirmations " +
18961883 "monitoring error: %w" , err ))
18971884
1898- case <- reorgChan :
1899- // A re-org has been detected. We set the batch
1900- // state back to open since our batch
1901- // transaction is no longer present in any
1902- // block. We can accept more sweeps and try to
1903- // publish new transactions, at this point we
1904- // need to monitor again for a new spend.
1905- select {
1906- case b .reorgChan <- struct {}{}:
1907- case <- ctx .Done ():
1908- }
1909-
19101885 case <- ctx .Done ():
19111886 }
19121887 }()
@@ -2395,12 +2370,6 @@ func (b *batch) writeToErrChan(err error) {
23952370
23962371// writeToSpendErrChan sends an error to spend error channels of all the sweeps.
23972372func (b * batch ) writeToSpendErrChan (ctx context.Context , spendErr error ) {
2398- done , err := b .scheduleNextCall ()
2399- if err != nil {
2400- done ()
2401-
2402- return
2403- }
24042373 notifiers := make ([]* SpendNotifier , 0 , len (b .sweeps ))
24052374 for _ , s := range b .sweeps {
24062375 // If the sweep's notifier is empty then this means that a swap
@@ -2412,7 +2381,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
24122381
24132382 notifiers = append (notifiers , s .notifier )
24142383 }
2415- done ()
24162384
24172385 for _ , notifier := range notifiers {
24182386 select {
0 commit comments