Skip to content

Commit 61a5f9d

Browse files
committed
reservation: add reservation manager
This commit adds the reservation manager to the reservation package. This manager manages the lifecycle of reservations.
1 parent 60df5fe commit 61a5f9d

File tree

1 file changed

+265
-0
lines changed

1 file changed

+265
-0
lines changed

instantout/reservation/manager.go

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
package reservation
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/btcsuite/btcd/btcec/v2"
9+
"github.com/btcsuite/btcd/btcutil"
10+
11+
"github.com/lightninglabs/loop/fsm"
12+
reservationrpc "github.com/lightninglabs/loop/swapserverrpc"
13+
)
14+
15+
// Manager manages the reservation state machines.
16+
type Manager struct {
17+
// cfg contains all the services that the reservation manager needs to
18+
// operate.
19+
cfg *Config
20+
21+
// activeReservations contains all the active reservationsFSMs.
22+
activeReservations map[ID]*FSM
23+
24+
sync.Mutex
25+
}
26+
27+
// NewReservationManager creates a new reservation manager.
28+
func NewReservationManager(cfg *Config) *Manager {
29+
return &Manager{
30+
cfg: cfg,
31+
activeReservations: make(map[ID]*FSM),
32+
}
33+
}
34+
35+
// Run runs the reservation manager.
36+
func (m *Manager) Run(ctx context.Context, height int32) error {
37+
// todo(sputn1ck): recover swaps on startup
38+
log.Debugf("Starting reservation manager")
39+
40+
runCtx, cancel := context.WithCancel(ctx)
41+
defer cancel()
42+
43+
currentHeight := height
44+
45+
err := m.RecoverReservations(runCtx)
46+
if err != nil {
47+
return err
48+
}
49+
50+
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.
51+
RegisterBlockEpochNtfn(runCtx)
52+
if err != nil {
53+
return err
54+
}
55+
56+
reservationResChan := make(
57+
chan *reservationrpc.ServerReservationNotification,
58+
)
59+
60+
err = m.RegisterReservationNotifications(runCtx, reservationResChan)
61+
if err != nil {
62+
return err
63+
}
64+
65+
for {
66+
select {
67+
case height := <-newBlockChan:
68+
log.Debugf("Received block %v", height)
69+
currentHeight = height
70+
71+
case reservationRes := <-reservationResChan:
72+
log.Debugf("Received reservation %x",
73+
reservationRes.ReservationId)
74+
err := m.newReservation(
75+
runCtx, uint32(currentHeight), reservationRes,
76+
)
77+
if err != nil {
78+
return err
79+
}
80+
81+
case err := <-newBlockErrChan:
82+
return err
83+
84+
case <-runCtx.Done():
85+
log.Debugf("Stopping reservation manager")
86+
return nil
87+
}
88+
}
89+
}
90+
91+
// newReservation creates a new reservation from the reservation request.
92+
func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
93+
req *reservationrpc.ServerReservationNotification) error {
94+
95+
var reservationID ID
96+
err := reservationID.FromByteSlice(
97+
req.ReservationId,
98+
)
99+
if err != nil {
100+
return err
101+
}
102+
103+
serverKey, err := btcec.ParsePubKey(req.ServerKey)
104+
if err != nil {
105+
return err
106+
}
107+
108+
// Create the reservation state machine. We need to pass in the runCtx
109+
// of the reservation manager so that the state machine will keep on
110+
// running even if the grpc conte
111+
reservationFSM := NewFSM(
112+
ctx, m.cfg,
113+
)
114+
115+
// Add the reservation to the active reservations map.
116+
m.Lock()
117+
m.activeReservations[reservationID] = reservationFSM
118+
m.Unlock()
119+
120+
initContext := &InitReservationContext{
121+
reservationID: reservationID,
122+
serverPubkey: serverKey,
123+
value: btcutil.Amount(req.Value),
124+
expiry: req.Expiry,
125+
heightHint: currentHeight,
126+
}
127+
128+
// Send the init event to the state machine.
129+
go func() {
130+
err = reservationFSM.SendEvent(OnServerRequest, initContext)
131+
if err != nil {
132+
log.Errorf("Error sending init event: %v", err)
133+
}
134+
}()
135+
136+
// We'll now wait for the reservation to be in the state where it is
137+
// waiting to be confirmed.
138+
err = reservationFSM.DefaultObserver.WaitForState(
139+
ctx, time.Minute, WaitForConfirmation,
140+
fsm.WithWaitForStateOption(time.Second),
141+
)
142+
if err != nil {
143+
return err
144+
}
145+
146+
return nil
147+
}
148+
149+
// RegisterReservationNotifications registers a new reservation notification
150+
// stream.
151+
func (m *Manager) RegisterReservationNotifications(
152+
ctx context.Context, reservationChan chan *reservationrpc.
153+
ServerReservationNotification) error {
154+
155+
// In order to create a valid lsat we first are going to call
156+
// the FetchL402 method.
157+
err := m.cfg.FetchL402(ctx)
158+
if err != nil {
159+
return err
160+
}
161+
162+
// We'll now subscribe to the reservation notifications.
163+
reservationStream, err := m.cfg.ReservationClient.
164+
ReservationNotificationStream(
165+
ctx, &reservationrpc.ReservationNotificationRequest{},
166+
)
167+
if err != nil {
168+
return err
169+
}
170+
171+
// We'll now start a goroutine that will forward all the reservation
172+
// notifications to the reservationChan.
173+
go func() {
174+
for {
175+
reservationRes, err := reservationStream.Recv()
176+
if err == nil && reservationRes != nil {
177+
log.Debugf("Received reservation %x",
178+
reservationRes.ReservationId)
179+
reservationChan <- reservationRes
180+
continue
181+
}
182+
log.Errorf("Error receiving "+
183+
"reservation: %v", err)
184+
185+
reconnectTimer := time.NewTimer(time.Second * 10)
186+
187+
// If we encounter an error, we'll
188+
// try to reconnect.
189+
for {
190+
select {
191+
case <-ctx.Done():
192+
return
193+
case <-reconnectTimer.C:
194+
err = m.RegisterReservationNotifications(
195+
ctx, reservationChan,
196+
)
197+
if err == nil {
198+
log.Debugf(
199+
"Successfully " +
200+
"reconnected",
201+
)
202+
reconnectTimer.Stop()
203+
// If we were able to
204+
// reconnect, we'll
205+
// return.
206+
return
207+
}
208+
log.Errorf("Error "+
209+
"reconnecting: %v",
210+
err)
211+
212+
reconnectTimer.Reset(
213+
time.Second * 10,
214+
)
215+
}
216+
}
217+
}
218+
}()
219+
220+
return nil
221+
}
222+
223+
// RecoverReservations tries to recover all reservations that are still active
224+
// from the database.
225+
func (m *Manager) RecoverReservations(ctx context.Context) error {
226+
reservations, err := m.cfg.Store.ListReservations(ctx)
227+
if err != nil {
228+
return err
229+
}
230+
231+
for _, reservation := range reservations {
232+
if isFinalState(reservation.State) {
233+
continue
234+
}
235+
236+
log.Debugf("Recovering reservation %x", reservation.ID)
237+
238+
fsmCtx := context.WithValue(ctx, reservation.ID, nil)
239+
240+
reservationFSM := NewFSMFromReservation(
241+
fsmCtx, m.cfg, reservation,
242+
)
243+
244+
m.activeReservations[reservation.ID] = reservationFSM
245+
246+
// As SendEvent can block, we'll start a goroutine to process
247+
// the event.
248+
go func() {
249+
err := reservationFSM.SendEvent(OnRecover, nil)
250+
if err != nil {
251+
log.Errorf("FSM %v Error sending recover "+
252+
"event %v, state: %v",
253+
reservationFSM.reservation.ID, err,
254+
reservationFSM.reservation.State)
255+
}
256+
}()
257+
}
258+
259+
return nil
260+
}
261+
262+
// GetReservations retrieves all reservations from the database.
263+
func (m *Manager) GetReservations(ctx context.Context) ([]*Reservation, error) {
264+
return m.cfg.Store.ListReservations(ctx)
265+
}

0 commit comments

Comments
 (0)