Skip to content

Commit 0a182ee

Browse files
authored
Merge pull request #1029 from bhandras/resume-stuck-swaps
loop: add resume manager
2 parents 6c8bb7f + 79af5b3 commit 0a182ee

File tree

5 files changed

+738
-291
lines changed

5 files changed

+738
-291
lines changed

loopd/daemon.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,10 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
993993
}
994994
}
995995

996+
loop.Resume(
997+
d.mainCtx, notificationManager, swapClient.Store, d.impl.Conn, d.lnd,
998+
)
999+
9961000
// Last, start our internal error handler. This will return exactly one
9971001
// error or nil on the main error channel to inform the caller that
9981002
// something went wrong or that shutdown is complete. We don't add to

loopout.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/lightninglabs/lndclient"
1919
"github.com/lightninglabs/loop/loopdb"
2020
"github.com/lightninglabs/loop/swap"
21+
"github.com/lightninglabs/loop/swapserverrpc"
2122
"github.com/lightninglabs/loop/sweep"
2223
"github.com/lightninglabs/loop/sweepbatcher"
2324
"github.com/lightninglabs/loop/utils"
@@ -28,6 +29,7 @@ import (
2829
"github.com/lightningnetwork/lnd/lntypes"
2930
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
3031
"github.com/lightningnetwork/lnd/tlv"
32+
"google.golang.org/grpc"
3133
)
3234

3335
const (
@@ -1524,3 +1526,220 @@ func (s *loopOutSwap) fillAssetOffchainPaymentResult(ctx context.Context,
15241526
func (s *loopOutSwap) isAssetSwap() bool {
15251527
return s.AssetSwapInfo != nil
15261528
}
1529+
1530+
// ResumeSwapInitiator is the user agent value sent with the loop out request
// that the resume manager issues when it resumes a swap payment.
const ResumeSwapInitiator = "resume_swap"
1533+
1534+
// NotificationManager handles subscribing to incoming unfinished swaps from the
1535+
// swap server.
1536+
type NotificationManager interface {
1537+
SubscribeUnfinishedSwaps(ctx context.Context,
1538+
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification
1539+
}
1540+
1541+
// resumeManager is responsible for recovering unfinished swaps after a
1542+
// client data loss event.
1543+
type resumeManager struct {
1544+
ntfnManager NotificationManager
1545+
swapStore loopdb.SwapStore
1546+
swapClient swapserverrpc.SwapServerClient
1547+
lnd *lndclient.GrpcLndServices
1548+
1549+
reqChan chan *swapserverrpc.ServerUnfinishedSwapNotification
1550+
}
1551+
1552+
// Resume starts the resume manager which listens for unfinished swaps
1553+
// from the server and attempts to recover them.
1554+
func Resume(ctx context.Context, ntfnManager NotificationManager,
1555+
swapStore loopdb.SwapStore,
1556+
swapClientConn *grpc.ClientConn,
1557+
lnd *lndclient.GrpcLndServices) {
1558+
1559+
resumeManager := &resumeManager{
1560+
ntfnManager: ntfnManager,
1561+
swapStore: swapStore,
1562+
swapClient: swapserverrpc.NewSwapServerClient(swapClientConn),
1563+
lnd: lnd,
1564+
reqChan: make(chan *swapserverrpc.ServerUnfinishedSwapNotification, 1),
1565+
}
1566+
go resumeManager.start(ctx)
1567+
}
1568+
1569+
// start begins listening for unfinished swap notifications from the server.
1570+
func (m *resumeManager) start(ctx context.Context) {
1571+
ntfnChan := m.ntfnManager.SubscribeUnfinishedSwaps(ctx)
1572+
1573+
for {
1574+
select {
1575+
case <-ctx.Done():
1576+
return
1577+
1578+
case ntfn, ok := <-ntfnChan:
1579+
if !ok {
1580+
return
1581+
}
1582+
m.reqChan <- ntfn
1583+
1584+
case req := <-m.reqChan:
1585+
err := m.handleUnfinishedSwap(ctx, req)
1586+
if err != nil {
1587+
go func() {
1588+
// wait a bit before retrying
1589+
time.Sleep(time.Second * 10)
1590+
m.reqChan <- req
1591+
}() // retry
1592+
}
1593+
}
1594+
}
1595+
}
1596+
1597+
// handleUnfinishedSwap processes an unfinished swap notification from the
1598+
// server.
1599+
func (m *resumeManager) handleUnfinishedSwap(ctx context.Context,
1600+
ntfn *swapserverrpc.ServerUnfinishedSwapNotification) error {
1601+
1602+
swapHash, err := lntypes.MakeHash(ntfn.SwapHash)
1603+
if err != nil {
1604+
return err
1605+
}
1606+
1607+
if ntfn.IsLoopIn {
1608+
return fmt.Errorf("loop in recovery not supported")
1609+
}
1610+
1611+
return m.handleUnfinishedLoopOut(ctx, swapHash)
1612+
}
1613+
1614+
func (m *resumeManager) handleUnfinishedLoopOut(ctx context.Context,
1615+
swapHash lntypes.Hash) error {
1616+
1617+
// Fetch the swap from the local store.
1618+
swap, err := m.swapStore.FetchLoopOutSwap(ctx, swapHash)
1619+
if err != nil {
1620+
return err
1621+
}
1622+
1623+
typ := swap.State().State.Type()
1624+
// Check the state of the swap and take appropriate action.
1625+
if typ == loopdb.StateTypePending || typ == loopdb.StateTypeFail {
1626+
return nil
1627+
}
1628+
trackChanCtx, cancel := context.WithCancel(ctx)
1629+
defer cancel()
1630+
1631+
// Check if the swap offchain htlc went through.
1632+
trackChan, errChan, err := m.lnd.Router.TrackPayment(
1633+
trackChanCtx, swap.Hash,
1634+
)
1635+
if err != nil {
1636+
return err
1637+
}
1638+
1639+
// Omit errors here as the payment may not have been
1640+
// initiated from this client.
1641+
trackChanLoop:
1642+
for {
1643+
select {
1644+
case <-ctx.Done():
1645+
return nil
1646+
1647+
case trackResp := <-trackChan:
1648+
if trackResp.State != lnrpc.Payment_FAILED {
1649+
return nil
1650+
}
1651+
break trackChanLoop
1652+
1653+
case <-errChan:
1654+
return err
1655+
}
1656+
}
1657+
1658+
if swap.LastUpdate().Cost.Server == 0 {
1659+
// If the server cost is zero resume the payment.
1660+
return m.resumeLoopOutPayment(ctx, swap)
1661+
}
1662+
1663+
return nil
1664+
}
1665+
1666+
// resumeLoopOutPayment attempts to resume the loop out payment for the
1667+
// specified swap.
1668+
func (m *resumeManager) resumeLoopOutPayment(ctx context.Context,
1669+
swap *loopdb.LoopOut) error {
1670+
1671+
swapRes, err := m.swapClient.NewLoopOutSwap(
1672+
ctx, &swapserverrpc.ServerLoopOutRequest{
1673+
SwapHash: swap.Hash[:],
1674+
UserAgent: ResumeSwapInitiator,
1675+
},
1676+
)
1677+
if err != nil {
1678+
return err
1679+
}
1680+
1681+
paymentReq := swapRes.SwapInvoice
1682+
// Verify the payment request before attempting payment.
1683+
inv, err := m.lnd.Client.DecodePaymentRequest(ctx, paymentReq)
1684+
if err != nil {
1685+
return fmt.Errorf("failed to decode loop out invoice: %v", err)
1686+
}
1687+
1688+
if swap.Hash != inv.Hash {
1689+
return fmt.Errorf("invoice payment hash %v does not match "+
1690+
"swap hash %v", inv.Hash, swap.Hash)
1691+
}
1692+
1693+
amtRequested := swap.Contract.AmountRequested
1694+
1695+
if inv.Value.ToSatoshis() > swap.Contract.MaxSwapFee*2+amtRequested {
1696+
return fmt.Errorf("invoice amount %v exceeds max "+
1697+
"allowed %v", inv.Value.ToSatoshis(),
1698+
swap.Contract.MaxSwapFee+amtRequested)
1699+
}
1700+
1701+
payChan, errChan, err := m.lnd.Router.SendPayment(
1702+
ctx,
1703+
lndclient.SendPaymentRequest{
1704+
Invoice: paymentReq,
1705+
Timeout: time.Hour,
1706+
MaxFee: swap.Contract.MaxSwapFee,
1707+
})
1708+
if err != nil {
1709+
return err
1710+
}
1711+
for {
1712+
select {
1713+
case payResp := <-payChan:
1714+
if payResp.FailureReason.String() != "" {
1715+
return fmt.Errorf("payment error: %v", payResp.FailureReason)
1716+
}
1717+
if payResp.State == lnrpc.Payment_SUCCEEDED {
1718+
cost := swap.LastUpdate().Cost
1719+
cost.Server = payResp.Value.ToSatoshis() - amtRequested
1720+
cost.Offchain = payResp.Fee.ToSatoshis()
1721+
// Payment succeeded.
1722+
updateTime := time.Now()
1723+
1724+
// Update state in store.
1725+
err = m.swapStore.UpdateLoopOut(
1726+
ctx, swap.Hash, updateTime,
1727+
loopdb.SwapStateData{
1728+
State: loopdb.StateSuccess,
1729+
Cost: cost,
1730+
HtlcTxHash: swap.LastUpdate().HtlcTxHash,
1731+
},
1732+
)
1733+
if err != nil {
1734+
return err
1735+
}
1736+
}
1737+
1738+
case err := <-errChan:
1739+
return fmt.Errorf("payment error: %v", err)
1740+
1741+
case <-ctx.Done():
1742+
return nil
1743+
}
1744+
}
1745+
}

notifications/manager.go

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,20 @@ const (
2525
// NotificationTypeStaticLoopInSweepRequest is the notification type for
2626
// static loop in sweep requests.
2727
NotificationTypeStaticLoopInSweepRequest
28+
29+
// NotificationTypeUnfinishedSwap is the notification type for unfinished
30+
// swap notifications.
31+
NotificationTypeUnfinishedSwap
2832
)
2933

3034
const (
3135
// defaultMinAliveConnTime is the default minimum time that the
3236
// connection to the server needs to be alive before we consider it a
3337
// successful connection.
3438
defaultMinAliveConnTime = time.Minute
39+
40+
// current_version is the current version of the notification listener.
41+
current_version = swapserverrpc.SubscribeNotificationsRequest_V1
3542
)
3643

3744
// Client is the interface that the notification manager needs to implement in
@@ -101,13 +108,10 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
101108

102109
m.addSubscriber(NotificationTypeReservation, sub)
103110

104-
// Start a goroutine to remove the subscriber when the context is
105-
// canceled.
106-
go func() {
107-
<-ctx.Done()
111+
context.AfterFunc(ctx, func() {
108112
m.removeSubscriber(NotificationTypeReservation, sub)
109113
close(notifChan)
110-
}()
114+
})
111115

112116
return notifChan
113117
}
@@ -120,22 +124,42 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
120124
notifChan := make(
121125
chan *swapserverrpc.ServerStaticLoopInSweepNotification, 1,
122126
)
127+
123128
sub := subscriber{
124129
subCtx: ctx,
125130
recvChan: notifChan,
126131
}
127132

128133
m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)
129134

130-
// Start a goroutine to remove the subscriber when the context is
131-
// canceled.
132-
go func() {
133-
<-ctx.Done()
135+
context.AfterFunc(ctx, func() {
134136
m.removeSubscriber(
135-
NotificationTypeStaticLoopInSweepRequest, sub,
137+
NotificationTypeStaticLoopInSweepRequest,
138+
sub,
136139
)
137140
close(notifChan)
138-
}()
141+
})
142+
143+
return notifChan
144+
}
145+
146+
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
147+
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
148+
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
149+
150+
notifChan := make(
151+
chan *swapserverrpc.ServerUnfinishedSwapNotification, 1,
152+
)
153+
sub := subscriber{
154+
subCtx: ctx,
155+
recvChan: notifChan,
156+
}
157+
158+
m.addSubscriber(NotificationTypeUnfinishedSwap, sub)
159+
context.AfterFunc(ctx, func() {
160+
m.removeSubscriber(NotificationTypeUnfinishedSwap, sub)
161+
close(notifChan)
162+
})
139163

140164
return notifChan
141165
}
@@ -239,7 +263,9 @@ func (m *Manager) subscribeNotifications(ctx context.Context) error {
239263
defer cancel()
240264

241265
notifStream, err := m.cfg.Client.SubscribeNotifications(
242-
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
266+
callCtx, &swapserverrpc.SubscribeNotificationsRequest{
267+
Version: current_version,
268+
},
243269
)
244270
if err != nil {
245271
return err
@@ -293,6 +319,20 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
293319
recvChan <- staticLoopInSweepRequestNtfn
294320
}
295321

322+
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
323+
// We'll forward the unfinished swap notification to all
324+
// subscribers.
325+
unfinishedSwapNtfn := ntfn.GetUnfinishedSwap()
326+
m.Lock()
327+
defer m.Unlock()
328+
329+
for _, sub := range m.subscribers[NotificationTypeUnfinishedSwap] {
330+
recvChan := sub.recvChan.(chan *swapserverrpc.
331+
ServerUnfinishedSwapNotification)
332+
333+
recvChan <- unfinishedSwapNtfn
334+
}
335+
296336
default:
297337
log.Warnf("Received unknown notification type: %v",
298338
ntfn)

0 commit comments

Comments
 (0)