Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,10 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
}

loop.Resume(
d.mainCtx, notificationManager, swapClient.Store, d.impl.Conn, d.lnd,
)

// Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that
// something went wrong or that shutdown is complete. We don't add to
Expand Down
219 changes: 219 additions & 0 deletions loopout.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/sweep"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightninglabs/loop/utils"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/lightningnetwork/lnd/lntypes"
paymentsdb "github.com/lightningnetwork/lnd/payments/db"
"github.com/lightningnetwork/lnd/tlv"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -1524,3 +1526,220 @@ func (s *loopOutSwap) fillAssetOffchainPaymentResult(ctx context.Context,
func (s *loopOutSwap) isAssetSwap() bool {
return s.AssetSwapInfo != nil
}

const (
ResumeSwapInitiator = "resume_swap"
)

// NotificationManager handles subscribing to incoming unfinished swaps from the
// swap server.
type NotificationManager interface {
SubscribeUnfinishedSwaps(ctx context.Context,
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification
}

// resumeManager is responsible for recovering unfinished swaps after a
// client data loss event.
type resumeManager struct {
ntfnManager NotificationManager
swapStore loopdb.SwapStore
swapClient swapserverrpc.SwapServerClient
lnd *lndclient.GrpcLndServices

reqChan chan *swapserverrpc.ServerUnfinishedSwapNotification
}

// Resume starts the resume manager which listens for unfinished swaps
// from the server and attempts to recover them.
func Resume(ctx context.Context, ntfnManager NotificationManager,
swapStore loopdb.SwapStore,
swapClientConn *grpc.ClientConn,
lnd *lndclient.GrpcLndServices) {

resumeManager := &resumeManager{
ntfnManager: ntfnManager,
swapStore: swapStore,
swapClient: swapserverrpc.NewSwapServerClient(swapClientConn),
lnd: lnd,
reqChan: make(chan *swapserverrpc.ServerUnfinishedSwapNotification, 1),
}
go resumeManager.start(ctx)
}

// start begins listening for unfinished swap notifications from the server.
func (m *resumeManager) start(ctx context.Context) {
ntfnChan := m.ntfnManager.SubscribeUnfinishedSwaps(ctx)

for {
select {
case <-ctx.Done():
return

case ntfn, ok := <-ntfnChan:
if !ok {
return
}
m.reqChan <- ntfn

case req := <-m.reqChan:
err := m.handleUnfinishedSwap(ctx, req)
if err != nil {
go func() {
// wait a bit before retrying
time.Sleep(time.Second * 10)
m.reqChan <- req
}() // retry
}
}
}
}

// handleUnfinishedSwap processes an unfinished swap notification from the
// server.
func (m *resumeManager) handleUnfinishedSwap(ctx context.Context,
ntfn *swapserverrpc.ServerUnfinishedSwapNotification) error {

swapHash, err := lntypes.MakeHash(ntfn.SwapHash)
if err != nil {
return err
}

if ntfn.IsLoopIn {
return fmt.Errorf("loop in recovery not supported")
}

return m.handleUnfinishedLoopOut(ctx, swapHash)
}

func (m *resumeManager) handleUnfinishedLoopOut(ctx context.Context,
swapHash lntypes.Hash) error {

// Fetch the swap from the local store.
swap, err := m.swapStore.FetchLoopOutSwap(ctx, swapHash)
if err != nil {
return err
}

typ := swap.State().State.Type()
// Check the state of the swap and take appropriate action.
if typ == loopdb.StateTypePending || typ == loopdb.StateTypeFail {
return nil
}
trackChanCtx, cancel := context.WithCancel(ctx)
defer cancel()

// Check if the swap offchain htlc went through.
trackChan, errChan, err := m.lnd.Router.TrackPayment(
trackChanCtx, swap.Hash,
)
if err != nil {
return err
}

// Omit errors here as the payment may not have been
// initiated from this client.
trackChanLoop:
for {
select {
case <-ctx.Done():
return nil

case trackResp := <-trackChan:
if trackResp.State != lnrpc.Payment_FAILED {
return nil
}
break trackChanLoop

case <-errChan:
return err
}
}

if swap.LastUpdate().Cost.Server == 0 {
// If the server cost is zero resume the payment.
return m.resumeLoopOutPayment(ctx, swap)
}

return nil
}

// resumeLoopOutPayment attempts to resume the loop out payment for the
// specified swap.
func (m *resumeManager) resumeLoopOutPayment(ctx context.Context,
swap *loopdb.LoopOut) error {

swapRes, err := m.swapClient.NewLoopOutSwap(
ctx, &swapserverrpc.ServerLoopOutRequest{
SwapHash: swap.Hash[:],
UserAgent: ResumeSwapInitiator,
},
)
if err != nil {
return err
}

paymentReq := swapRes.SwapInvoice
// Verify the payment request before attempting payment.
inv, err := m.lnd.Client.DecodePaymentRequest(ctx, paymentReq)
if err != nil {
return fmt.Errorf("failed to decode loop out invoice: %v", err)
}

if swap.Hash != inv.Hash {
return fmt.Errorf("invoice payment hash %v does not match "+
"swap hash %v", inv.Hash, swap.Hash)
}

amtRequested := swap.Contract.AmountRequested

if inv.Value.ToSatoshis() > swap.Contract.MaxSwapFee*2+amtRequested {
return fmt.Errorf("invoice amount %v exceeds max "+
"allowed %v", inv.Value.ToSatoshis(),
swap.Contract.MaxSwapFee+amtRequested)
}

payChan, errChan, err := m.lnd.Router.SendPayment(
ctx,
lndclient.SendPaymentRequest{
Invoice: paymentReq,
Timeout: time.Hour,
MaxFee: swap.Contract.MaxSwapFee,
})
if err != nil {
return err
}
for {
select {
case payResp := <-payChan:
if payResp.FailureReason.String() != "" {
return fmt.Errorf("payment error: %v", payResp.FailureReason)
}
if payResp.State == lnrpc.Payment_SUCCEEDED {
cost := swap.LastUpdate().Cost
cost.Server = payResp.Value.ToSatoshis() - amtRequested
cost.Offchain = payResp.Fee.ToSatoshis()
// Payment succeeded.
updateTime := time.Now()

// Update state in store.
err = m.swapStore.UpdateLoopOut(
ctx, swap.Hash, updateTime,
loopdb.SwapStateData{
State: loopdb.StateSuccess,
Cost: cost,
HtlcTxHash: swap.LastUpdate().HtlcTxHash,
},
)
if err != nil {
return err
}
}

case err := <-errChan:
return fmt.Errorf("payment error: %v", err)

case <-ctx.Done():
return nil
}
}
}
64 changes: 52 additions & 12 deletions notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ const (
// NotificationTypeStaticLoopInSweepRequest is the notification type for
// static loop in sweep requests.
NotificationTypeStaticLoopInSweepRequest

// NotificationTypeUnfinishedSwap is the notification type for unfinished
// swap notifications.
NotificationTypeUnfinishedSwap
)

const (
// defaultMinAliveConnTime is the default minimum time that the
// connection to the server needs to be alive before we consider it a
// successful connection.
defaultMinAliveConnTime = time.Minute

// current_version is the current version of the notification listener.
current_version = swapserverrpc.SubscribeNotificationsRequest_V1
)

// Client is the interface that the notification manager needs to implement in
Expand Down Expand Up @@ -101,13 +108,10 @@ func (m *Manager) SubscribeReservations(ctx context.Context,

m.addSubscriber(NotificationTypeReservation, sub)

// Start a goroutine to remove the subscriber when the context is
// canceled.
go func() {
<-ctx.Done()
context.AfterFunc(ctx, func() {
m.removeSubscriber(NotificationTypeReservation, sub)
close(notifChan)
}()
})

return notifChan
}
Expand All @@ -120,22 +124,42 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
notifChan := make(
chan *swapserverrpc.ServerStaticLoopInSweepNotification, 1,
)

sub := subscriber{
subCtx: ctx,
recvChan: notifChan,
}

m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)

// Start a goroutine to remove the subscriber when the context is
// canceled.
go func() {
<-ctx.Done()
context.AfterFunc(ctx, func() {
m.removeSubscriber(
NotificationTypeStaticLoopInSweepRequest, sub,
NotificationTypeStaticLoopInSweepRequest,
sub,
)
close(notifChan)
}()
})

return notifChan
}

// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {

notifChan := make(
chan *swapserverrpc.ServerUnfinishedSwapNotification, 1,
)
sub := subscriber{
subCtx: ctx,
recvChan: notifChan,
}

m.addSubscriber(NotificationTypeUnfinishedSwap, sub)
context.AfterFunc(ctx, func() {
m.removeSubscriber(NotificationTypeUnfinishedSwap, sub)
close(notifChan)
})

return notifChan
}
Expand Down Expand Up @@ -239,7 +263,9 @@ func (m *Manager) subscribeNotifications(ctx context.Context) error {
defer cancel()

notifStream, err := m.cfg.Client.SubscribeNotifications(
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
callCtx, &swapserverrpc.SubscribeNotificationsRequest{
Version: current_version,
},
)
if err != nil {
return err
Expand Down Expand Up @@ -293,6 +319,20 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
recvChan <- staticLoopInSweepRequestNtfn
}

case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
// We'll forward the unfinished swap notification to all
// subscribers.
unfinishedSwapNtfn := ntfn.GetUnfinishedSwap()
m.Lock()
defer m.Unlock()

for _, sub := range m.subscribers[NotificationTypeUnfinishedSwap] {
recvChan := sub.recvChan.(chan *swapserverrpc.
ServerUnfinishedSwapNotification)

recvChan <- unfinishedSwapNtfn
}

default:
log.Warnf("Received unknown notification type: %v",
ntfn)
Expand Down
Loading
Loading