Skip to content

Commit 8ba5bcb

Browse files
committed
notifications: add notification manager
This commit adds a generic notification manager that can be used to subscribe to different types of notifications.
1 parent aa595b3 commit 8ba5bcb

File tree

3 files changed

+409
-0
lines changed

3 files changed

+409
-0
lines changed

notifications/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package notifications
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "NTFNS"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

notifications/manager.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/lightninglabs/loop/swapserverrpc"
9+
"google.golang.org/grpc"
10+
)
11+
12+
// NotificationType is the type of notification that the manager can handle.
13+
type NotificationType int
14+
15+
const (
16+
// NotificationTypeUnknown is the default notification type.
17+
NotificationTypeUnknown NotificationType = iota
18+
19+
// NotificationTypeReservation is the notification type for reservation
20+
// notifications.
21+
NotificationTypeReservation
22+
)
23+
24+
// Client is the interface that the notification manager needs to implement in
25+
// order to be able to subscribe to notifications.
26+
type Client interface {
27+
// SubscribeNotifications subscribes to the notifications from the server.
28+
SubscribeNotifications(ctx context.Context,
29+
in *swapserverrpc.SubscribeNotificationsRequest,
30+
opts ...grpc.CallOption) (
31+
swapserverrpc.SwapServer_SubscribeNotificationsClient, error)
32+
}
33+
34+
// Config contains all the services that the notification manager needs to
35+
// operate.
36+
type Config struct {
37+
// Client is the client used to communicate with the swap server.
38+
Client Client
39+
40+
// FetchL402 is the function used to fetch the l402 token.
41+
FetchL402 func(context.Context) error
42+
}
43+
44+
// Manager is a manager for notifications that the swap server sends to the
45+
// client.
46+
type Manager struct {
47+
cfg *Config
48+
49+
hasL402 bool
50+
51+
subscribers map[NotificationType][]subscriber
52+
sync.Mutex
53+
}
54+
55+
// NewManager creates a new notification manager.
56+
func NewManager(cfg *Config) *Manager {
57+
return &Manager{
58+
cfg: cfg,
59+
subscribers: make(map[NotificationType][]subscriber),
60+
}
61+
}
62+
63+
type subscriber struct {
64+
subCtx context.Context
65+
recvChan interface{}
66+
}
67+
68+
// SubscribeReservations subscribes to the reservation notifications.
69+
func (m *Manager) SubscribeReservations(ctx context.Context,
70+
) <-chan *swapserverrpc.ServerReservationNotification {
71+
72+
notifChan := make(chan *swapserverrpc.ServerReservationNotification, 1)
73+
sub := subscriber{
74+
subCtx: ctx,
75+
recvChan: notifChan,
76+
}
77+
78+
m.Lock()
79+
m.subscribers[NotificationTypeReservation] = append(
80+
m.subscribers[NotificationTypeReservation],
81+
sub,
82+
)
83+
m.Unlock()
84+
85+
// Start a goroutine to remove the subscriber when the context is canceled
86+
go func() {
87+
<-ctx.Done()
88+
m.removeSubscriber(NotificationTypeReservation, sub)
89+
close(notifChan)
90+
}()
91+
92+
return notifChan
93+
}
94+
95+
// Run starts the notification manager. It will keep on running until the
96+
// context is canceled. It will subscribe to notifications and forward them to
97+
// the subscribers. On a first successful connection to the server, it will
98+
// close the readyChan to signal that the manager is ready.
99+
func (m *Manager) Run(ctx context.Context) error {
100+
// Initially we want to immediately try to connect to the server.
101+
waitTime := time.Duration(0)
102+
103+
// Start the notification runloop.
104+
for {
105+
timer := time.NewTimer(waitTime)
106+
// Increase the wait time for the next iteration.
107+
waitTime += time.Second * 1
108+
109+
// Return if the context has been canceled.
110+
select {
111+
case <-ctx.Done():
112+
return nil
113+
114+
case <-timer.C:
115+
}
116+
117+
// In order to create a valid l402 we first are going to call
118+
// the FetchL402 method. As a client might not have outbound capacity
119+
// yet, we'll retry until we get a valid response.
120+
if !m.hasL402 {
121+
err := m.cfg.FetchL402(ctx)
122+
if err != nil {
123+
log.Errorf("Error fetching L402: %v", err)
124+
continue
125+
}
126+
m.hasL402 = true
127+
}
128+
129+
connectedFunc := func() {
130+
// Reset the wait time to 10 seconds.
131+
waitTime = time.Second * 10
132+
}
133+
134+
err := m.subscribeNotifications(ctx, connectedFunc)
135+
if err != nil {
136+
log.Errorf("Error subscribing to notifications: %v", err)
137+
}
138+
}
139+
}
140+
141+
// subscribeNotifications subscribes to the notifications from the server.
142+
func (m *Manager) subscribeNotifications(ctx context.Context,
143+
connectedFunc func()) error {
144+
145+
callCtx, cancel := context.WithCancel(ctx)
146+
defer cancel()
147+
148+
notifStream, err := m.cfg.Client.SubscribeNotifications(
149+
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
150+
)
151+
if err != nil {
152+
return err
153+
}
154+
155+
// Signal that we're connected to the server.
156+
connectedFunc()
157+
log.Debugf("Successfully subscribed to server notifications")
158+
159+
for {
160+
notification, err := notifStream.Recv()
161+
if err == nil && notification != nil {
162+
log.Debugf("Received notification: %v", notification)
163+
m.handleNotification(notification)
164+
continue
165+
}
166+
167+
log.Errorf("Error receiving notification: %v", err)
168+
169+
return err
170+
}
171+
}
172+
173+
// handleNotification handles an incoming notification from the server,
174+
// forwarding it to the appropriate subscribers.
175+
func (m *Manager) handleNotification(notification *swapserverrpc.
176+
SubscribeNotificationsResponse) {
177+
178+
switch notification.Notification.(type) {
179+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
180+
// We'll forward the reservation notification to all subscribers.
181+
reservationNtfn := notification.GetReservationNotification()
182+
m.Lock()
183+
defer m.Unlock()
184+
185+
for _, sub := range m.subscribers[NotificationTypeReservation] {
186+
recvChan := sub.recvChan.(chan *swapserverrpc.
187+
ServerReservationNotification)
188+
189+
recvChan <- reservationNtfn
190+
}
191+
192+
default:
193+
log.Warnf("Received unknown notification type: %v",
194+
notification)
195+
}
196+
}
197+
198+
// removeSubscriber removes a subscriber from the manager.
199+
func (m *Manager) removeSubscriber(notifType NotificationType, sub subscriber) {
200+
m.Lock()
201+
defer m.Unlock()
202+
subs := m.subscribers[notifType]
203+
newSubs := make([]subscriber, 0, len(subs))
204+
for _, s := range subs {
205+
if s != sub {
206+
newSubs = append(newSubs, s)
207+
}
208+
}
209+
m.subscribers[notifType] = newSubs
210+
}

0 commit comments

Comments
 (0)