Skip to content

Commit 5608a8d

Browse files
committed
notifications: add notification manager
This commit adds a generic notification manager that can be used to subscribe to different types of notifications.
1 parent aa595b3 commit 5608a8d

File tree

3 files changed

+403
-0
lines changed

3 files changed

+403
-0
lines changed

notifications/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package notifications
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "NTFNS"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

notifications/manager.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/lightninglabs/loop/swapserverrpc"
9+
"google.golang.org/grpc"
10+
)
11+
12+
// NotificationType is the type of notification that the manager can handle.
13+
type NotificationType int
14+
15+
const (
16+
// NotificationTypeUnknown is the default notification type.
17+
NotificationTypeUnknown NotificationType = iota
18+
19+
// NotificationTypeReservation is the notification type for reservation
20+
// notifications.
21+
NotificationTypeReservation
22+
)
23+
24+
// Client is the interface that the notification manager needs to implement in
25+
// order to be able to subscribe to notifications.
26+
type Client interface {
27+
// SubscribeNotifications subscribes to the notifications from the server.
28+
SubscribeNotifications(ctx context.Context,
29+
in *swapserverrpc.SubscribeNotificationsRequest,
30+
opts ...grpc.CallOption) (
31+
swapserverrpc.SwapServer_SubscribeNotificationsClient, error)
32+
}
33+
34+
// Config contains all the services that the notification manager needs to
35+
// operate.
36+
type Config struct {
37+
// Client is the client used to communicate with the swap server.
38+
Client Client
39+
40+
// FetchL402 is the function used to fetch the l402 token.
41+
FetchL402 func(context.Context) error
42+
}
43+
44+
// Manager is a manager for notifications that the swap server sends to the
45+
// client.
46+
type Manager struct {
47+
cfg *Config
48+
49+
hasL402 bool
50+
51+
subscribers map[NotificationType][]subscriber
52+
sync.Mutex
53+
}
54+
55+
// NewManager creates a new notification manager.
56+
func NewManager(cfg *Config) *Manager {
57+
return &Manager{
58+
cfg: cfg,
59+
subscribers: make(map[NotificationType][]subscriber),
60+
}
61+
}
62+
63+
type subscriber struct {
64+
subCtx context.Context
65+
recvChan interface{}
66+
}
67+
68+
// SubscribeReservations subscribes to the reservation notifications.
69+
func (m *Manager) SubscribeReservations(ctx context.Context,
70+
) <-chan *swapserverrpc.ServerReservationNotification {
71+
72+
// We'll create a channel that we'll use to send the notifications to the
73+
// caller.
74+
notifChan := make(
75+
chan *swapserverrpc.ServerReservationNotification, 1,
76+
)
77+
sub := subscriber{
78+
subCtx: ctx,
79+
recvChan: notifChan,
80+
}
81+
82+
m.Lock()
83+
m.subscribers[NotificationTypeReservation] = append(
84+
m.subscribers[NotificationTypeReservation],
85+
sub,
86+
)
87+
m.Unlock()
88+
89+
return notifChan
90+
}
91+
92+
// Run starts the notification manager. It will keep on running until the
93+
// context is canceled. It will subscribe to notifications and forward them to
94+
// the subscribers. On a first successful connection to the server, it will
95+
// close the readyChan to signal that the manager is ready.
96+
func (m *Manager) Run(ctx context.Context) error {
97+
// Initially we want to immediately try to connect to the server.
98+
waitTime := time.Duration(0)
99+
100+
// Start the notification runloop.
101+
for {
102+
timer := time.NewTimer(waitTime)
103+
// Increase the wait time for the next iteration.
104+
waitTime += time.Second * 1
105+
106+
// Return if the context has been canceled.
107+
select {
108+
case <-ctx.Done():
109+
return nil
110+
111+
case <-timer.C:
112+
}
113+
114+
// In order to create a valid l402 we first are going to call
115+
// the FetchL402 method. As a client might not have outbound capacity
116+
// yet, we'll retry until we get a valid response.
117+
if !m.hasL402 {
118+
err := m.cfg.FetchL402(ctx)
119+
if err != nil {
120+
log.Errorf("Error fetching L402: %v", err)
121+
continue
122+
}
123+
m.hasL402 = true
124+
}
125+
126+
connectedFunc := func() {
127+
// Reset the wait time to 10 seconds.
128+
waitTime = time.Second * 10
129+
}
130+
131+
err := m.subscribeNotifications(ctx, connectedFunc)
132+
if err != nil {
133+
log.Errorf("Error subscribing to notifications: %v", err)
134+
}
135+
}
136+
}
137+
138+
// subscribeNotifications subscribes to the notifications from the server.
139+
func (m *Manager) subscribeNotifications(ctx context.Context,
140+
connectedFunc func()) error {
141+
142+
callCtx, cancel := context.WithCancel(ctx)
143+
defer cancel()
144+
145+
notifStream, err := m.cfg.Client.SubscribeNotifications(
146+
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
147+
)
148+
if err != nil {
149+
return err
150+
}
151+
152+
// Signal that we're connected to the server.
153+
connectedFunc()
154+
log.Debugf("Successfully subscribed to server notifications")
155+
156+
for {
157+
notification, err := notifStream.Recv()
158+
if err == nil && notification != nil {
159+
log.Debugf("Received notification: %v", notification)
160+
m.handleNotification(notification)
161+
continue
162+
}
163+
164+
log.Errorf("Error receiving notification: %v", err)
165+
166+
return err
167+
}
168+
}
169+
170+
// handleNotification handles an incoming notification from the server,
171+
// forwarding it to the appropriate subscribers.
172+
func (m *Manager) handleNotification(notification *swapserverrpc.
173+
SubscribeNotificationsResponse) {
174+
175+
switch notification.Notification.(type) {
176+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
177+
// We'll forward the reservation notification to all subscribers.
178+
// Cleaning up any subscribers that have been canceled.
179+
newSubs := make(
180+
[]subscriber, 0, len(m.subscribers[NotificationTypeReservation]),
181+
)
182+
reservationNtfn := notification.GetReservationNotification()
183+
m.Lock()
184+
defer m.Unlock()
185+
186+
for _, sub := range m.subscribers[NotificationTypeReservation] {
187+
recvChan := sub.recvChan.(chan *swapserverrpc.
188+
ServerReservationNotification)
189+
190+
select {
191+
case <-sub.subCtx.Done():
192+
close(recvChan)
193+
continue
194+
195+
case recvChan <- reservationNtfn:
196+
newSubs = append(newSubs, sub)
197+
}
198+
}
199+
m.subscribers[NotificationTypeReservation] = newSubs
200+
201+
default:
202+
log.Warnf("Received unknown notification type: %v",
203+
notification)
204+
}
205+
}

0 commit comments

Comments
 (0)