Skip to content

Commit 2e3d51d

Browse files
committed
notifications: add notification manager
This commit adds a generic notification manager that can be used to subscribe to different types of notifications.
1 parent 97cb0d7 commit 2e3d51d

File tree

3 files changed

+407
-0
lines changed

3 files changed

+407
-0
lines changed

notifications/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package notifications
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "NTFNS"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

notifications/manager.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/lightninglabs/loop/swapserverrpc"
9+
"google.golang.org/grpc"
10+
)
11+
12+
type NotificationType int
13+
14+
const (
15+
NotificationTypeUnknown NotificationType = iota
16+
NotificationTypeReservation
17+
)
18+
19+
type NotificationsClient interface {
20+
SubscribeNotifications(ctx context.Context,
21+
in *swapserverrpc.SubscribeNotificationsRequest,
22+
opts ...grpc.CallOption) (
23+
swapserverrpc.SwapServer_SubscribeNotificationsClient, error)
24+
}
25+
26+
// Config contains all the services that the notification manager needs to
27+
// operate.
28+
type Config struct {
29+
// Client is the client used to communicate with the swap server.
30+
Client NotificationsClient
31+
32+
// FetchL402 is the function used to fetch the l402 token.
33+
FetchL402 func(context.Context) error
34+
}
35+
36+
// Manager is a manager for notifications that the swap server sends to the
37+
// client.
38+
type Manager struct {
39+
cfg *Config
40+
41+
hasL402 bool
42+
43+
subscribers map[NotificationType][]subscriber
44+
sync.Mutex
45+
}
46+
47+
// NewManager creates a new notification manager.
48+
func NewManager(cfg *Config) *Manager {
49+
return &Manager{
50+
cfg: cfg,
51+
subscribers: make(map[NotificationType][]subscriber),
52+
}
53+
}
54+
55+
type subscriber struct {
56+
subCtx context.Context
57+
recvChan interface{}
58+
}
59+
60+
// SubscribeReservations subscribes to the reservation notifications.
61+
func (m *Manager) SubscribeReservations(ctx context.Context,
62+
) <-chan *swapserverrpc.ServerReservationNotification {
63+
64+
// We'll create a channel that we'll use to send the notifications to the
65+
// caller.
66+
notifChan := make(
67+
chan *swapserverrpc.ServerReservationNotification, //nolint:lll
68+
)
69+
sub := subscriber{
70+
subCtx: ctx,
71+
recvChan: notifChan,
72+
}
73+
74+
m.Lock()
75+
m.subscribers[NotificationTypeReservation] = append(
76+
m.subscribers[NotificationTypeReservation],
77+
sub,
78+
)
79+
m.Unlock()
80+
81+
return notifChan
82+
}
83+
84+
// Run starts the notification manager. It will keep on running until the
85+
// context is canceled. It will subscribe to notifications and forward them to
86+
// the subscribers. On a first successful connection to the server, it will
87+
// close the readyChan to signal that the manager is ready.
88+
func (n *Manager) Run(ctx context.Context, readyChan chan<- struct{}) error {
89+
// In order to create a valid l402 we first are going to call
90+
// the FetchL402 method. As a client might not have outbound capacity
91+
// yet, we'll retry until we get a valid response.
92+
if !n.hasL402 {
93+
n.fetchL402(ctx)
94+
}
95+
96+
closeChanFunc := sync.OnceFunc(func() {
97+
close(readyChan)
98+
})
99+
100+
// Initially we want to immediately try to connect to the server.
101+
waitTime := time.Duration(0)
102+
103+
// Start the notification runloop.
104+
for {
105+
timer := time.NewTimer(waitTime)
106+
// Increase the wait time for the next iteration.
107+
waitTime += time.Second * 10
108+
109+
// Return if the context has been canceled.
110+
select {
111+
case <-ctx.Done():
112+
return nil
113+
case <-timer.C:
114+
}
115+
116+
connectedFunc := func() {
117+
closeChanFunc()
118+
// Reset the wait time to 10.
119+
waitTime = 10
120+
}
121+
122+
err := n.subscribeNotifications(ctx, connectedFunc)
123+
if err != nil {
124+
log.Errorf("Error subscribing to notifications: %v", err)
125+
}
126+
}
127+
}
128+
129+
// subscribeNotifications subscribes to the notifications from the server.
130+
func (m *Manager) subscribeNotifications(ctx context.Context,
131+
connectedFunc func()) error {
132+
133+
callCtx, cancel := context.WithCancel(ctx)
134+
defer cancel()
135+
136+
notifStream, err := m.cfg.Client.SubscribeNotifications(
137+
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
138+
)
139+
if err != nil {
140+
return err
141+
}
142+
143+
// Signal that we're connected to the server.
144+
connectedFunc()
145+
log.Debugf("Successfully subscribed to server notifications")
146+
147+
for {
148+
notification, err := notifStream.Recv()
149+
if err == nil && notification != nil {
150+
log.Debugf("Received notification: %v", notification)
151+
m.handleNotification(notification)
152+
continue
153+
}
154+
155+
log.Errorf("Error receiving notification: %v", err)
156+
157+
return err
158+
}
159+
}
160+
161+
// fetchL402 fetches the L402 from the server. This method will keep on
162+
// retrying until it gets a valid response.
163+
func (m *Manager) fetchL402(ctx context.Context) {
164+
// Add a 0 timer so that we initially fetch the L402 immediately.
165+
waitTime := time.Duration(0)
166+
timer := time.NewTimer(waitTime)
167+
for {
168+
select {
169+
case <-ctx.Done():
170+
return
171+
case <-timer.C:
172+
err := m.cfg.FetchL402(ctx)
173+
if err != nil {
174+
log.Warnf("Error fetching L402: %v", err)
175+
waitTime += time.Second * 10
176+
timer.Reset(waitTime)
177+
continue
178+
}
179+
m.hasL402 = true
180+
181+
return
182+
}
183+
}
184+
}
185+
186+
// handleNotification handles an incoming notification from the server,
187+
// forwarding it to the appropriate subscribers.
188+
func (n *Manager) handleNotification(notification *swapserverrpc.
189+
SubscribeNotificationsResponse) {
190+
191+
switch notification.Notification.(type) {
192+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
193+
// We'll forward the reservation notification to all subscribers.
194+
// Cleaning up any subscribers that have been canceled.
195+
newSubs := make(
196+
[]subscriber, 0, len(n.subscribers[NotificationTypeReservation]),
197+
)
198+
reservationNtfn := notification.GetReservationNotification()
199+
for _, sub := range n.subscribers[NotificationTypeReservation] {
200+
recvChan := sub.recvChan.(chan *swapserverrpc.
201+
ServerReservationNotification)
202+
203+
select {
204+
case <-sub.subCtx.Done():
205+
close(recvChan)
206+
continue
207+
case recvChan <- reservationNtfn:
208+
newSubs = append(newSubs, sub)
209+
}
210+
}
211+
212+
n.Lock()
213+
n.subscribers[NotificationTypeReservation] = newSubs
214+
n.Unlock()
215+
216+
default:
217+
log.Warnf("Received unknown notification type: %v",
218+
notification)
219+
}
220+
}

0 commit comments

Comments
 (0)