Skip to content

Commit c34d04e

Browse files
committed
sweepbatcher: fix race condition when stopping
The race was detected in CI and locally when running the tests with -race. It happened between the following calls:

WARNING: DATA RACE
Write at 0x00c0003e6638 by goroutine 1374:
  runtime.racewrite() <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Wait() sweepbatcher/sweep_batch.go:463 +0x6e
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).Run.func1() sweepbatcher/sweep_batcher.go:272 +0x10e

Previous read at 0x00c0003e6638 by goroutine 1388:
  runtime.raceread() <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).monitorConfirmations() sweepbatcher/sweep_batch.go:1144 +0x285
  github.com/lightninglabs/loop/sweepbatcher.(*batch).handleSpend() sweepbatcher/sweep_batch.go:1309 +0x10e4
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Run() sweepbatcher/sweep_batch.go:526 +0xb04
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).spinUpBatch.func1() sweepbatcher/sweep_batcher.go:455 +0xbd

The race occurred because wg.Add(1) and wg.Wait() were called from different goroutines: one goroutine was running batch.Run() and another was running batcher.Run(). To avoid this, the wg.Wait() call was moved into batch.Run(), so that the batch itself waits for its child goroutines. After they finish, the channel b.finished is closed, which serves as the signal for external waiters (the batcher, calling batch.Wait()).

Also, the channel batch.stopped was renamed to batch.stopping to better reflect its meaning.

Added TestSweepBatcherCloseDuringAdding to make sure that adding a sweep while the batcher is shutting down does not cause a crash. Note that this test did not catch the original race condition.
1 parent 8f98514 commit c34d04e

File tree

2 files changed

+116
-7
lines changed

2 files changed

+116
-7
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,12 @@ type batch struct {
193193
// main event loop.
194194
callLeave chan struct{}
195195

196-
// stopped signals that the batch has stopped.
197-
stopped chan struct{}
196+
// stopping signals that the batch is stopping.
197+
stopping chan struct{}
198+
199+
// finished signals that the batch has stopped and all child goroutines
200+
// have finished.
201+
finished chan struct{}
198202

199203
// quit is owned by the parent batcher and signals that the batch must
200204
// stop.
@@ -273,7 +277,10 @@ func (b *batch) scheduleNextCall() (func(), error) {
273277
case <-b.quit:
274278
return func() {}, ErrBatcherShuttingDown
275279

276-
case <-b.stopped:
280+
case <-b.stopping:
281+
return func() {}, ErrBatchShuttingDown
282+
283+
case <-b.finished:
277284
return func() {}, ErrBatchShuttingDown
278285
}
279286

@@ -297,7 +304,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
297304
errChan: make(chan error, 1),
298305
callEnter: make(chan struct{}),
299306
callLeave: make(chan struct{}),
300-
stopped: make(chan struct{}),
307+
stopping: make(chan struct{}),
308+
finished: make(chan struct{}),
301309
quit: bk.quit,
302310
batchTxid: bk.batchTxid,
303311
wallet: bk.wallet,
@@ -340,7 +348,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
340348
errChan: make(chan error, 1),
341349
callEnter: make(chan struct{}),
342350
callLeave: make(chan struct{}),
343-
stopped: make(chan struct{}),
351+
stopping: make(chan struct{}),
352+
finished: make(chan struct{}),
344353
quit: bk.quit,
345354
batchTxid: bk.batchTxid,
346355
batchPkScript: bk.batchPkScript,
@@ -460,16 +469,20 @@ func (b *batch) sweepExists(hash lntypes.Hash) bool {
460469
// Wait waits for the batch to gracefully stop.
461470
func (b *batch) Wait() {
462471
b.log.Infof("Stopping")
463-
b.wg.Wait()
472+
<-b.finished
464473
}
465474

466475
// Run is the batch's main event loop.
467476
func (b *batch) Run(ctx context.Context) error {
468477
runCtx, cancel := context.WithCancel(ctx)
469478
defer func() {
470479
cancel()
471-
close(b.stopped)
480+
close(b.stopping)
481+
482+
// Make sure not to call b.wg.Wait from any other place to avoid
483+
// race condition between b.wg.Add(1) and b.wg.Wait().
472484
b.wg.Wait()
485+
close(b.finished)
473486
}()
474487

475488
if b.muSig2SignSweep == nil {

sweepbatcher/sweep_batcher_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1538,3 +1538,99 @@ func TestSweepFetcher(t *testing.T) {
15381538
// Make sure the batcher exited without an error.
15391539
checkBatcherError(t, runErr)
15401540
}
1541+
1542+
// TestSweepBatcherCloseDuringAdding tests that sweep batcher works correctly
1543+
// if it is closed (stops running) during AddSweep call.
1544+
func TestSweepBatcherCloseDuringAdding(t *testing.T) {
1545+
defer test.Guard(t)()
1546+
1547+
lnd := test.NewMockLnd()
1548+
ctx, cancel := context.WithCancel(context.Background())
1549+
defer cancel()
1550+
1551+
store := loopdb.NewStoreMock(t)
1552+
sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams)
1553+
require.NoError(t, err)
1554+
1555+
batcherStore := NewStoreMock()
1556+
1557+
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1558+
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore,
1559+
sweepStore)
1560+
go func() {
1561+
err := batcher.Run(ctx)
1562+
checkBatcherError(t, err)
1563+
}()
1564+
1565+
// Add many swaps.
1566+
for i := byte(1); i < 255; i++ {
1567+
swapHash := lntypes.Hash{i, i, i}
1568+
1569+
// Create a swap contract.
1570+
swap := &loopdb.LoopOutContract{
1571+
SwapContract: loopdb.SwapContract{
1572+
CltvExpiry: 111,
1573+
AmountRequested: 111,
1574+
},
1575+
1576+
SwapInvoice: swapInvoice,
1577+
}
1578+
1579+
err = store.CreateLoopOut(ctx, swapHash, swap)
1580+
require.NoError(t, err)
1581+
store.AssertLoopOutStored()
1582+
}
1583+
1584+
var wg sync.WaitGroup
1585+
wg.Add(1)
1586+
go func() {
1587+
defer wg.Done()
1588+
// Add many sweeps.
1589+
for i := byte(1); i < 255; i++ {
1590+
// Create a sweep request.
1591+
sweepReq := SweepRequest{
1592+
SwapHash: lntypes.Hash{i, i, i},
1593+
Value: 111,
1594+
Outpoint: wire.OutPoint{
1595+
Hash: chainhash.Hash{i, i},
1596+
Index: 1,
1597+
},
1598+
Notifier: &dummyNotifier,
1599+
}
1600+
1601+
// Deliver sweep request to batcher.
1602+
err := batcher.AddSweep(&sweepReq)
1603+
if err == ErrBatcherShuttingDown {
1604+
break
1605+
}
1606+
require.NoError(t, err)
1607+
}
1608+
}()
1609+
1610+
wg.Add(1)
1611+
go func() {
1612+
defer wg.Done()
1613+
// Close the sweepbatcher while sweeps are still being added.
1614+
time.Sleep(1 * time.Millisecond)
1615+
cancel()
1616+
}()
1617+
1618+
// We don't know how many spend notification registrations will be
1619+
// issued, so accept them while waiting for two goroutines to stop.
1620+
quit := make(chan struct{})
1621+
registrationChan := make(chan struct{})
1622+
go func() {
1623+
defer close(registrationChan)
1624+
for {
1625+
select {
1626+
case <-lnd.RegisterSpendChannel:
1627+
case <-quit:
1628+
return
1629+
}
1630+
}
1631+
}()
1632+
1633+
wg.Wait()
1634+
close(quit)
1635+
<-registrationChan
1636+
}

0 commit comments

Comments
 (0)