Skip to content

Commit 0b2c177

Browse files
committed
sweepbatcher: exit early in handleSweep
If the sweep was successfully updated in the batch, no need to try to add it to all other batches. Added a test reproducing adding a sweep to both batches without this change.
1 parent a135eb8 commit 0b2c177

File tree

2 files changed

+191
-0
lines changed

2 files changed

+191
-0
lines changed

sweepbatcher/sweep_batcher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
331331
"accepted by batch %d", sweep.swapHash[:6],
332332
batch.id)
333333
}
334+
335+
// The sweep was updated in the batch, our job is done.
336+
return nil
334337
}
335338
}
336339

sweepbatcher/sweep_batcher_test.go

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,3 +1111,191 @@ func TestRestoringEmptyBatch(t *testing.T) {
11111111

11121112
checkBatcherError(t, runErr)
11131113
}
1114+
1115+
type loopStoreMock struct {
1116+
loops map[lntypes.Hash]*loopdb.LoopOut
1117+
mu sync.Mutex
1118+
}
1119+
1120+
func newLoopStoreMock() *loopStoreMock {
1121+
return &loopStoreMock{
1122+
loops: make(map[lntypes.Hash]*loopdb.LoopOut),
1123+
}
1124+
}
1125+
1126+
func (s *loopStoreMock) FetchLoopOutSwap(ctx context.Context,
1127+
hash lntypes.Hash) (*loopdb.LoopOut, error) {
1128+
1129+
s.mu.Lock()
1130+
defer s.mu.Unlock()
1131+
1132+
out, has := s.loops[hash]
1133+
if !has {
1134+
return nil, errors.New("loop not found")
1135+
}
1136+
1137+
return out, nil
1138+
}
1139+
1140+
func (s *loopStoreMock) putLoopOutSwap(hash lntypes.Hash, out *loopdb.LoopOut) {
1141+
s.mu.Lock()
1142+
defer s.mu.Unlock()
1143+
1144+
s.loops[hash] = out
1145+
}
1146+
1147+
// TestHandleSweepTwice tests that handing the same sweep twice must not
1148+
// add it to different batches.
1149+
func TestHandleSweepTwice(t *testing.T) {
1150+
defer test.Guard(t)()
1151+
1152+
lnd := test.NewMockLnd()
1153+
ctx, cancel := context.WithCancel(context.Background())
1154+
1155+
store := newLoopStoreMock()
1156+
1157+
batcherStore := NewStoreMock()
1158+
1159+
batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer,
1160+
testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store)
1161+
1162+
var wg sync.WaitGroup
1163+
wg.Add(1)
1164+
1165+
var runErr error
1166+
go func() {
1167+
defer wg.Done()
1168+
runErr = batcher.Run(ctx)
1169+
}()
1170+
1171+
// Wait for the batcher to be initialized.
1172+
<-batcher.initDone
1173+
1174+
const shortCltv = 111
1175+
const longCltv = 111 + defaultMaxTimeoutDistance + 6
1176+
1177+
// Create two sweep requests with CltvExpiry distant from each other
1178+
// to go assigned to separate batches.
1179+
sweepReq1 := SweepRequest{
1180+
SwapHash: lntypes.Hash{1, 1, 1},
1181+
Value: 111,
1182+
Outpoint: wire.OutPoint{
1183+
Hash: chainhash.Hash{1, 1},
1184+
Index: 1,
1185+
},
1186+
Notifier: &dummyNotifier,
1187+
}
1188+
1189+
loopOut1 := &loopdb.LoopOut{
1190+
Loop: loopdb.Loop{
1191+
Hash: lntypes.Hash{1, 1, 1},
1192+
},
1193+
Contract: &loopdb.LoopOutContract{
1194+
SwapContract: loopdb.SwapContract{
1195+
CltvExpiry: shortCltv,
1196+
AmountRequested: 111,
1197+
},
1198+
SwapInvoice: swapInvoice,
1199+
},
1200+
}
1201+
1202+
sweepReq2 := SweepRequest{
1203+
SwapHash: lntypes.Hash{2, 2, 2},
1204+
Value: 222,
1205+
Outpoint: wire.OutPoint{
1206+
Hash: chainhash.Hash{2, 2},
1207+
Index: 2,
1208+
},
1209+
Notifier: &dummyNotifier,
1210+
}
1211+
1212+
loopOut2 := &loopdb.LoopOut{
1213+
Loop: loopdb.Loop{
1214+
Hash: lntypes.Hash{2, 2, 2},
1215+
},
1216+
Contract: &loopdb.LoopOutContract{
1217+
SwapContract: loopdb.SwapContract{
1218+
CltvExpiry: longCltv,
1219+
AmountRequested: 222,
1220+
},
1221+
SwapInvoice: swapInvoice,
1222+
},
1223+
}
1224+
1225+
store.putLoopOutSwap(sweepReq1.SwapHash, loopOut1)
1226+
store.putLoopOutSwap(sweepReq2.SwapHash, loopOut2)
1227+
1228+
// Deliver sweep request to batcher.
1229+
require.NoError(t, batcher.AddSweep(&sweepReq1))
1230+
1231+
// Since two batches were created we check that it registered for its
1232+
// primary sweep's spend.
1233+
<-lnd.RegisterSpendChannel
1234+
1235+
// Deliver the second sweep. It will go to a separate batch,
1236+
// since CltvExpiry values are distant enough.
1237+
require.NoError(t, batcher.AddSweep(&sweepReq2))
1238+
<-lnd.RegisterSpendChannel
1239+
1240+
// Once batcher receives sweep request it will eventually spin up
1241+
// batches.
1242+
require.Eventually(t, func() bool {
1243+
// Make sure that the sweep was stored and we have exactly one
1244+
// active batch.
1245+
return batcherStore.AssertSweepStored(sweepReq1.SwapHash) &&
1246+
batcherStore.AssertSweepStored(sweepReq2.SwapHash) &&
1247+
len(batcher.batches) == 2
1248+
}, test.Timeout, eventuallyCheckFrequency)
1249+
1250+
// Change the second sweep so that it can be added to the first batch.
1251+
// Change CltvExpiry.
1252+
loopOut2 = &loopdb.LoopOut{
1253+
Loop: loopdb.Loop{
1254+
Hash: lntypes.Hash{2, 2, 2},
1255+
},
1256+
Contract: &loopdb.LoopOutContract{
1257+
SwapContract: loopdb.SwapContract{
1258+
CltvExpiry: shortCltv,
1259+
AmountRequested: 222,
1260+
},
1261+
SwapInvoice: swapInvoice,
1262+
},
1263+
}
1264+
store.putLoopOutSwap(sweepReq2.SwapHash, loopOut2)
1265+
1266+
// Re-add the second sweep. It is expected to stay in second batch,
1267+
// not added to both batches.
1268+
require.NoError(t, batcher.AddSweep(&sweepReq2))
1269+
1270+
require.Eventually(t, func() bool {
1271+
// Make sure there are two batches.
1272+
batches := batcher.batches
1273+
if len(batches) != 2 {
1274+
return false
1275+
}
1276+
1277+
// Make sure the second batch has the second sweep.
1278+
sweep2, has := batches[1].sweeps[sweepReq2.SwapHash]
1279+
if !has {
1280+
return false
1281+
}
1282+
1283+
// Make sure the second sweep's timeout has been updated.
1284+
if sweep2.timeout != shortCltv {
1285+
return false
1286+
}
1287+
1288+
return true
1289+
}, test.Timeout, eventuallyCheckFrequency)
1290+
1291+
// Make sure each batch has one sweep. If the second sweep was added to
1292+
// both batches, the following check won't pass.
1293+
require.Equal(t, 1, len(batcher.batches[0].sweeps))
1294+
require.Equal(t, 1, len(batcher.batches[1].sweeps))
1295+
1296+
// Now make it quit by canceling the context.
1297+
cancel()
1298+
wg.Wait()
1299+
1300+
checkBatcherError(t, runErr)
1301+
}

0 commit comments

Comments
 (0)