Skip to content

Commit 2aa5962

Browse files
committed
notifications: add notification manager
This commit adds a generic notification manager that can be used to subscribe to different types of notifications.
1 parent f4347b8 commit 2aa5962

File tree

2 files changed

+240
-0
lines changed

2 files changed

+240
-0
lines changed

notifications/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package notifications
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "NTF"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

notifications/manager.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"encoding/hex"
7+
"time"
8+
9+
"github.com/lightninglabs/loop/swapserverrpc"
10+
)
11+
12+
type NotificationType int
13+
14+
const (
15+
NotificationTypeUnknown NotificationType = iota
16+
NotificationTypeReservation
17+
)
18+
19+
// Config contains all the services that the notification manager needs to
20+
// operate.
21+
type Config struct {
22+
// Client is the client used to communicate with the swap server.
23+
Client swapserverrpc.SwapServerClient
24+
25+
// FetchL402 is the function used to fetch the l402 token.
26+
FetchL402 func(context.Context) error
27+
}
28+
29+
// Manager is a manager for notifications that the swap server sends to the
30+
// client.
31+
type Manager struct {
32+
cfg *Config
33+
34+
hasL402 bool
35+
36+
runCtx context.Context
37+
38+
subscribers map[NotificationType]map[string]func(*swapserverrpc.SubscribeNotificationsResponse) //nolint:lll
39+
}
40+
41+
// NewManager creates a new notification manager.
42+
func NewManager(cfg *Config) *Manager {
43+
return &Manager{
44+
cfg: cfg,
45+
subscribers: make(map[NotificationType]map[string]func(*swapserverrpc.SubscribeNotificationsResponse)), //nolint:lll
46+
}
47+
}
48+
49+
// SubscribeReservations subscribes to the reservation notifications.
50+
func (n *Manager) SubscribeReservations(
51+
callback func(*swapserverrpc.ServerReservationNotification)) string {
52+
53+
fn := func(notif *swapserverrpc.SubscribeNotificationsResponse) {
54+
callback(notif.GetReservationNotification())
55+
}
56+
return n.subscribe(NotificationTypeReservation, fn)
57+
}
58+
59+
// subscribe subscribes to a notification type and returns a unique ID that can
60+
// be used to unsubscribe.
61+
func (n *Manager) subscribe(notificationType NotificationType,
62+
callback func(*swapserverrpc.SubscribeNotificationsResponse)) string {
63+
if n.subscribers[notificationType] == nil {
64+
n.subscribers[notificationType] = make(map[string]func(*swapserverrpc.SubscribeNotificationsResponse)) //nolint:lll
65+
}
66+
67+
// Generate a unique ID for this subscription
68+
id := generateUniqueID()
69+
70+
n.subscribers[notificationType][id] = callback
71+
72+
return id
73+
}
74+
75+
// Unsubscribe removes a subscription based on its ID.
76+
func (n *Manager) Unsubscribe(id string) {
77+
for _, subscribers := range n.subscribers {
78+
delete(subscribers, id)
79+
}
80+
}
81+
82+
// Run starts the notification manager.
83+
func (n *Manager) Run(ctx context.Context, readyChan chan struct{},
84+
) error {
85+
86+
n.runCtx = ctx
87+
88+
err := n.RegisterNotifications()
89+
if err != nil {
90+
return err
91+
}
92+
93+
// Notify the caller that we're ready, this means we have successfully
94+
// fetched a l402 token and have subscribed to the notifications at least
95+
// once.
96+
close(readyChan)
97+
98+
for {
99+
select {
100+
case <-ctx.Done():
101+
return nil
102+
}
103+
}
104+
}
105+
106+
// RegisterNotifications registers a new server notification stream.
107+
func (n *Manager) RegisterNotifications() error {
108+
109+
// In order to create a valid l402 we first are going to call
110+
// the FetchL402 method. As a client might not have outbound capacity
111+
// yet, we'll retry until we get a valid response.
112+
if !n.hasL402 {
113+
n.fetchL402(n.runCtx)
114+
}
115+
116+
ctx, cancel := context.WithCancel(n.runCtx)
117+
118+
// We'll now subscribe to the swap server notifications.
119+
notifStream, err := n.cfg.Client.SubscribeNotifications(
120+
ctx, &swapserverrpc.SubscribeNotificationsRequest{},
121+
)
122+
if err != nil {
123+
cancel()
124+
return err
125+
}
126+
127+
log.Debugf("Successfully subscribed to server notifications")
128+
129+
// We'll now start a goroutine that will handle all the incoming
130+
// notifications.
131+
go func() {
132+
for {
133+
notification, err := notifStream.Recv()
134+
if err == nil && notification != nil {
135+
log.Debugf("Received notification: %v", notification)
136+
n.handleNotification(notification)
137+
continue
138+
}
139+
log.Errorf("Error receiving "+
140+
"notification: %v", err)
141+
142+
cancel()
143+
144+
// If we encounter an error, we'll
145+
// try to reconnect.
146+
for {
147+
select {
148+
case <-n.runCtx.Done():
149+
return
150+
151+
case <-time.After(time.Second * 10):
152+
log.Debugf("Reconnecting to server notifications")
153+
err = n.RegisterNotifications()
154+
if err != nil {
155+
log.Errorf("Error reconnecting: %v", err)
156+
continue
157+
}
158+
159+
// If we were able to reconnect, we'll
160+
// return.
161+
return
162+
}
163+
}
164+
}
165+
}()
166+
167+
return nil
168+
}
169+
170+
// fetchL402 fetches the L402 from the server. This method will keep on
171+
// retrying until it gets a valid response.
172+
func (m *Manager) fetchL402(ctx context.Context) {
173+
// Add a 0 timer so that we initially fetch the L402 immediately.
174+
timer := time.NewTimer(0)
175+
for {
176+
select {
177+
case <-ctx.Done():
178+
return
179+
180+
case <-timer.C:
181+
err := m.cfg.FetchL402(ctx)
182+
if err != nil {
183+
log.Warnf("Error fetching L402: %v", err)
184+
timer.Reset(time.Second * 10)
185+
continue
186+
}
187+
m.hasL402 = true
188+
return
189+
}
190+
}
191+
}
192+
193+
// handleNotification handles an incoming notification from the server,
194+
// forwarding it to the appropriate subscribers.
195+
func (n *Manager) handleNotification(notification *swapserverrpc.
196+
SubscribeNotificationsResponse) {
197+
198+
switch notification.Notification.(type) {
199+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
200+
for _, fn := range n.subscribers[NotificationTypeReservation] {
201+
fn(notification)
202+
}
203+
}
204+
}
205+
206+
// generateUniqueID generates a unique ID for a subscription.
207+
func generateUniqueID() string {
208+
var b [16]byte
209+
_, err := rand.Read(b[:])
210+
if err != nil {
211+
panic(err)
212+
}
213+
return hex.EncodeToString(b[:])
214+
}

0 commit comments

Comments
 (0)