Skip to content

Commit 6b23607

Browse files
committed
notifications: add notification manager
This commit adds a generic notification manager that can be used to subscribe to different types of notifications.
1 parent aea7578 commit 6b23607

File tree

3 files changed

+388
-0
lines changed

3 files changed

+388
-0
lines changed

notifications/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package notifications
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "NTFNS"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

notifications/manager.go

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/lightninglabs/loop/swapserverrpc"
8+
"google.golang.org/grpc"
9+
)
10+
11+
type NotificationType int
12+
13+
const (
14+
NotificationTypeUnknown NotificationType = iota
15+
NotificationTypeReservation
16+
)
17+
18+
type NotificationsClient interface {
19+
SubscribeNotifications(ctx context.Context,
20+
in *swapserverrpc.SubscribeNotificationsRequest,
21+
opts ...grpc.CallOption) (
22+
swapserverrpc.SwapServer_SubscribeNotificationsClient, error)
23+
}
24+
25+
// Config contains all the services that the notification manager needs to
26+
// operate.
27+
type Config struct {
28+
// Client is the client used to communicate with the swap server.
29+
Client NotificationsClient
30+
31+
// FetchL402 is the function used to fetch the l402 token.
32+
FetchL402 func(context.Context) error
33+
}
34+
35+
// Manager is a manager for notifications that the swap server sends to the
36+
// client.
37+
type Manager struct {
38+
cfg *Config
39+
40+
hasL402 bool
41+
42+
subscribers map[NotificationType][]subscriber
43+
}
44+
45+
// NewManager creates a new notification manager.
46+
func NewManager(cfg *Config) *Manager {
47+
return &Manager{
48+
cfg: cfg,
49+
subscribers: make(map[NotificationType][]subscriber),
50+
}
51+
}
52+
53+
type subscriber struct {
54+
subCtx context.Context
55+
recvChan interface{}
56+
}
57+
58+
// SubscribeReservations subscribes to the reservation notifications.
59+
func (m *Manager) SubscribeReservations(ctx context.Context,
60+
) <-chan *swapserverrpc.ServerReservationNotification {
61+
62+
// We'll create a channel that we'll use to send the notifications to the
63+
// caller.
64+
notifChan := make(
65+
chan *swapserverrpc.ServerReservationNotification, //nolint:lll
66+
)
67+
sub := subscriber{
68+
subCtx: ctx,
69+
recvChan: notifChan,
70+
}
71+
72+
m.subscribers[NotificationTypeReservation] = append(
73+
m.subscribers[NotificationTypeReservation],
74+
sub,
75+
)
76+
77+
return notifChan
78+
}
79+
80+
// Run starts the notification manager.
81+
func (n *Manager) Run(ctx context.Context) error {
82+
err := n.registerNotifications(ctx)
83+
if err != nil {
84+
return err
85+
}
86+
87+
return nil
88+
}
89+
90+
// registerNotifications registers a new server notification stream.
91+
func (n *Manager) registerNotifications(mainCtx context.Context) error {
92+
// In order to create a valid l402 we first are going to call
93+
// the FetchL402 method. As a client might not have outbound capacity
94+
// yet, we'll retry until we get a valid response.
95+
if !n.hasL402 {
96+
n.fetchL402(mainCtx)
97+
}
98+
99+
callCtx, cancel := context.WithCancel(mainCtx)
100+
101+
// We'll now subscribe to the swap server notifications.
102+
notifStream, err := n.cfg.Client.SubscribeNotifications(
103+
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
104+
)
105+
if err != nil {
106+
cancel()
107+
return err
108+
}
109+
110+
log.Debugf("Successfully subscribed to server notifications")
111+
112+
// We'll now start a goroutine that will handle all the incoming
113+
// notifications.
114+
go func() {
115+
for {
116+
notification, err := notifStream.Recv()
117+
if err == nil && notification != nil {
118+
log.Debugf("Received notification: %v", notification)
119+
n.handleNotification(notification)
120+
continue
121+
}
122+
log.Errorf("Error receiving "+
123+
"notification: %v", err)
124+
125+
cancel()
126+
127+
// If we encounter an error, we'll
128+
// try to reconnect.
129+
for {
130+
select {
131+
case <-mainCtx.Done():
132+
return
133+
134+
case <-time.After(time.Second * 10):
135+
log.Debugf("Reconnecting to server notifications")
136+
err = n.registerNotifications(mainCtx)
137+
if err != nil {
138+
log.Errorf("Error reconnecting: %v", err)
139+
continue
140+
}
141+
142+
// If we were able to reconnect, we'll
143+
// return.
144+
return
145+
}
146+
}
147+
}
148+
}()
149+
150+
return nil
151+
}
152+
153+
// fetchL402 fetches the L402 from the server. This method will keep on
154+
// retrying until it gets a valid response.
155+
func (m *Manager) fetchL402(ctx context.Context) {
156+
// Add a 0 timer so that we initially fetch the L402 immediately.
157+
timer := time.NewTimer(0)
158+
for {
159+
select {
160+
case <-ctx.Done():
161+
return
162+
163+
case <-timer.C:
164+
err := m.cfg.FetchL402(ctx)
165+
if err != nil {
166+
log.Warnf("Error fetching L402: %v", err)
167+
timer.Reset(time.Second * 10)
168+
continue
169+
}
170+
m.hasL402 = true
171+
172+
return
173+
}
174+
}
175+
}
176+
177+
// handleNotification handles an incoming notification from the server,
178+
// forwarding it to the appropriate subscribers.
179+
func (n *Manager) handleNotification(notification *swapserverrpc.
180+
SubscribeNotificationsResponse) {
181+
182+
switch notification.Notification.(type) {
183+
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
184+
// We'll forward the reservation notification to all subscribers.
185+
// Cleaning up any subscribers that have been canceled.
186+
newSubs := []subscriber{}
187+
for _, sub := range n.subscribers[NotificationTypeReservation] {
188+
recvChan := sub.recvChan.(chan *swapserverrpc.
189+
ServerReservationNotification)
190+
191+
select {
192+
case <-sub.subCtx.Done():
193+
continue
194+
case recvChan <- notification.GetReservationNotification():
195+
newSubs = append(newSubs, sub)
196+
}
197+
}
198+
199+
n.subscribers[NotificationTypeReservation] = newSubs
200+
201+
default:
202+
log.Warnf("Received unknown notification type: %v",
203+
notification)
204+
}
205+
}

notifications/manager_test.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package notifications
2+
3+
import (
4+
"context"
5+
"io"
6+
"testing"
7+
"time"
8+
9+
"github.com/lightninglabs/loop/swapserverrpc"
10+
"github.com/stretchr/testify/require"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/metadata"
13+
)
14+
15+
var (
16+
testReservationId = []byte{0x01, 0x02}
17+
testReservationId2 = []byte{0x01, 0x02}
18+
)
19+
20+
// mockNotificationsClient implements the NotificationsClient interface for testing.
21+
type mockNotificationsClient struct {
22+
mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient
23+
subscribeErr error
24+
timesCalled int
25+
}
26+
27+
func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context,
28+
in *swapserverrpc.SubscribeNotificationsRequest,
29+
opts ...grpc.CallOption) (
30+
swapserverrpc.SwapServer_SubscribeNotificationsClient, error) {
31+
32+
m.timesCalled++
33+
if m.subscribeErr != nil {
34+
return nil, m.subscribeErr
35+
}
36+
return m.mockStream, nil
37+
}
38+
39+
// mockSubscribeNotificationsClient simulates the server stream.
40+
type mockSubscribeNotificationsClient struct {
41+
grpc.ClientStream
42+
recvChan chan *swapserverrpc.SubscribeNotificationsResponse
43+
recvErrChan chan error
44+
}
45+
46+
func (m *mockSubscribeNotificationsClient) Recv() (
47+
*swapserverrpc.SubscribeNotificationsResponse, error) {
48+
49+
select {
50+
case err := <-m.recvErrChan:
51+
return nil, err
52+
case notif, ok := <-m.recvChan:
53+
if !ok {
54+
return nil, io.EOF
55+
}
56+
return notif, nil
57+
}
58+
}
59+
60+
func (m *mockSubscribeNotificationsClient) Header() (metadata.MD, error) {
61+
return nil, nil
62+
}
63+
64+
func (m *mockSubscribeNotificationsClient) Trailer() metadata.MD {
65+
return nil
66+
}
67+
68+
func (m *mockSubscribeNotificationsClient) CloseSend() error {
69+
return nil
70+
}
71+
72+
func (m *mockSubscribeNotificationsClient) Context() context.Context {
73+
return context.TODO()
74+
}
75+
76+
func (m *mockSubscribeNotificationsClient) SendMsg(interface{}) error {
77+
return nil
78+
}
79+
80+
func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error {
81+
return nil
82+
}
83+
84+
func TestManager_ReservationNotification(t *testing.T) {
85+
// Create a mock notification client
86+
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1)
87+
errChan := make(chan error, 1)
88+
mockStream := &mockSubscribeNotificationsClient{
89+
recvChan: recvChan,
90+
recvErrChan: errChan,
91+
}
92+
mockClient := &mockNotificationsClient{
93+
mockStream: mockStream,
94+
}
95+
96+
// Create a Manager with the mock client
97+
mgr := NewManager(&Config{
98+
Client: mockClient,
99+
FetchL402: func(ctx context.Context) error {
100+
// Simulate successful fetching of L402
101+
return nil
102+
},
103+
})
104+
105+
// Subscribe to reservation notifications.
106+
subCtx, subCancel := context.WithCancel(context.Background())
107+
subChan := mgr.SubscribeReservations(subCtx)
108+
109+
// Run the manager.
110+
ctx, cancel := context.WithCancel(context.Background())
111+
defer cancel()
112+
113+
err := mgr.Run(ctx)
114+
require.NoError(t, err)
115+
116+
// Wait a bit to ensure manager is running and has subscribed
117+
time.Sleep(100 * time.Millisecond)
118+
require.Equal(t, 1, mockClient.timesCalled)
119+
120+
// Send a test notification
121+
testNotif := getTestNotification(testReservationId)
122+
123+
// Send the notification to the recvChan
124+
recvChan <- testNotif
125+
126+
// Collect the notification in the callback
127+
receivedNotification := <-subChan
128+
129+
// Now, check that the notification received in the callback matches the one sent
130+
require.NotNil(t, receivedNotification)
131+
require.Equal(t, testReservationId, receivedNotification.ReservationId)
132+
133+
// Cancel the subscription
134+
subCancel()
135+
136+
// Send another test notification`
137+
testNotif2 := getTestNotification(testReservationId2)
138+
recvChan <- testNotif2
139+
140+
// Wait a bit to ensure the notification is not received
141+
time.Sleep(100 * time.Millisecond)
142+
143+
require.Len(t, mgr.subscribers[NotificationTypeReservation], 0)
144+
145+
// Close the recvChan to stop the manager's receive loop
146+
close(recvChan)
147+
}
148+
149+
func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResponse {
150+
return &swapserverrpc.SubscribeNotificationsResponse{
151+
Notification: &swapserverrpc.SubscribeNotificationsResponse_ReservationNotification{
152+
ReservationNotification: &swapserverrpc.ServerReservationNotification{
153+
ReservationId: resId,
154+
},
155+
},
156+
}
157+
}

0 commit comments

Comments
 (0)