From 1ab73a62726b814f0024ff3d16c23048965b64f6 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Mon, 16 Jun 2025 23:28:26 -0300 Subject: [PATCH 1/2] loopout: ignore saved HTLC txid If a reorg happens, the saved txid prevents loop-out from discoveing new txid. --- client_test.go | 2 +- loopout.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/client_test.go b/client_test.go index dfd96c785..15b9c7c9d 100644 --- a/client_test.go +++ b/client_test.go @@ -291,7 +291,7 @@ func testLoopOutResume(t *testing.T, confs uint32, expired, preimageRevealed, // Expect client to register for our expected number of confirmations. confIntent := ctx.Context.AssertRegisterConf( - preimageRevealed, int32(confs), + false, int32(confs), ) htlc, err := utils.GetHtlc( diff --git a/loopout.go b/loopout.go index fde6d2826..f7da0de67 100644 --- a/loopout.go +++ b/loopout.go @@ -330,7 +330,6 @@ func resumeLoopOutSwap(cfg *swapConfig, pend *loopdb.LoopOut, } else { swap.state = lastUpdate.State swap.lastUpdateTime = lastUpdate.Time - swap.htlcTxHash = lastUpdate.HtlcTxHash } return swap, nil From 10362141608720f1265fe03bcb27bab9d80bd161 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Tue, 17 Jun 2025 14:01:59 -0300 Subject: [PATCH 2/2] sweepbatcher: add an option to ignore HTLC txids Added option WithSkippedTxns, which has one historical problematic tx by default. Sweeps originating from these transactions are omitted when reading from DB. loopdb: add column sweep_batches.cancelled and replaced DropBatch with CancelBatch. It is needed, because sweep.batch_id is a foreign key to batch. Changed StoreMock.InsertSweepBatch not to reuse batch_id. This is needed by the test, which checks that new batch has fresh ID. --- client.go | 34 +++- loopd/config.go | 2 + loopd/utils.go | 1 + loopdb/sqlc/batch.sql.go | 28 +-- .../000016_batch_cancelled.down.sql | 1 + .../migrations/000016_batch_cancelled.up.sql | 1 + loopdb/sqlc/models.go | 1 + loopdb/sqlc/querier.go | 2 +- loopdb/sqlc/queries/batch.sql | 8 +- sweepbatcher/presigned.go | 2 + sweepbatcher/presigned_test.go | 65 ++++++- sweepbatcher/store.go | 26 +-- sweepbatcher/store_mock.go | 14 +- sweepbatcher/sweep_batch.go | 4 + sweepbatcher/sweep_batcher.go | 59 ++++++- sweepbatcher/sweep_batcher_test.go | 159 ++++++++++++++++++ 16 files changed, 349 insertions(+), 58 deletions(-) create mode 100644 loopdb/sqlc/migrations/000016_batch_cancelled.down.sql create mode 100644 loopdb/sqlc/migrations/000016_batch_cancelled.up.sql diff --git a/client.go b/client.go index 13a3c3c95..e7f693db3 100644 --- a/client.go +++ b/client.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightninglabs/aperture/l402" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop/assets" @@ -146,6 +147,10 @@ type ClientConfig struct { // be attempted. LoopOutMaxParts uint32 + // SkippedTxns is the list of existing HTLC txids to skip when starting + // Loop. This should only be used if affected by the historical bug. + SkippedTxns []string + // TotalPaymentTimeout is the total amount of time until we time out // off-chain payments (used in loop out). TotalPaymentTimeout time.Duration @@ -246,11 +251,7 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore, sweeper, loopDB, cfg.Lnd.ChainParams, getHeight, ) - batcher := sweepbatcher.NewBatcher( - cfg.Lnd.WalletKit, cfg.Lnd.ChainNotifier, cfg.Lnd.Signer, - swapServerClient.MultiMuSig2SignSweep, verifySchnorrSig, - cfg.Lnd.ChainParams, sweeperDb, sweepStore, - + batcherOpts := []sweepbatcher.BatcherOption{ // Disable 100 sats/kw fee bump every block and retarget feerate // every block according to the current mempool condition. sweepbatcher.WithCustomFeeRate( @@ -265,8 +266,29 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore, // delay time to sweepbatcher's handling. The delay used in // loopout.go is repushDelay. sweepbatcher.WithPublishDelay( - repushDelay+additionalDelay, + repushDelay + additionalDelay, ), + } + + if len(cfg.SkippedTxns) != 0 { + skippedTxns := make(map[chainhash.Hash]struct{}) + for _, txid := range cfg.SkippedTxns { + txid, err := chainhash.NewHashFromStr(txid) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse "+ + "txid to skip %v: %w", txid, err) + } + skippedTxns[*txid] = struct{}{} + } + batcherOpts = append(batcherOpts, sweepbatcher.WithSkippedTxns( + skippedTxns, + )) + } + + batcher := sweepbatcher.NewBatcher( + cfg.Lnd.WalletKit, cfg.Lnd.ChainNotifier, cfg.Lnd.Signer, + swapServerClient.MultiMuSig2SignSweep, verifySchnorrSig, + cfg.Lnd.ChainParams, sweeperDb, sweepStore, batcherOpts..., ) executor = newExecutor(&executorConfig{ diff --git a/loopd/config.go b/loopd/config.go index 3a0a78e3e..63c472690 100644 --- a/loopd/config.go +++ b/loopd/config.go @@ -184,6 +184,8 @@ type Config struct { LoopOutMaxParts uint32 `long:"loopoutmaxparts" description:"The maximum number of payment parts that may be used for a loop out swap."` + SkippedTxns []string `long:"skippedtxns" description:"The list of existing HTLC txids to skip when starting Loop. This should only be used if affected by the historical bug." hidden:"true"` + TotalPaymentTimeout time.Duration `long:"totalpaymenttimeout" description:"The timeout to use for off-chain payments."` MaxPaymentRetries int `long:"maxpaymentretries" description:"The maximum number of times an off-chain payment may be retried."` diff --git a/loopd/utils.go b/loopd/utils.go index 8504898a7..9b439ac5a 100644 --- a/loopd/utils.go +++ b/loopd/utils.go @@ -50,6 +50,7 @@ func getClient(cfg *Config, swapDb loopdb.SwapStore, MaxL402Cost: btcutil.Amount(cfg.MaxL402Cost), MaxL402Fee: btcutil.Amount(cfg.MaxL402Fee), LoopOutMaxParts: cfg.LoopOutMaxParts, + SkippedTxns: cfg.SkippedTxns, TotalPaymentTimeout: cfg.TotalPaymentTimeout, MaxPaymentRetries: cfg.MaxPaymentRetries, MaxStaticAddrHtlcFeePercentage: cfg.MaxStaticAddrHtlcFeePercentage, diff --git a/loopdb/sqlc/batch.sql.go b/loopdb/sqlc/batch.sql.go index 9c8aedc1c..875473fe7 100644 --- a/loopdb/sqlc/batch.sql.go +++ b/loopdb/sqlc/batch.sql.go @@ -10,6 +10,17 @@ import ( "database/sql" ) +const cancelBatch = `-- name: CancelBatch :exec +UPDATE sweep_batches SET + cancelled = TRUE +WHERE id = $1 +` + +func (q *Queries) CancelBatch(ctx context.Context, id int32) error { + _, err := q.db.ExecContext(ctx, cancelBatch, id) + return err +} + const confirmBatch = `-- name: ConfirmBatch :exec UPDATE sweep_batches @@ -24,15 +35,6 @@ func (q *Queries) ConfirmBatch(ctx context.Context, id int32) error { return err } -const dropBatch = `-- name: DropBatch :exec -DELETE FROM sweep_batches WHERE id = $1 -` - -func (q *Queries) DropBatch(ctx context.Context, id int32) error { - _, err := q.db.ExecContext(ctx, dropBatch, id) - return err -} - const getBatchSweeps = `-- name: GetBatchSweeps :many SELECT id, swap_hash, batch_id, outpoint, amt, completed @@ -94,7 +96,7 @@ func (q *Queries) GetBatchSweptAmount(ctx context.Context, batchID int32) (int64 const getParentBatch = `-- name: GetParentBatch :one SELECT - sweep_batches.id, sweep_batches.confirmed, sweep_batches.batch_tx_id, sweep_batches.batch_pk_script, sweep_batches.last_rbf_height, sweep_batches.last_rbf_sat_per_kw, sweep_batches.max_timeout_distance + sweep_batches.id, sweep_batches.confirmed, sweep_batches.batch_tx_id, sweep_batches.batch_pk_script, sweep_batches.last_rbf_height, sweep_batches.last_rbf_sat_per_kw, sweep_batches.max_timeout_distance, sweep_batches.cancelled FROM sweep_batches JOIN @@ -114,6 +116,7 @@ func (q *Queries) GetParentBatch(ctx context.Context, outpoint string) (SweepBat &i.LastRbfHeight, &i.LastRbfSatPerKw, &i.MaxTimeoutDistance, + &i.Cancelled, ) return i, err } @@ -136,11 +139,11 @@ func (q *Queries) GetSweepStatus(ctx context.Context, outpoint string) (bool, er const getUnconfirmedBatches = `-- name: GetUnconfirmedBatches :many SELECT - id, confirmed, batch_tx_id, batch_pk_script, last_rbf_height, last_rbf_sat_per_kw, max_timeout_distance + id, confirmed, batch_tx_id, batch_pk_script, last_rbf_height, last_rbf_sat_per_kw, max_timeout_distance, cancelled FROM sweep_batches WHERE - confirmed = FALSE + confirmed = FALSE AND cancelled = FALSE ` func (q *Queries) GetUnconfirmedBatches(ctx context.Context) ([]SweepBatch, error) { @@ -160,6 +163,7 @@ func (q *Queries) GetUnconfirmedBatches(ctx context.Context) ([]SweepBatch, erro &i.LastRbfHeight, &i.LastRbfSatPerKw, &i.MaxTimeoutDistance, + &i.Cancelled, ); err != nil { return nil, err } diff --git a/loopdb/sqlc/migrations/000016_batch_cancelled.down.sql b/loopdb/sqlc/migrations/000016_batch_cancelled.down.sql new file mode 100644 index 000000000..a6e1d174c --- /dev/null +++ b/loopdb/sqlc/migrations/000016_batch_cancelled.down.sql @@ -0,0 +1 @@ +ALTER TABLE sweep_batches DROP COLUMN cancelled; diff --git a/loopdb/sqlc/migrations/000016_batch_cancelled.up.sql b/loopdb/sqlc/migrations/000016_batch_cancelled.up.sql new file mode 100644 index 000000000..bc8ad94d2 --- /dev/null +++ b/loopdb/sqlc/migrations/000016_batch_cancelled.up.sql @@ -0,0 +1 @@ +ALTER TABLE sweep_batches ADD COLUMN cancelled BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/loopdb/sqlc/models.go b/loopdb/sqlc/models.go index c71f02392..6728bb420 100644 --- a/loopdb/sqlc/models.go +++ b/loopdb/sqlc/models.go @@ -197,6 +197,7 @@ type SweepBatch struct { LastRbfHeight sql.NullInt32 LastRbfSatPerKw sql.NullInt32 MaxTimeoutDistance int32 + Cancelled bool } type SweepsOld struct { diff --git a/loopdb/sqlc/querier.go b/loopdb/sqlc/querier.go index 9b600727e..c293deb98 100644 --- a/loopdb/sqlc/querier.go +++ b/loopdb/sqlc/querier.go @@ -12,13 +12,13 @@ import ( type Querier interface { AllDeposits(ctx context.Context) ([]Deposit, error) AllStaticAddresses(ctx context.Context) ([]StaticAddress, error) + CancelBatch(ctx context.Context, id int32) error ConfirmBatch(ctx context.Context, id int32) error CreateDeposit(ctx context.Context, arg CreateDepositParams) error CreateReservation(ctx context.Context, arg CreateReservationParams) error CreateStaticAddress(ctx context.Context, arg CreateStaticAddressParams) error CreateWithdrawal(ctx context.Context, arg CreateWithdrawalParams) error CreateWithdrawalDeposit(ctx context.Context, arg CreateWithdrawalDepositParams) error - DropBatch(ctx context.Context, id int32) error FetchLiquidityParams(ctx context.Context) ([]byte, error) GetAllWithdrawals(ctx context.Context) ([]Withdrawal, error) GetBatchSweeps(ctx context.Context, batchID int32) ([]Sweep, error) diff --git a/loopdb/sqlc/queries/batch.sql b/loopdb/sqlc/queries/batch.sql index b02241273..2a8696072 100644 --- a/loopdb/sqlc/queries/batch.sql +++ b/loopdb/sqlc/queries/batch.sql @@ -4,7 +4,7 @@ SELECT FROM sweep_batches WHERE - confirmed = FALSE; + confirmed = FALSE AND cancelled = FALSE; -- name: InsertBatch :one INSERT INTO sweep_batches ( @@ -23,8 +23,10 @@ INSERT INTO sweep_batches ( $6 ) RETURNING id; --- name: DropBatch :exec -DELETE FROM sweep_batches WHERE id = $1; +-- name: CancelBatch :exec +UPDATE sweep_batches SET + cancelled = TRUE +WHERE id = $1; -- name: UpdateBatch :exec UPDATE sweep_batches SET diff --git a/sweepbatcher/presigned.go b/sweepbatcher/presigned.go index 0d52bcbfe..7ca325938 100644 --- a/sweepbatcher/presigned.go +++ b/sweepbatcher/presigned.go @@ -126,6 +126,8 @@ func (b *batch) getOrderedSweeps(ctx context.Context) ([]sweep, error) { return nil, fmt.Errorf("FetchBatchSweeps(%d) failed: %w", b.id, err) } + dbSweeps = filterDbSweeps(b.cfg.skippedTxns, dbSweeps) + if len(dbSweeps) != len(utxo2sweep) { return nil, fmt.Errorf("FetchBatchSweeps(%d) returned %d "+ "sweeps, len(b.sweeps) is %d", b.id, len(dbSweeps), diff --git a/sweepbatcher/presigned_test.go b/sweepbatcher/presigned_test.go index 60f287764..915038889 100644 --- a/sweepbatcher/presigned_test.go +++ b/sweepbatcher/presigned_test.go @@ -33,8 +33,9 @@ func TestOrderedSweeps(t *testing.T) { ctx := context.Background() cases := []struct { - name string - sweeps []sweep + name string + sweeps []sweep + skippedTxns map[chainhash.Hash]struct{} // Testing errors. skipStore bool @@ -69,6 +70,20 @@ func TestOrderedSweeps(t *testing.T) { }, }, + { + name: "one sweep, skipped", + sweeps: []sweep{ + { + outpoint: op1, + swapHash: swapHash1, + }, + }, + skippedTxns: map[chainhash.Hash]struct{}{ + op1.Hash: {}, + }, + wantGroups: [][]sweep{}, + }, + { name: "two sweeps, one swap", sweeps: []sweep{ @@ -95,6 +110,31 @@ func TestOrderedSweeps(t *testing.T) { }, }, + { + name: "two sweeps, one swap, one skipped", + sweeps: []sweep{ + { + outpoint: op2, + swapHash: swapHash1, + }, + { + outpoint: op1, + swapHash: swapHash1, + }, + }, + skippedTxns: map[chainhash.Hash]struct{}{ + op1.Hash: {}, + }, + wantGroups: [][]sweep{ + { + { + outpoint: op2, + swapHash: swapHash1, + }, + }, + }, + }, + { name: "two sweeps, two swap", sweeps: []sweep{ @@ -266,6 +306,9 @@ func TestOrderedSweeps(t *testing.T) { b := &batch{ sweeps: m, store: NewStoreMock(), + cfg: &batchConfig{ + skippedTxns: tc.skippedTxns, + }, } // Store the sweeps in mock store. @@ -299,6 +342,14 @@ func TestOrderedSweeps(t *testing.T) { m[added.outpoint] = added } + // Remove skipped sweeps from the batch to make it + // match with what is read from DB after filtering. + for op := range m { + if _, has := tc.skippedTxns[op.Hash]; has { + delete(m, op) + } + } + // Now run the tested functions. orderedSweeps, err := b.getOrderedSweeps(ctx) if tc.wantErr1 != "" { @@ -313,7 +364,15 @@ func TestOrderedSweeps(t *testing.T) { } // The wanted list of sweeps matches the input order. - require.Equal(t, tc.sweeps, orderedSweeps) + notSkipped := make([]sweep, 0, len(tc.sweeps)) + for _, s := range tc.sweeps { + _, has := tc.skippedTxns[s.outpoint.Hash] + if has { + continue + } + notSkipped = append(notSkipped, s) + } + require.Equal(t, notSkipped, orderedSweeps) groups, err := b.getSweepsGroups(ctx) if tc.wantErr2 != "" { diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 1b87cde8b..643d3219f 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -3,7 +3,6 @@ package sweepbatcher import ( "context" "database/sql" - "fmt" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" @@ -44,8 +43,8 @@ type Querier interface { InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) ( int32, error) - // DropBatch drops a batch from the database. - DropBatch(ctx context.Context, id int32) error + // CancelBatch marks the batch as cancelled. + CancelBatch(ctx context.Context, id int32) error // UpdateBatch updates a batch in the database. UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error @@ -113,22 +112,11 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch)) } -// DropBatch drops a batch from the database. Note that we only use this call -// for batches that have no sweeps and so we'd not be able to resume. -func (s *SQLStore) DropBatch(ctx context.Context, id int32) error { - readOpts := loopdb.NewSqlWriteOpts() - return s.baseDb.ExecTx(ctx, readOpts, func(tx Querier) error { - dbSweeps, err := tx.GetBatchSweeps(ctx, id) - if err != nil { - return err - } - - if len(dbSweeps) != 0 { - return fmt.Errorf("cannot drop a non-empty batch") - } - - return tx.DropBatch(ctx, id) - }) +// CancelBatch marks a batch as cancelled in the database. Note that we only use +// this call for batches that have no sweeps or all the sweeps are in skipped +// transaction and so we'd not be able to resume. +func (s *SQLStore) CancelBatch(ctx context.Context, id int32) error { + return s.baseDb.CancelBatch(ctx, id) } // UpdateSweepBatch updates a batch in the database. diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index d5a3ffbc1..23e15f9ed 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -16,6 +16,7 @@ type StoreMock struct { sweeps map[wire.OutPoint]dbSweep mu sync.Mutex sweepID int32 + batchID int32 } // NewStoreMock instantiates a new mock store. @@ -52,20 +53,15 @@ func (s *StoreMock) InsertSweepBatch(ctx context.Context, s.mu.Lock() defer s.mu.Unlock() - var id int32 - - if len(s.batches) == 0 { - id = 0 - } else { - id = int32(len(s.batches)) - } + id := s.batchID + s.batchID++ s.batches[id] = *batch return id, nil } -// DropBatch drops a batch from the database. -func (s *StoreMock) DropBatch(ctx context.Context, id int32) error { +// CancelBatch drops a batch from the database. +func (s *StoreMock) CancelBatch(ctx context.Context, id int32) error { delete(s.batches, id) return nil } diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 3084a26aa..583c6e41a 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -187,6 +187,10 @@ type batchConfig struct { // enabled. presignedHelper PresignedHelper + // skippedTxns is the list of previous transactions to ignore when + // loading the sweeps from DB. This is needed to fix a historical bug. + skippedTxns map[chainhash.Hash]struct{} + // chainParams are the chain parameters of the chain that is used by // batches. chainParams *chaincfg.Params diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index b89ba5f6b..e669e0ccb 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -50,9 +50,10 @@ type BatcherStore interface { // of the inserted batch. InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error) - // DropBatch drops a batch from the database. This should only be used - // when a batch is empty. - DropBatch(ctx context.Context, id int32) error + // CancelBatch marks a batch as cancelled in the database. Note that we + // only use this call for batches that have no sweeps or all the sweeps + // are in skipped transaction and so we'd not be able to resume. + CancelBatch(ctx context.Context, id int32) error // UpdateSweepBatch updates a batch in the database. UpdateSweepBatch(ctx context.Context, batch *dbBatch) error @@ -440,6 +441,10 @@ type Batcher struct { // presignedHelper provides methods used when presigned batches are // enabled. presignedHelper PresignedHelper + + // skippedTxns is the list of previous transactions to ignore when + // loading the sweeps from DB. This is needed to fix a historical bug. + skippedTxns map[chainhash.Hash]struct{} } // BatcherConfig holds batcher configuration. @@ -484,6 +489,10 @@ type BatcherConfig struct { // presignedHelper provides methods used when presigned batches are // enabled. presignedHelper PresignedHelper + + // skippedTxns is the list of previous transactions to ignore when + // loading the sweeps from DB. This is needed to fix a historical bug. + skippedTxns map[chainhash.Hash]struct{} } // BatcherOption configures batcher behaviour. @@ -566,6 +575,14 @@ func WithPresignedHelper(presignedHelper PresignedHelper) BatcherOption { } } +// WithSkippedTxns is the list of previous transactions to ignore when +// loading the sweeps from DB. This is needed to fix a historical bug. +func WithSkippedTxns(skippedTxns map[chainhash.Hash]struct{}) BatcherOption { + return func(cfg *BatcherConfig) { + cfg.skippedTxns = skippedTxns + } +} + // NewBatcher creates a new Batcher instance. func NewBatcher(wallet lndclient.WalletKitClient, chainNotifier lndclient.ChainNotifierClient, @@ -574,6 +591,14 @@ func NewBatcher(wallet lndclient.WalletKitClient, store BatcherStore, sweepStore SweepFetcher, opts ...BatcherOption) *Batcher { + badTx1, err := chainhash.NewHashFromStr( + "7028bdac753a254785d29506f311abcda323706b531345105f38999" + + "aecd6f3d1", + ) + if err != nil { + panic(err) + } + cfg := BatcherConfig{ // By default, loop/labels.LoopOutBatchSweepSuccess is used // to label sweep transactions. @@ -583,6 +608,10 @@ func NewBatcher(wallet lndclient.WalletKitClient, // publishing error. By default, it logs all errors as warnings, // but "insufficient fee" as Info. publishErrorHandler: defaultPublishErrorLogger, + + skippedTxns: map[chainhash.Hash]struct{}{ + *badTx1: {}, + }, } for _, opt := range opts { opt(&cfg) @@ -621,6 +650,7 @@ func NewBatcher(wallet lndclient.WalletKitClient, customMuSig2Signer: cfg.customMuSig2Signer, publishErrorHandler: cfg.publishErrorHandler, presignedHelper: cfg.presignedHelper, + skippedTxns: cfg.skippedTxns, } } @@ -1000,6 +1030,22 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) { return batch, nil } +// filterDbSweeps copies dbSweeps, skipping the sweeps from skipped txs. +func filterDbSweeps(skippedTxns map[chainhash.Hash]struct{}, + dbSweeps []*dbSweep) []*dbSweep { + + result := make([]*dbSweep, 0, len(dbSweeps)) + for _, dbSweep := range dbSweeps { + if _, has := skippedTxns[dbSweep.Outpoint.Hash]; has { + continue + } + + result = append(result, dbSweep) + } + + return result +} + // spinUpBatchFromDB spins up a batch that already existed in storage, then // returns it. func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { @@ -1007,13 +1053,15 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { if err != nil { return err } + dbSweeps = filterDbSweeps(b.skippedTxns, dbSweeps) if len(dbSweeps) == 0 { infof("skipping restored batch %d as it has no sweeps", batch.id) - // It is safe to drop this empty batch as it has no sweeps. - err := b.store.DropBatch(ctx, batch.id) + // It is safe to cancel this empty batch as it has no sweeps + // that are not skipped. + err := b.store.CancelBatch(ctx, batch.id) if err != nil { warnf("unable to drop empty batch %d: %v", batch.id, err) @@ -1502,6 +1550,7 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig { txLabeler: b.txLabeler, customMuSig2Signer: b.customMuSig2Signer, presignedHelper: b.presignedHelper, + skippedTxns: b.skippedTxns, clock: b.clock, chainParams: b.chainParams, } diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index fa871829f..3e398fe12 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -1172,6 +1172,159 @@ func testSweepBatcherSimpleLifecycle(t *testing.T, store testStore, require.ErrorIs(t, runErr, testError) } +// testSweepBatcherSkippedTxns tests that option WithSkippedTxns +// works as expected. +func testSweepBatcherSkippedTxns(t *testing.T, store testStore, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + batcher := NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, + ) + var wg sync.WaitGroup + wg.Add(1) + var runErr error + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Create a sweep request. + op1 := wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + } + swapHash := lntypes.Hash{1, 1, 1} + const ( + inputValue = 111 + initiationHeight = 550 + ) + + swap1 := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 111, + AmountRequested: inputValue, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + InitiationHeight: initiationHeight, + }, + + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: 111, + } + + err = store.CreateLoopOut(ctx, swapHash, swap1) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ + SwapHash: swapHash, + Inputs: []Input{{ + Value: inputValue, + Outpoint: op1, + }}, + Notifier: &dummyNotifier, + })) + + // When batch is successfully created it will execute it's first step, + // which leads to a spend monitor of the primary sweep. + <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + <-lnd.TxPublishChannel + + // Record batch ID. + var oldBatchID int32 + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + oldBatchID = batch.id + + return true + }, test.Timeout, eventuallyCheckFrequency) + + // Restart the batcher, adding the oldBatchID to skipped batches. + cancel() + wg.Wait() + checkBatcherError(t, runErr) + + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + batcher = NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, + WithSkippedTxns(map[chainhash.Hash]struct{}{ + op1.Hash: {}, + }), + ) + wg.Add(1) + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Add the same swap with another outpoint. + op2 := wire.OutPoint{ + Hash: chainhash.Hash{2, 2}, + Index: 2, + } + require.NoError(t, batcher.AddSweep(ctx, &SweepRequest{ + SwapHash: swapHash, + Inputs: []Input{{ + Value: inputValue, + Outpoint: op2, + }}, + Notifier: &dummyNotifier, + })) + + // Make sure it is launched in a new batch. + <-lnd.RegisterSpendChannel + + // Wait for tx to be published. + tx := <-lnd.TxPublishChannel + require.Len(t, tx.TxIn, 1) + + // Record new batch ID. + var newBatchID int32 + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + newBatchID = batch.id + + return true + }, test.Timeout, eventuallyCheckFrequency) + + // Make sure it is another batch. + require.NotEqual(t, oldBatchID, newBatchID) + + // Stop the batcher. + cancel() + wg.Wait() + checkBatcherError(t, runErr) +} + // wrappedLogger implements btclog.Logger, recording last debug message format. // It is needed to watch for messages in tests. type wrappedLogger struct { @@ -4745,6 +4898,12 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) { runTests(t, testSweepBatcherSimpleLifecycle) } +// TestSweepBatcherSkippedTxns tests that option WithSkippedTxns +// works as expected. +func TestSweepBatcherSkippedTxns(t *testing.T) { + runTests(t, testSweepBatcherSkippedTxns) +} + // TestDelays tests that WithInitialDelay and WithPublishDelay work. func TestDelays(t *testing.T) { runTests(t, testDelays)