Skip to content

Commit de969fd

Browse files
authored
Merge pull request #1042 from starius/sweepbatcher-fix-race-addsweeps
sweepbatcher: handle batch shutdown during re-add
2 parents eb8c3dd + 37eb346 commit de969fd

File tree

3 files changed

+150
-4
lines changed

3 files changed

+150
-4
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2439,6 +2439,7 @@ func (b *batch) isComplete() bool {
24392439
if err != nil && err != ErrBatchShuttingDown {
24402440
return false
24412441
}
2442+
24422443
return b.state == Confirmed
24432444
}
24442445

sweepbatcher/sweep_batcher.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,15 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
906906
for _, batch := range b.batches {
907907
if batch.sweepExists(sweep.outpoint) {
908908
accepted, err := batch.addSweeps(ctx, sweeps)
909-
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
909+
if errors.Is(err, ErrBatchShuttingDown) {
910+
// The batch finished while we were trying to
911+
// re-add the sweep. Leave it in the map for the
912+
// lazy cleanup below and fall back to the
913+
// monitorSpendAndNotify path below.
914+
break
915+
}
916+
917+
if err != nil {
910918
return err
911919
}
912920

sweepbatcher/sweep_batcher_test.go

Lines changed: 140 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/btcsuite/btcd/blockchain"
1414
"github.com/btcsuite/btcd/btcec/v2"
1515
"github.com/btcsuite/btcd/btcutil"
16+
"github.com/btcsuite/btcd/chaincfg"
1617
"github.com/btcsuite/btcd/chaincfg/chainhash"
1718
"github.com/btcsuite/btcd/wire"
1819
"github.com/btcsuite/btclog/v2"
@@ -4149,9 +4150,9 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore,
41494150
go func() {
41504151
defer addWG.Done()
41514152

4152-
// After this goroutine completes, stop the goroutine that handles
4153-
// registrations as well. Give it one second to finish the last
4154-
// AddSweep to prevent goroutine leaks.
4153+
// After this goroutine completes, stop the goroutine that
4154+
// handles registrations as well. Give it one second to finish
4155+
// the last AddSweep to prevent goroutine leaks.
41554156
defer time.AfterFunc(time.Second, confCancel)
41564157

41574158
for {
@@ -4262,6 +4263,136 @@ func testSweepBatcherHandleSweepRace(t *testing.T, store testStore,
42624263
checkBatcherError(t, runErr)
42634264
}
42644265

4266+
// testSweepBatcherHandleBatchShutdown simulates a sweep that is re-added after
4267+
// its batch has already completed and begun shutting down. Before the fix,
4268+
// addSweeps surfaced ErrBatchShuttingDown, causing handleSweeps to return an
4269+
// error and, ultimately, AddSweep to deliver ErrBatcherShuttingDown back to
4270+
// the caller. The regression ensures we fall back to the persisted sweep
4271+
// status and silently switch to the monitor path.
4272+
func testSweepBatcherHandleBatchShutdown(t *testing.T, store testStore,
4273+
batcherStore testBatcherStore) {
4274+
4275+
defer test.Guard(t)()
4276+
4277+
ctx := context.Background()
4278+
swapHash := lntypes.Hash{2, 2, 2}
4279+
sweepOutpoint := wire.OutPoint{
4280+
Hash: chainhash.Hash{0, 0, 0, 2},
4281+
Index: 1,
4282+
}
4283+
4284+
swap := &loopdb.LoopOutContract{
4285+
SwapContract: loopdb.SwapContract{
4286+
CltvExpiry: 144,
4287+
AmountRequested: 1_000,
4288+
ProtocolVersion: loopdb.ProtocolVersionMuSig2,
4289+
HtlcKeys: htlcKeys,
4290+
Preimage: lntypes.Preimage{2},
4291+
},
4292+
DestAddr: destAddr,
4293+
SwapInvoice: swapInvoice,
4294+
SweepConfTarget: confTarget,
4295+
}
4296+
err := store.CreateLoopOut(ctx, swapHash, swap)
4297+
require.NoError(t, err)
4298+
4299+
// Insert a confirmed batch/sweep pair directly into the store so
4300+
// GetSweepStatus/GetParentBatch report that the swap already finished.
4301+
dbEntry := &dbBatch{}
4302+
batchID, err := batcherStore.InsertSweepBatch(ctx, dbEntry)
4303+
require.NoError(t, err)
4304+
4305+
dbEntry.ID = batchID
4306+
dbEntry.Confirmed = true
4307+
require.NoError(t, batcherStore.UpdateSweepBatch(ctx, dbEntry))
4308+
4309+
err = batcherStore.UpsertSweep(ctx, &dbSweep{
4310+
BatchID: batchID,
4311+
SwapHash: swapHash,
4312+
Outpoint: sweepOutpoint,
4313+
Amount: 1_000,
4314+
Completed: true,
4315+
})
4316+
require.NoError(t, err)
4317+
4318+
// Build a minimal batch that already contains the sweep. Its event-loop
4319+
// channels are serviced by a helper goroutine so scheduleNextCall can
4320+
// run without spinning up the full batch.Run machinery.
4321+
testCfg := &batchConfig{
4322+
maxTimeoutDistance: defaultMaxTimeoutDistance,
4323+
}
4324+
completedBatch := &batch{
4325+
id: batchID,
4326+
state: Confirmed,
4327+
primarySweepID: sweepOutpoint,
4328+
sweeps: map[wire.OutPoint]sweep{
4329+
sweepOutpoint: {
4330+
swapHash: swapHash,
4331+
outpoint: sweepOutpoint,
4332+
value: 1_000,
4333+
confTarget: 6,
4334+
minFeeRate: 1,
4335+
},
4336+
},
4337+
callEnter: make(chan struct{}),
4338+
callLeave: make(chan struct{}),
4339+
stopping: make(chan struct{}),
4340+
finished: make(chan struct{}),
4341+
quit: make(chan struct{}),
4342+
cfg: testCfg,
4343+
store: batcherStore,
4344+
}
4345+
completedBatch.setLog(batchPrefixLogger("test-shutdown"))
4346+
4347+
// scheduleNextCall interacts with callEnter/callLeave to serialize
4348+
// access to the batch state. We don't run the full batch.Run loop in
4349+
// this test, so we spin up a helper goroutine that grants and releases
4350+
// those slots whenever a test handler grabs them via scheduleNextCall.
4351+
// The helper also closes b.stopping the first time it runs to mimic the
4352+
// behavior of a batch whose Run method already exited (which is what
4353+
// causes ErrBatchShuttingDown).
4354+
var once sync.Once
4355+
callLoopDone := make(chan struct{})
4356+
go func() {
4357+
defer close(callLoopDone)
4358+
4359+
for range completedBatch.callEnter {
4360+
once.Do(func() {
4361+
close(completedBatch.stopping)
4362+
})
4363+
4364+
<-completedBatch.callLeave
4365+
}
4366+
}()
4367+
defer func() {
4368+
// Stop the helper loop to avoid leaking the goroutine once the
4369+
// test completes. Closing callEnter unblocks the goroutine, and
4370+
// waiting on callLoopDone ensures it has drained callLeave
4371+
// before we return.
4372+
close(completedBatch.callEnter)
4373+
<-callLoopDone
4374+
}()
4375+
4376+
testBatcher := &Batcher{
4377+
batches: map[int32]*batch{batchID: completedBatch},
4378+
store: batcherStore,
4379+
chainParams: &chaincfg.TestNet3Params,
4380+
clock: clock.NewTestClock(time.Unix(0, 0)),
4381+
initialDelayProvider: zeroInitialDelay,
4382+
}
4383+
4384+
testSweep := &sweep{
4385+
swapHash: swapHash,
4386+
outpoint: sweepOutpoint,
4387+
confTarget: 6,
4388+
minFeeRate: 1,
4389+
value: 1_000,
4390+
}
4391+
4392+
err = testBatcher.handleSweeps(ctx, []*sweep{testSweep}, nil, false)
4393+
require.NoError(t, err)
4394+
}
4395+
42654396
// testCustomSignMuSig2 tests the operation with custom musig2 signer.
42664397
func testCustomSignMuSig2(t *testing.T, store testStore,
42674398
batcherStore testBatcherStore) {
@@ -5208,6 +5339,12 @@ func TestSweepBatcherHandleSweepRace(t *testing.T) {
52085339
runTests(t, testSweepBatcherHandleSweepRace)
52095340
}
52105341

5342+
// TestSweepBatcherHandleBatchShutdown covers the regression where a sweep that
5343+
// finishes while being re-added must not surface ErrBatcherShuttingDown.
5344+
func TestSweepBatcherHandleBatchShutdown(t *testing.T) {
5345+
runTests(t, testSweepBatcherHandleBatchShutdown)
5346+
}
5347+
52115348
// TestCustomSignMuSig2 tests the operation with custom musig2 signer.
52125349
func TestCustomSignMuSig2(t *testing.T) {
52135350
runTests(t, testCustomSignMuSig2)

0 commit comments

Comments
 (0)