Skip to content

Commit 37eb346

Browse files
committed
sweepbatcher: handle batch shutdown during re-add
This started from a CI failure of TestSweepBatcherHandleSweepRace/loopdb [1], which surfaced ErrBatcherShuttingDown when the AddSweep loop caught a batch right as it finished. Reproduced via: `go test ./sweepbatcher -run TestSweepBatcherHandleSweepRace/loopdb -cpu=12,4,8 -count=1`. The cause: handleSweeps treated ErrBatchShuttingDown from batch.addSweeps as fatal, so the batcher exited even though the sweep was already confirmed. The fix treats that error as "batch already done", so that we fall through to the persisted status/monitorSpend path. A regression test that deterministically simulates the shutdown window was added; run it with: `go test ./sweepbatcher -run TestSweepBatcherHandleBatchShutdown`. [1] https://github.com/lightninglabs/loop/actions/runs/19282552307/job/55136597881?pr=1041
1 parent d82b362 commit 37eb346

File tree

2 files changed

+146
-1
lines changed

2 files changed

+146
-1
lines changed

sweepbatcher/sweep_batcher.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,15 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
906906
for _, batch := range b.batches {
907907
if batch.sweepExists(sweep.outpoint) {
908908
accepted, err := batch.addSweeps(ctx, sweeps)
909-
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
909+
if errors.Is(err, ErrBatchShuttingDown) {
910+
// The batch finished while we were trying to
911+
// re-add the sweep. Leave it in the map for the
912+
// lazy cleanup below and fall back to the
913+
// monitorSpendAndNotify path below.
914+
break
915+
}
916+
917+
if err != nil {
910918
return err
911919
}
912920

sweepbatcher/sweep_batcher_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/btcsuite/btcd/blockchain"
1414
"github.com/btcsuite/btcd/btcec/v2"
1515
"github.com/btcsuite/btcd/btcutil"
16+
"github.com/btcsuite/btcd/chaincfg"
1617
"github.com/btcsuite/btcd/chaincfg/chainhash"
1718
"github.com/btcsuite/btcd/wire"
1819
"github.com/btcsuite/btclog/v2"
@@ -4262,6 +4263,136 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore,
42624263
checkBatcherError(t, runErr)
42634264
}
42644265

4266+
// testSweepBatcherHandleBatchShutdown simulates a sweep that is re-added after
4267+
// its batch has already completed and begun shutting down. Before the fix,
4268+
// addSweeps surfaced ErrBatchShuttingDown, causing handleSweeps to return an
4269+
// error and, ultimately, AddSweep to deliver ErrBatcherShuttingDown back to
4270+
// the caller. This regression test asserts that handleSweeps returns no error
4271+
// in that situation, i.e. that it falls back to the persisted sweep status.
4272+
func testSweepBatcherHandleBatchShutdown(t *testing.T, store testStore,
4273+
batcherStore testBatcherStore) {
4274+
4275+
defer test.Guard(t)()
4276+
4277+
ctx := context.Background()
4278+
swapHash := lntypes.Hash{2, 2, 2}
4279+
sweepOutpoint := wire.OutPoint{
4280+
Hash: chainhash.Hash{0, 0, 0, 2},
4281+
Index: 1,
4282+
}
4283+
4284+
swap := &loopdb.LoopOutContract{
4285+
SwapContract: loopdb.SwapContract{
4286+
CltvExpiry: 144,
4287+
AmountRequested: 1_000,
4288+
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
4289+
HtlcKeys: htlcKeys,
4290+
Preimage: lntypes.Preimage{2},
4291+
},
4292+
DestAddr: destAddr,
4293+
SwapInvoice: swapInvoice,
4294+
SweepConfTarget: confTarget,
4295+
}
4296+
err := store.CreateLoopOut(ctx, swapHash, swap)
4297+
require.NoError(t, err)
4298+
4299+
// Insert a confirmed batch/sweep pair directly into the store so
4300+
// GetSweepStatus/GetParentBatch report that the swap already finished.
4301+
dbEntry := &dbBatch{}
4302+
batchID, err := batcherStore.InsertSweepBatch(ctx, dbEntry)
4303+
require.NoError(t, err)
4304+
4305+
dbEntry.ID = batchID
4306+
dbEntry.Confirmed = true
4307+
require.NoError(t, batcherStore.UpdateSweepBatch(ctx, dbEntry))
4308+
4309+
err = batcherStore.UpsertSweep(ctx, &dbSweep{
4310+
BatchID: batchID,
4311+
SwapHash: swapHash,
4312+
Outpoint: sweepOutpoint,
4313+
Amount: 1_000,
4314+
Completed: true,
4315+
})
4316+
require.NoError(t, err)
4317+
4318+
// Build a minimal in-memory batch that already contains the sweep. Its
4319+
// callEnter/callLeave channels are serviced by a helper goroutine so
4320+
// scheduleNextCall can run without the full batch.Run machinery.
4321+
testCfg := &batchConfig{
4322+
maxTimeoutDistance: defaultMaxTimeoutDistance,
4323+
}
4324+
completedBatch := &batch{
4325+
id: batchID,
4326+
state: Confirmed,
4327+
primarySweepID: sweepOutpoint,
4328+
sweeps: map[wire.OutPoint]sweep{
4329+
sweepOutpoint: {
4330+
swapHash: swapHash,
4331+
outpoint: sweepOutpoint,
4332+
value: 1_000,
4333+
confTarget: 6,
4334+
minFeeRate: 1,
4335+
},
4336+
},
4337+
callEnter: make(chan struct{}),
4338+
callLeave: make(chan struct{}),
4339+
stopping: make(chan struct{}),
4340+
finished: make(chan struct{}),
4341+
quit: make(chan struct{}),
4342+
cfg: testCfg,
4343+
store: batcherStore,
4344+
}
4345+
completedBatch.setLog(batchPrefixLogger("test-shutdown"))
4346+
4347+
// scheduleNextCall interacts with callEnter/callLeave to serialize
4348+
// access to the batch state. We don't run the full batch.Run loop in
4349+
// this test, so we spin up a helper goroutine that grants and releases
4350+
// those slots whenever a test handler grabs them via scheduleNextCall.
4351+
// The helper also closes completedBatch.stopping the first time it runs
4352+
// to mimic the behavior of a batch whose Run method already exited
4353+
// (which is what causes ErrBatchShuttingDown).
4354+
var once sync.Once
4355+
callLoopDone := make(chan struct{})
4356+
go func() {
4357+
defer close(callLoopDone)
4358+
4359+
for range completedBatch.callEnter {
4360+
once.Do(func() {
4361+
close(completedBatch.stopping)
4362+
})
4363+
4364+
<-completedBatch.callLeave
4365+
}
4366+
}()
4367+
defer func() {
4368+
// Stop the helper loop to avoid leaking the goroutine once the
4369+
// test completes. Closing callEnter ends the helper's range
4370+
// loop, and waiting on callLoopDone ensures the goroutine has
4371+
// exited before we return.
4372+
close(completedBatch.callEnter)
4373+
<-callLoopDone
4374+
}()
4375+
4376+
testBatcher := &Batcher{
4377+
batches: map[int32]*batch{batchID: completedBatch},
4378+
store: batcherStore,
4379+
chainParams: &chaincfg.TestNet3Params,
4380+
clock: clock.NewTestClock(time.Unix(0, 0)),
4381+
initialDelayProvider: zeroInitialDelay,
4382+
}
4383+
4384+
testSweep := &sweep{
4385+
swapHash: swapHash,
4386+
outpoint: sweepOutpoint,
4387+
confTarget: 6,
4388+
minFeeRate: 1,
4389+
value: 1_000,
4390+
}
4391+
4392+
err = testBatcher.handleSweeps(ctx, []*sweep{testSweep}, nil, false)
4393+
require.NoError(t, err)
4394+
}
4395+
42654396
// testCustomSignMuSig2 tests the operation with custom musig2 signer.
42664397
func testCustomSignMuSig2(t *testing.T, store testStore,
42674398
batcherStore testBatcherStore) {
@@ -5208,6 +5339,12 @@ func TestSweepBatcherHandleSweepRace(t *testing.T) {
52085339
runTests(t, testSweepBatcherHandleSweepRace)
52095340
}
52105341

5342+
// TestSweepBatcherHandleBatchShutdown covers the regression where a batch that
5343+
// finishes while a sweep is being re-added must not surface an error; it runs testSweepBatcherHandleBatchShutdown via runTests.
5344+
func TestSweepBatcherHandleBatchShutdown(t *testing.T) {
5345+
runTests(t, testSweepBatcherHandleBatchShutdown)
5346+
}
5347+
52115348
// TestCustomSignMuSig2 tests the operation with custom musig2 signer.
52125349
func TestCustomSignMuSig2(t *testing.T) {
52135350
runTests(t, testCustomSignMuSig2)

0 commit comments

Comments
 (0)