diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go
index 4415d3fc4..58cdf19dc 100644
--- a/sweepbatcher/sweep_batch.go
+++ b/sweepbatcher/sweep_batch.go
@@ -2439,6 +2439,7 @@ func (b *batch) isComplete() bool {
 
 	if err != nil && err != ErrBatchShuttingDown {
 		return false
 	}
+
 	return b.state == Confirmed
 }
diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go
index ec5453a8e..dd5d01751 100644
--- a/sweepbatcher/sweep_batcher.go
+++ b/sweepbatcher/sweep_batcher.go
@@ -906,7 +906,15 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
 	for _, batch := range b.batches {
 		if batch.sweepExists(sweep.outpoint) {
 			accepted, err := batch.addSweeps(ctx, sweeps)
-			if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
+			if errors.Is(err, ErrBatchShuttingDown) {
+				// The batch finished while we were trying
+				// to re-add the sweep. Leave it in the map
+				// for the lazy cleanup and fall back to the
+				// monitorSpendAndNotify path below.
+				break
+			}
+
+			if err != nil {
 				return err
 			}
 
diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go
index 009e8da02..2d35541b2 100644
--- a/sweepbatcher/sweep_batcher_test.go
+++ b/sweepbatcher/sweep_batcher_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/btcsuite/btcd/blockchain"
 	"github.com/btcsuite/btcd/btcec/v2"
 	"github.com/btcsuite/btcd/btcutil"
+	"github.com/btcsuite/btcd/chaincfg"
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/btcsuite/btcd/wire"
 	"github.com/btcsuite/btclog/v2"
@@ -4149,9 +4150,9 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore,
 	go func() {
 		defer addWG.Done()
 
-		// After this goroutine completes, stop the goroutine that handles
-		// registrations as well. Give it one second to finish the last
-		// AddSweep to prevent goroutine leaks.
+		// After this goroutine completes, stop the goroutine that
+		// handles registrations as well. Give it one second to finish
+		// the last AddSweep to prevent goroutine leaks.
 		defer time.AfterFunc(time.Second, confCancel)
 
 		for {
@@ -4262,6 +4263,136 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore,
 	checkBatcherError(t, runErr)
 }
 
+// testSweepBatcherHandleBatchShutdown simulates a sweep that is re-added after
+// its batch has already completed and begun shutting down. Before the fix,
+// addSweeps surfaced ErrBatchShuttingDown, causing handleSweeps to return an
+// error and, ultimately, AddSweep to deliver ErrBatcherShuttingDown back to
+// the caller. The regression test ensures we fall back to the persisted sweep
+// status and silently switch to the monitor path.
+func testSweepBatcherHandleBatchShutdown(t *testing.T, store testStore,
+	batcherStore testBatcherStore) {
+
+	defer test.Guard(t)()
+
+	ctx := context.Background()
+	swapHash := lntypes.Hash{2, 2, 2}
+	sweepOutpoint := wire.OutPoint{
+		Hash:  chainhash.Hash{0, 0, 0, 2},
+		Index: 1,
+	}
+
+	swap := &loopdb.LoopOutContract{
+		SwapContract: loopdb.SwapContract{
+			CltvExpiry:      144,
+			AmountRequested: 1_000,
+			ProtocolVersion: loopdb.ProtocolVersionMuSig2,
+			HtlcKeys:        htlcKeys,
+			Preimage:        lntypes.Preimage{2},
+		},
+		DestAddr:        destAddr,
+		SwapInvoice:     swapInvoice,
+		SweepConfTarget: confTarget,
+	}
+	err := store.CreateLoopOut(ctx, swapHash, swap)
+	require.NoError(t, err)
+
+	// Insert a confirmed batch/sweep pair directly into the store so
+	// GetSweepStatus/GetParentBatch report that the swap already finished.
+	dbEntry := &dbBatch{}
+	batchID, err := batcherStore.InsertSweepBatch(ctx, dbEntry)
+	require.NoError(t, err)
+
+	dbEntry.ID = batchID
+	dbEntry.Confirmed = true
+	require.NoError(t, batcherStore.UpdateSweepBatch(ctx, dbEntry))
+
+	err = batcherStore.UpsertSweep(ctx, &dbSweep{
+		BatchID:   batchID,
+		SwapHash:  swapHash,
+		Outpoint:  sweepOutpoint,
+		Amount:    1_000,
+		Completed: true,
+	})
+	require.NoError(t, err)
+
+	// Build a minimal batch that already contains the sweep. Its event-loop
+	// channels are serviced by a helper goroutine so scheduleNextCall can
+	// run without spinning up the full batch.Run machinery.
+	testCfg := &batchConfig{
+		maxTimeoutDistance: defaultMaxTimeoutDistance,
+	}
+	completedBatch := &batch{
+		id:             batchID,
+		state:          Confirmed,
+		primarySweepID: sweepOutpoint,
+		sweeps: map[wire.OutPoint]sweep{
+			sweepOutpoint: {
+				swapHash:   swapHash,
+				outpoint:   sweepOutpoint,
+				value:      1_000,
+				confTarget: 6,
+				minFeeRate: 1,
+			},
+		},
+		callEnter: make(chan struct{}),
+		callLeave: make(chan struct{}),
+		stopping:  make(chan struct{}),
+		finished:  make(chan struct{}),
+		quit:      make(chan struct{}),
+		cfg:       testCfg,
+		store:     batcherStore,
+	}
+	completedBatch.setLog(batchPrefixLogger("test-shutdown"))
+
+	// scheduleNextCall interacts with callEnter/callLeave to serialize
+	// access to the batch state. We don't run the full batch.Run loop in
+	// this test, so we spin up a helper goroutine that grants and
+	// releases those slots whenever a test handler grabs them via
+	// scheduleNextCall. The helper also closes b.stopping the first time
+	// it runs to mimic the behavior of a batch whose Run method already
+	// exited (which is what causes ErrBatchShuttingDown).
+	var once sync.Once
+	callLoopDone := make(chan struct{})
+	go func() {
+		defer close(callLoopDone)
+
+		for range completedBatch.callEnter {
+			once.Do(func() {
+				close(completedBatch.stopping)
+			})
+
+			<-completedBatch.callLeave
+		}
+	}()
+	defer func() {
+		// Stop the helper loop to avoid leaking the goroutine once
+		// the test completes. Closing callEnter unblocks the
+		// goroutine, and waiting on callLoopDone ensures it has
+		// drained callLeave before we return.
+		close(completedBatch.callEnter)
+		<-callLoopDone
+	}()
+
+	testBatcher := &Batcher{
+		batches:              map[int32]*batch{batchID: completedBatch},
+		store:                batcherStore,
+		chainParams:          &chaincfg.TestNet3Params,
+		clock:                clock.NewTestClock(time.Unix(0, 0)),
+		initialDelayProvider: zeroInitialDelay,
+	}
+
+	testSweep := &sweep{
+		swapHash:   swapHash,
+		outpoint:   sweepOutpoint,
+		confTarget: 6,
+		minFeeRate: 1,
+		value:      1_000,
+	}
+
+	err = testBatcher.handleSweeps(ctx, []*sweep{testSweep}, nil, false)
+	require.NoError(t, err)
+}
+
 // testCustomSignMuSig2 tests the operation with custom musig2 signer.
 func testCustomSignMuSig2(t *testing.T, store testStore,
 	batcherStore testBatcherStore) {
@@ -5208,6 +5339,12 @@ func TestSweepBatcherHandleSweepRace(t *testing.T) {
 	runTests(t, testSweepBatcherHandleSweepRace)
 }
 
+// TestSweepBatcherHandleBatchShutdown guards against the regression where
+// re-adding a sweep to a finished batch surfaced ErrBatcherShuttingDown.
+func TestSweepBatcherHandleBatchShutdown(t *testing.T) {
+	runTests(t, testSweepBatcherHandleBatchShutdown)
+}
+
 // TestCustomSignMuSig2 tests the operation with custom musig2 signer.
 func TestCustomSignMuSig2(t *testing.T) {
 	runTests(t, testCustomSignMuSig2)
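
Reviewer note: the new test leans on the batch's callEnter/callLeave handshake, which can be hard to picture from the diff alone. Below is a minimal, self-contained sketch of that pattern, assuming scheduleNextCall wins a slot by sending on callEnter, runs its closure, and releases the slot via callLeave. miniBatch, errShuttingDown, and the one-shot event loop are hypothetical stand-ins for illustration, not the real sweepbatcher types.

```go
package main

import (
	"errors"
	"fmt"
)

// errShuttingDown plays the role of ErrBatchShuttingDown: it is returned
// when the event loop has already exited, so no call can be scheduled.
var errShuttingDown = errors.New("batch shutting down")

// miniBatch models only the handshake: callEnter/callLeave serialize
// access to batch state, stopping signals that the event loop exited.
type miniBatch struct {
	callEnter chan struct{}
	callLeave chan struct{}
	stopping  chan struct{}
}

// scheduleNextCall asks the event loop for a slot. If the loop is gone,
// the stopping branch of the select wins and the caller learns that the
// batch is shutting down, which is the error path the fix handles.
func (b *miniBatch) scheduleNextCall(f func()) error {
	select {
	case b.callEnter <- struct{}{}:
	case <-b.stopping:
		return errShuttingDown
	}

	f()

	// Release the slot so the loop can grant it to the next caller.
	b.callLeave <- struct{}{}

	return nil
}

func main() {
	b := &miniBatch{
		callEnter: make(chan struct{}),
		callLeave: make(chan struct{}),
		stopping:  make(chan struct{}),
	}

	// Serve exactly one call, then exit and announce shutdown, mimicking
	// a batch whose Run method has already returned.
	go func() {
		<-b.callEnter
		<-b.callLeave
		close(b.stopping)
	}()

	// The loop is alive, so the closure runs and the error is nil.
	fmt.Println(b.scheduleNextCall(func() { fmt.Println("call ran") }))

	// Nothing services callEnter anymore, so the handshake cannot
	// complete; the caller gets the shutdown error instead of hanging.
	fmt.Println(b.scheduleNextCall(func() { fmt.Println("never runs") }))
}
```

The test's helper goroutine differs from this one-shot loop in that it keeps servicing callEnter indefinitely and closes stopping on its first pass, which, per the test's own comments, is what makes addSweeps report ErrBatchShuttingDown and exercises the new fallback in handleSweeps.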