Skip to content

Commit b7c1e68

Browse files
committed
instantout: add instantout manager
1 parent ee0309f commit b7c1e68

File tree

2 files changed

+226
-24
lines changed

2 files changed

+226
-24
lines changed

instantout/manager.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package instantout
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/lightninglabs/loop/instantout/reservation"
11+
"github.com/lightningnetwork/lnd/lntypes"
12+
)
13+
14+
var (
15+
defaultStateWaitTime = 30 * time.Second
16+
defaultCltv = 100
17+
ErrSwapDoesNotExist = errors.New("swap does not exist")
18+
)
19+
20+
// Manager manages the instantout state machines.
21+
type Manager struct {
22+
// cfg contains all the services that the reservation manager needs to
23+
// operate.
24+
cfg *Config
25+
26+
// activeInstantOuts contains all the active instantouts.
27+
activeInstantOuts map[lntypes.Hash]*FSM
28+
29+
// currentHeight stores the currently best known block height.
30+
currentHeight int32
31+
32+
// blockEpochChan receives new block heights.
33+
blockEpochChan chan int32
34+
35+
runCtx context.Context
36+
37+
sync.Mutex
38+
}
39+
40+
// NewInstantOutManager creates a new instantout manager.
41+
func NewInstantOutManager(cfg *Config) *Manager {
42+
return &Manager{
43+
cfg: cfg,
44+
activeInstantOuts: make(map[lntypes.Hash]*FSM),
45+
blockEpochChan: make(chan int32),
46+
}
47+
}
48+
49+
// Run runs the instantout manager.
50+
func (m *Manager) Run(ctx context.Context, initChan chan struct{},
51+
height int32) error {
52+
53+
log.Debugf("Starting instantout manager")
54+
defer func() {
55+
log.Debugf("Stopping instantout manager")
56+
}()
57+
58+
runCtx, cancel := context.WithCancel(ctx)
59+
defer cancel()
60+
61+
m.runCtx = runCtx
62+
m.currentHeight = height
63+
64+
err := m.recoverInstantOuts(runCtx)
65+
if err != nil {
66+
close(initChan)
67+
return err
68+
}
69+
70+
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.
71+
RegisterBlockEpochNtfn(ctx)
72+
if err != nil {
73+
close(initChan)
74+
return err
75+
}
76+
77+
close(initChan)
78+
79+
for {
80+
select {
81+
case <-runCtx.Done():
82+
return nil
83+
84+
case height := <-newBlockChan:
85+
m.Lock()
86+
m.currentHeight = height
87+
m.Unlock()
88+
89+
case err := <-newBlockErrChan:
90+
return err
91+
}
92+
}
93+
}
94+
95+
// recoverInstantOuts recovers all the active instantouts from the database.
96+
func (m *Manager) recoverInstantOuts(ctx context.Context) error {
97+
// Fetch all the active instantouts from the database.
98+
activeInstantOuts, err := m.cfg.Store.ListInstantLoopOuts(ctx)
99+
if err != nil {
100+
return err
101+
}
102+
103+
for _, instantOut := range activeInstantOuts {
104+
if isFinalState(instantOut.State) {
105+
continue
106+
}
107+
108+
log.Debugf("Recovering instantout %v", instantOut.SwapHash)
109+
110+
instantOutFSM, err := NewFSMFromInstantOut(
111+
ctx, m.cfg, instantOut,
112+
)
113+
if err != nil {
114+
return err
115+
}
116+
117+
m.activeInstantOuts[instantOut.SwapHash] = instantOutFSM
118+
119+
// As SendEvent can block, we'll start a goroutine to process
120+
// the event.
121+
go func() {
122+
err := instantOutFSM.SendEvent(OnRecover, nil)
123+
if err != nil {
124+
log.Errorf("FSM %v Error sending recover "+
125+
"event %v, state: %v",
126+
instantOutFSM.InstantOut.SwapHash, err,
127+
instantOutFSM.InstantOut.State)
128+
}
129+
}()
130+
}
131+
132+
return nil
133+
}
134+
135+
// NewInstantOut creates a new instantout.
136+
func (m *Manager) NewInstantOut(ctx context.Context,
137+
reservations []reservation.ID) (*FSM, error) {
138+
139+
m.Lock()
140+
// Create the instantout request.
141+
request := &InitInstantOutCtx{
142+
cltvExpiry: m.currentHeight + int32(defaultCltv),
143+
reservations: reservations,
144+
initationHeight: m.currentHeight,
145+
protocolVersion: CurrentProtocolVersion(),
146+
}
147+
148+
instantOut, err := NewFSM(
149+
m.runCtx, m.cfg, ProtocolVersionFullReservation,
150+
)
151+
if err != nil {
152+
m.Unlock()
153+
return nil, err
154+
}
155+
m.activeInstantOuts[instantOut.InstantOut.SwapHash] = instantOut
156+
m.Unlock()
157+
158+
// Start the instantout FSM.
159+
go func() {
160+
err := instantOut.SendEvent(OnStart, request)
161+
if err != nil {
162+
log.Errorf("Error sending event: %v", err)
163+
}
164+
}()
165+
166+
// If everything went well, we'll wait for the instant out to be
167+
// waiting for sweepless sweep to be confirmed.
168+
err = instantOut.DefaultObserver.WaitForState(
169+
ctx, defaultStateWaitTime, WaitForSweeplessSweepConfirmed,
170+
)
171+
if err != nil {
172+
if instantOut.LastActionError != nil {
173+
return instantOut, fmt.Errorf(
174+
"error waiting for sweepless sweep "+
175+
"confirmed: %w", instantOut.LastActionError,
176+
)
177+
}
178+
return instantOut, nil
179+
}
180+
181+
return instantOut, nil
182+
}
183+
184+
// GetActiveInstantOut returns an active instant out.
185+
func (m *Manager) GetActiveInstantOut(swapHash lntypes.Hash) (*FSM, error) {
186+
m.Lock()
187+
defer m.Unlock()
188+
189+
fsm, ok := m.activeInstantOuts[swapHash]
190+
if !ok {
191+
return nil, ErrSwapDoesNotExist
192+
}
193+
194+
// If the instant out is in a final state, we'll remove it from the
195+
// active instant outs.
196+
if isFinalState(fsm.InstantOut.State) {
197+
delete(m.activeInstantOuts, swapHash)
198+
}
199+
200+
return fsm, nil
201+
}

instantout/reservation/manager.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type Manager struct {
2323
// activeReservations contains all the active reservationsFSMs.
2424
activeReservations map[ID]*FSM
2525

26+
runCtx context.Context
27+
2628
sync.Mutex
2729
}
2830

@@ -41,6 +43,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
4143
runCtx, cancel := context.WithCancel(ctx)
4244
defer cancel()
4345

46+
m.runCtx = runCtx
4447
currentHeight := height
4548

4649
err := m.RecoverReservations(runCtx)
@@ -58,7 +61,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
5861
chan *reservationrpc.ServerReservationNotification,
5962
)
6063

61-
err = m.RegisterReservationNotifications(runCtx, reservationResChan)
64+
err = m.RegisterReservationNotifications(reservationResChan)
6265
if err != nil {
6366
return err
6467
}
@@ -155,25 +158,29 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
155158
// RegisterReservationNotifications registers a new reservation notification
156159
// stream.
157160
func (m *Manager) RegisterReservationNotifications(
158-
ctx context.Context, reservationChan chan *reservationrpc.
159-
ServerReservationNotification) error {
161+
reservationChan chan *reservationrpc.ServerReservationNotification) error {
160162

161163
// In order to create a valid lsat we first are going to call
162164
// the FetchL402 method.
163-
err := m.cfg.FetchL402(ctx)
165+
err := m.cfg.FetchL402(m.runCtx)
164166
if err != nil {
165167
return err
166168
}
167169

170+
ctx, cancel := context.WithCancel(m.runCtx)
171+
168172
// We'll now subscribe to the reservation notifications.
169173
reservationStream, err := m.cfg.ReservationClient.
170174
ReservationNotificationStream(
171175
ctx, &reservationrpc.ReservationNotificationRequest{},
172176
)
173177
if err != nil {
178+
cancel()
174179
return err
175180
}
176181

182+
log.Debugf("Successfully subscribed to reservation notifications")
183+
177184
// We'll now start a goroutine that will forward all the reservation
178185
// notifications to the reservationChan.
179186
go func() {
@@ -188,36 +195,30 @@ func (m *Manager) RegisterReservationNotifications(
188195
log.Errorf("Error receiving "+
189196
"reservation: %v", err)
190197

191-
reconnectTimer := time.NewTimer(time.Second * 10)
198+
cancel()
192199

193200
// If we encounter an error, we'll
194201
// try to reconnect.
195202
for {
196203
select {
197-
case <-ctx.Done():
204+
case <-m.runCtx.Done():
198205
return
199-
case <-reconnectTimer.C:
206+
207+
case <-time.After(time.Second * 10):
208+
log.Debugf("Reconnecting to " +
209+
"reservation notifications")
200210
err = m.RegisterReservationNotifications(
201-
ctx, reservationChan,
211+
reservationChan,
202212
)
203-
if err == nil {
204-
log.Debugf(
205-
"Successfully " +
206-
"reconnected",
207-
)
208-
reconnectTimer.Stop()
209-
// If we were able to
210-
// reconnect, we'll
211-
// return.
212-
return
213+
if err != nil {
214+
log.Errorf("Error "+
215+
"reconnecting: %v", err)
216+
continue
213217
}
214-
log.Errorf("Error "+
215-
"reconnecting: %v",
216-
err)
217218

218-
reconnectTimer.Reset(
219-
time.Second * 10,
220-
)
219+
// If we were able to reconnect, we'll
220+
// return.
221+
return
221222
}
222223
}
223224
}

0 commit comments

Comments
 (0)