Skip to content

Commit cd55820

Browse files
committed
loop: add recovery manager
1 parent ded8ab5 commit cd55820

File tree

6 files changed

+622
-249
lines changed

6 files changed

+622
-249
lines changed

client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/lightninglabs/lndclient"
1818
"github.com/lightninglabs/loop/assets"
1919
"github.com/lightninglabs/loop/loopdb"
20+
"github.com/lightninglabs/loop/notifications"
2021
"github.com/lightninglabs/loop/swap"
2122
"github.com/lightninglabs/loop/sweep"
2223
"github.com/lightninglabs/loop/sweepbatcher"
@@ -106,6 +107,7 @@ type Client struct {
106107
lndServices *lndclient.LndServices
107108
sweeper *sweep.Sweeper
108109
executor *executor
110+
ntfnManager *notifications.Manager
109111

110112
resumeReady chan struct{}
111113
wg sync.WaitGroup

loopd/daemon.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,10 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
993993
}
994994
}
995995

996+
loop.Resume(
997+
d.mainCtx, notificationManager, swapClient.Store, d.impl.Conn, d.lnd,
998+
)
999+
9961000
// Last, start our internal error handler. This will return exactly one
9971001
// error or nil on the main error channel to inform the caller that
9981002
// something went wrong or that shutdown is complete. We don't add to

loopout.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/lightninglabs/lndclient"
1919
"github.com/lightninglabs/loop/loopdb"
2020
"github.com/lightninglabs/loop/swap"
21+
"github.com/lightninglabs/loop/swapserverrpc"
2122
"github.com/lightninglabs/loop/sweep"
2223
"github.com/lightninglabs/loop/sweepbatcher"
2324
"github.com/lightninglabs/loop/utils"
@@ -28,6 +29,7 @@ import (
2829
"github.com/lightningnetwork/lnd/lntypes"
2930
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
3031
"github.com/lightningnetwork/lnd/tlv"
32+
"google.golang.org/grpc"
3133
)
3234

3335
const (
@@ -1524,3 +1526,220 @@ func (s *loopOutSwap) fillAssetOffchainPaymentResult(ctx context.Context,
15241526
func (s *loopOutSwap) isAssetSwap() bool {
15251527
return s.AssetSwapInfo != nil
15261528
}
1529+
1530+
const (
1531+
ResumeSwapInitiator = "resume_swap"
1532+
)
1533+
1534+
// NotificationManager handles subscribing to incoming unfinished swaps from the
1535+
// swap server.
1536+
type NotificationManager interface {
1537+
SubscribeUnfinishedSwaps(ctx context.Context,
1538+
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification
1539+
}
1540+
1541+
// resumeManager is responsible for recovering unfinished swaps after a
1542+
// client data loss event.
1543+
type resumeManager struct {
1544+
ntfnManager NotificationManager
1545+
swapStore loopdb.SwapStore
1546+
swapClient swapserverrpc.SwapServerClient
1547+
lnd *lndclient.GrpcLndServices
1548+
1549+
reqChan chan *swapserverrpc.ServerUnfinishedSwapNotification
1550+
}
1551+
1552+
// Resume starts the resume manager which listens for unfinished swaps
1553+
// from the server and attempts to recover them.
1554+
func Resume(ctx context.Context, ntfnManager NotificationManager,
1555+
swapStore loopdb.SwapStore,
1556+
swapClientConn *grpc.ClientConn,
1557+
lnd *lndclient.GrpcLndServices) {
1558+
1559+
recoveryManager := &resumeManager{
1560+
ntfnManager: ntfnManager,
1561+
swapStore: swapStore,
1562+
swapClient: swapserverrpc.NewSwapServerClient(swapClientConn),
1563+
lnd: lnd,
1564+
reqChan: make(chan *swapserverrpc.ServerUnfinishedSwapNotification, 1),
1565+
}
1566+
go recoveryManager.start(ctx)
1567+
}
1568+
1569+
// start begins listening for unfinished swap notifications from the server.
1570+
func (m *resumeManager) start(ctx context.Context) {
1571+
ntfnChan := m.ntfnManager.SubscribeUnfinishedSwaps(ctx)
1572+
1573+
for {
1574+
select {
1575+
case <-ctx.Done():
1576+
return
1577+
1578+
case ntfn, ok := <-ntfnChan:
1579+
if !ok {
1580+
return
1581+
}
1582+
m.reqChan <- ntfn
1583+
1584+
case req := <-m.reqChan:
1585+
err := m.handleUnfinishedSwap(ctx, req)
1586+
if err != nil {
1587+
go func() {
1588+
// wait a bit before retrying
1589+
time.Sleep(time.Second * 10)
1590+
m.reqChan <- req
1591+
}() // retry
1592+
}
1593+
}
1594+
}
1595+
}
1596+
1597+
// handleUnfinishedSwap processes an unfinished swap notification from the
1598+
// server.
1599+
func (m *resumeManager) handleUnfinishedSwap(ctx context.Context,
1600+
ntfn *swapserverrpc.ServerUnfinishedSwapNotification) error {
1601+
1602+
swapHash, err := lntypes.MakeHash(ntfn.SwapHash)
1603+
if err != nil {
1604+
return err
1605+
}
1606+
1607+
if ntfn.IsLoopIn {
1608+
return fmt.Errorf("loop in recovery not implemented yet")
1609+
}
1610+
1611+
return m.handleUnfinishedLoopOut(ctx, swapHash)
1612+
}
1613+
1614+
func (m *resumeManager) handleUnfinishedLoopOut(ctx context.Context,
1615+
swapHash lntypes.Hash) error {
1616+
1617+
// Fetch the swap from the local store.
1618+
swap, err := m.swapStore.FetchLoopOutSwap(ctx, swapHash)
1619+
if err != nil {
1620+
return err
1621+
}
1622+
1623+
typ := swap.State().State.Type()
1624+
// Check the state of the swap and take appropriate action.
1625+
if typ != loopdb.StateTypePending && typ != loopdb.StateTypeFail {
1626+
trackChanCtx, cancel := context.WithCancel(ctx)
1627+
// Check if the swap offchain htlc went through.
1628+
trackChan, errChan, err := m.lnd.Router.TrackPayment(
1629+
trackChanCtx, swap.Hash,
1630+
)
1631+
if err == nil {
1632+
// Omit errors here as the payment may not have been
1633+
// initiated from this client.
1634+
trackChanLoop:
1635+
for {
1636+
select {
1637+
case <-ctx.Done():
1638+
cancel()
1639+
return nil
1640+
1641+
case trackResp := <-trackChan:
1642+
if trackResp.State !=
1643+
lnrpc.Payment_FAILED {
1644+
// Nothing more to do.
1645+
cancel()
1646+
return nil
1647+
}
1648+
break trackChanLoop
1649+
1650+
case <-errChan:
1651+
break trackChanLoop
1652+
}
1653+
}
1654+
}
1655+
cancel()
1656+
1657+
if swap.LastUpdate().Cost.Server == 0 {
1658+
// If the server cost is zero resume the payment.
1659+
return m.resumeLoopOutPayment(ctx, swap)
1660+
}
1661+
}
1662+
1663+
return nil
1664+
}
1665+
1666+
// resumeLoopOutPayment attempts to resume the loop out payment for the
1667+
// specified swap.
1668+
func (m *resumeManager) resumeLoopOutPayment(ctx context.Context,
1669+
swap *loopdb.LoopOut) error {
1670+
1671+
swapRes, err := m.swapClient.NewLoopOutSwap(
1672+
ctx, &swapserverrpc.ServerLoopOutRequest{
1673+
SwapHash: swap.Hash[:],
1674+
UserAgent: ResumeSwapInitiator,
1675+
},
1676+
)
1677+
if err != nil {
1678+
return err
1679+
}
1680+
1681+
paymentReq := swapRes.SwapInvoice
1682+
// Verify the payment request before attempting payment.
1683+
inv, err := m.lnd.Client.DecodePaymentRequest(ctx, paymentReq)
1684+
if err != nil {
1685+
return fmt.Errorf("failed to decode loop out invoice: %v", err)
1686+
}
1687+
1688+
if swap.Hash != inv.Hash {
1689+
return fmt.Errorf("invoice payment hash %v does not match "+
1690+
"swap hash %v", inv.Hash, swap.Hash)
1691+
}
1692+
1693+
amtRequested := swap.Contract.AmountRequested
1694+
1695+
if inv.Value.ToSatoshis() > swap.Contract.MaxSwapFee*2+amtRequested {
1696+
return fmt.Errorf("invoice amount %v exceeds max "+
1697+
"allowed %v", inv.Value.ToSatoshis(),
1698+
swap.Contract.MaxSwapFee+amtRequested)
1699+
}
1700+
1701+
payChan, errChan, err := m.lnd.Router.SendPayment(
1702+
ctx,
1703+
lndclient.SendPaymentRequest{
1704+
Invoice: paymentReq,
1705+
Timeout: time.Hour,
1706+
MaxFee: swap.Contract.MaxSwapFee,
1707+
})
1708+
if err != nil {
1709+
return err
1710+
}
1711+
for {
1712+
select {
1713+
case payResp := <-payChan:
1714+
if payResp.FailureReason.String() != "" {
1715+
return fmt.Errorf("payment error: %v", payResp.FailureReason)
1716+
}
1717+
if payResp.State == lnrpc.Payment_SUCCEEDED {
1718+
cost := swap.LastUpdate().Cost
1719+
cost.Server = payResp.Value.ToSatoshis() - amtRequested
1720+
cost.Offchain = payResp.Fee.ToSatoshis()
1721+
// Payment succeeded.
1722+
updateTime := time.Now()
1723+
1724+
// Update state in store.
1725+
err = m.swapStore.UpdateLoopOut(
1726+
ctx, swap.Hash, updateTime,
1727+
loopdb.SwapStateData{
1728+
State: loopdb.StateSuccess,
1729+
Cost: cost,
1730+
HtlcTxHash: swap.LastUpdate().HtlcTxHash,
1731+
},
1732+
)
1733+
if err != nil {
1734+
return err
1735+
}
1736+
}
1737+
1738+
case err := <-errChan:
1739+
return fmt.Errorf("payment error: %v", err)
1740+
1741+
case <-ctx.Done():
1742+
return nil
1743+
}
1744+
}
1745+
}

notifications/manager.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ const (
2525
// NotificationTypeStaticLoopInSweepRequest is the notification type for
2626
// static loop in sweep requests.
2727
NotificationTypeStaticLoopInSweepRequest
28+
29+
// NotificationTypeUnfinishedSwap is the notification type for unfinished
30+
// swap notifications.
31+
NotificationTypeUnfinishedSwap
2832
)
2933

3034
const (
@@ -101,13 +105,10 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
101105

102106
m.addSubscriber(NotificationTypeReservation, sub)
103107

104-
// Start a goroutine to remove the subscriber when the context is
105-
// canceled.
106-
go func() {
107-
<-ctx.Done()
108+
context.AfterFunc(ctx, func() {
108109
m.removeSubscriber(NotificationTypeReservation, sub)
109110
close(notifChan)
110-
}()
111+
})
111112

112113
return notifChan
113114
}
@@ -120,22 +121,42 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
120121
notifChan := make(
121122
chan *swapserverrpc.ServerStaticLoopInSweepNotification, 1,
122123
)
124+
123125
sub := subscriber{
124126
subCtx: ctx,
125127
recvChan: notifChan,
126128
}
127129

128130
m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)
129131

130-
// Start a goroutine to remove the subscriber when the context is
131-
// canceled.
132-
go func() {
133-
<-ctx.Done()
132+
context.AfterFunc(ctx, func() {
134133
m.removeSubscriber(
135-
NotificationTypeStaticLoopInSweepRequest, sub,
134+
NotificationTypeStaticLoopInSweepRequest,
135+
sub,
136136
)
137137
close(notifChan)
138-
}()
138+
})
139+
140+
return notifChan
141+
}
142+
143+
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
144+
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
145+
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
146+
147+
notifChan := make(
148+
chan *swapserverrpc.ServerUnfinishedSwapNotification, 1,
149+
)
150+
sub := subscriber{
151+
subCtx: ctx,
152+
recvChan: notifChan,
153+
}
154+
155+
m.addSubscriber(NotificationTypeUnfinishedSwap, sub)
156+
context.AfterFunc(ctx, func() {
157+
m.removeSubscriber(NotificationTypeUnfinishedSwap, sub)
158+
close(notifChan)
159+
})
139160

140161
return notifChan
141162
}
@@ -293,6 +314,20 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
293314
recvChan <- staticLoopInSweepRequestNtfn
294315
}
295316

317+
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
318+
// We'll forward the unfinished swap notification to all
319+
// subscribers.
320+
unfinishedSwapNtfn := ntfn.GetUnfinishedSwap()
321+
m.Lock()
322+
defer m.Unlock()
323+
324+
for _, sub := range m.subscribers[NotificationTypeUnfinishedSwap] {
325+
recvChan := sub.recvChan.(chan *swapserverrpc.
326+
ServerUnfinishedSwapNotification)
327+
328+
recvChan <- unfinishedSwapNtfn
329+
}
330+
296331
default:
297332
log.Warnf("Received unknown notification type: %v",
298333
ntfn)

0 commit comments

Comments
 (0)