Skip to content

Commit bbf58ab

Browse files
committed
routing: add new method resumePayments to handle payments during
startup
1 parent 3b6e28d commit bbf58ab

File tree

1 file changed

+94
-82
lines changed

1 file changed

+94
-82
lines changed

routing/router.go

Lines changed: 94 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -336,88 +336,8 @@ func (r *ChannelRouter) Start() error {
336336

337337
// If any payments are still in flight, we resume, to make sure their
338338
// results are properly handled.
339-
payments, err := r.cfg.Control.FetchInFlightPayments()
340-
if err != nil {
341-
return err
342-
}
343-
344-
// Before we restart existing payments and start accepting more
345-
// payments to be made, we clean the network result store of the
346-
// Switch. We do this here at startup to ensure no more payments can be
347-
// made concurrently, so we know the toKeep map will be up-to-date
348-
// until the cleaning has finished.
349-
toKeep := make(map[uint64]struct{})
350-
for _, p := range payments {
351-
for _, a := range p.HTLCs {
352-
toKeep[a.AttemptID] = struct{}{}
353-
}
354-
}
355-
356-
log.Debugf("Cleaning network result store.")
357-
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
358-
return err
359-
}
360-
361-
for _, payment := range payments {
362-
log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier)
363-
r.wg.Add(1)
364-
go func(payment *channeldb.MPPayment) {
365-
defer r.wg.Done()
366-
367-
// Get the hashes used for the outstanding HTLCs.
368-
htlcs := make(map[uint64]lntypes.Hash)
369-
for _, a := range payment.HTLCs {
370-
a := a
371-
372-
// We check whether the individual attempts
373-
// have their HTLC hash set, if not we'll fall
374-
// back to the overall payment hash.
375-
hash := payment.Info.PaymentIdentifier
376-
if a.Hash != nil {
377-
hash = *a.Hash
378-
}
379-
380-
htlcs[a.AttemptID] = hash
381-
}
382-
383-
// Since we are not supporting creating more shards
384-
// after a restart (only receiving the result of the
385-
// shards already outstanding), we create a simple
386-
// shard tracker that will map the attempt IDs to
387-
// hashes used for the HTLCs. This will be enough also
388-
// for AMP payments, since we only need the hashes for
389-
// the individual HTLCs to regenerate the circuits, and
390-
// we don't currently persist the root share necessary
391-
// to re-derive them.
392-
shardTracker := shards.NewSimpleShardTracker(
393-
payment.Info.PaymentIdentifier, htlcs,
394-
)
395-
396-
// We create a dummy, empty payment session such that
397-
// we won't make another payment attempt when the
398-
// result for the in-flight attempt is received.
399-
paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
400-
401-
// We pass in a non-timeout context, to indicate we
402-
// don't need it to timeout. It will stop immediately
403-
// after the existing attempt has finished anyway. We
404-
// also set a zero fee limit, as no more routes should
405-
// be tried.
406-
noTimeout := time.Duration(0)
407-
_, _, err := r.sendPayment(
408-
context.Background(), 0,
409-
payment.Info.PaymentIdentifier, noTimeout,
410-
paySession, shardTracker,
411-
)
412-
if err != nil {
413-
log.Errorf("Resuming payment %v failed: %v.",
414-
payment.Info.PaymentIdentifier, err)
415-
return
416-
}
417-
418-
log.Infof("Resumed payment %v completed.",
419-
payment.Info.PaymentIdentifier)
420-
}(payment)
339+
if err := r.resumePayments(); err != nil {
340+
log.Error("Failed to resume payments during startup")
421341
}
422342

423343
return nil
@@ -1451,6 +1371,98 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi],
14511371
)
14521372
}
14531373

1374+
// resumePayments fetches inflight payments and resumes their payment
1375+
// lifecycles.
1376+
func (r *ChannelRouter) resumePayments() error {
1377+
// Get all payments that are inflight.
1378+
payments, err := r.cfg.Control.FetchInFlightPayments()
1379+
if err != nil {
1380+
return err
1381+
}
1382+
1383+
// Before we restart existing payments and start accepting more
1384+
// payments to be made, we clean the network result store of the
1385+
// Switch. We do this here at startup to ensure no more payments can be
1386+
// made concurrently, so we know the toKeep map will be up-to-date
1387+
// until the cleaning has finished.
1388+
toKeep := make(map[uint64]struct{})
1389+
for _, p := range payments {
1390+
for _, a := range p.HTLCs {
1391+
toKeep[a.AttemptID] = struct{}{}
1392+
}
1393+
}
1394+
1395+
log.Debugf("Cleaning network result store.")
1396+
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
1397+
return err
1398+
}
1399+
1400+
// launchPayment is a helper closure that handles resuming the payment.
1401+
launchPayment := func(payment *channeldb.MPPayment) {
1402+
defer r.wg.Done()
1403+
1404+
// Get the hashes used for the outstanding HTLCs.
1405+
htlcs := make(map[uint64]lntypes.Hash)
1406+
for _, a := range payment.HTLCs {
1407+
a := a
1408+
1409+
// We check whether the individual attempts have their
1410+
// HTLC hash set, if not we'll fall back to the overall
1411+
// payment hash.
1412+
hash := payment.Info.PaymentIdentifier
1413+
if a.Hash != nil {
1414+
hash = *a.Hash
1415+
}
1416+
1417+
htlcs[a.AttemptID] = hash
1418+
}
1419+
1420+
payHash := payment.Info.PaymentIdentifier
1421+
1422+
// Since we are not supporting creating more shards after a
1423+
// restart (only receiving the result of the shards already
1424+
// outstanding), we create a simple shard tracker that will map
1425+
// the attempt IDs to hashes used for the HTLCs. This will be
1426+
// enough also for AMP payments, since we only need the hashes
1427+
// for the individual HTLCs to regenerate the circuits, and we
1428+
// don't currently persist the root share necessary to
1429+
// re-derive them.
1430+
shardTracker := shards.NewSimpleShardTracker(payHash, htlcs)
1431+
1432+
// We create a dummy, empty payment session such that we won't
1433+
// make another payment attempt when the result for the
1434+
// in-flight attempt is received.
1435+
paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
1436+
1437+
// We pass in a non-timeout context, to indicate we don't need
1438+
// it to timeout. It will stop immediately after the existing
1439+
// attempt has finished anyway. We also set a zero fee limit,
1440+
// as no more routes should be tried.
1441+
noTimeout := time.Duration(0)
1442+
_, _, err := r.sendPayment(
1443+
context.Background(), 0, payHash, noTimeout, paySession,
1444+
shardTracker,
1445+
)
1446+
if err != nil {
1447+
log.Errorf("Resuming payment %v failed: %v", payHash,
1448+
err)
1449+
1450+
return
1451+
}
1452+
1453+
log.Infof("Resumed payment %v completed", payHash)
1454+
}
1455+
1456+
for _, payment := range payments {
1457+
log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier)
1458+
1459+
r.wg.Add(1)
1460+
go launchPayment(payment)
1461+
}
1462+
1463+
return nil
1464+
}
1465+
14541466
// getEdgeUnifiers returns a list of edge unifiers for the given route.
14551467
func getEdgeUnifiers(source route.Vertex, hops []route.Vertex,
14561468
outgoingChans map[uint64]struct{},

0 commit comments

Comments
 (0)