@@ -20,6 +20,10 @@ const (
2020 // NotificationTypeReservation is the notification type for reservation
2121 // notifications.
2222 NotificationTypeReservation
23+
24+ // NotificationTypeStaticLoopInSweepRequest is the notification type for
25+ // static loop in sweep requests.
26+ NotificationTypeStaticLoopInSweepRequest
2327)
2428
2529// Client is the interface that the notification manager needs to implement in
@@ -89,6 +93,31 @@ func (m *Manager) SubscribeReservations(ctx context.Context,
8993 return notifChan
9094}
9195
96+ // SubscribeStaticLoopInSweepRequests subscribes to the static loop in sweep
97+ // requests.
98+ func (m * Manager ) SubscribeStaticLoopInSweepRequests (ctx context.Context ,
99+ ) <- chan * swapserverrpc.ServerStaticLoopInSweepNotification {
100+
101+ notifChan := make (
102+ chan * swapserverrpc.ServerStaticLoopInSweepNotification , 1 ,
103+ )
104+ sub := subscriber {
105+ subCtx : ctx ,
106+ recvChan : notifChan ,
107+ }
108+
109+ m .addSubscriber (NotificationTypeStaticLoopInSweepRequest , sub )
110+
111+ // Start a goroutine to remove the subscriber when the context is canceled
112+ go func () {
113+ <- ctx .Done ()
114+ m .removeSubscriber (NotificationTypeStaticLoopInSweepRequest , sub )
115+ close (notifChan )
116+ }()
117+
118+ return notifChan
119+ }
120+
92121// Run starts the notification manager. It will keep on running until the
93122// context is canceled. It will subscribe to notifications and forward them to
94123// the subscribers. On a first successful connection to the server, it will
@@ -160,7 +189,7 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
160189 for {
161190 notification , err := notifStream .Recv ()
162191 if err == nil && notification != nil {
163- log .Debugf ("Received notification: %v" , notification )
192+ log .Tracef ("Received notification: %v" , notification )
164193 m .handleNotification (notification )
165194 continue
166195 }
@@ -173,13 +202,13 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
173202
174203// handleNotification handles an incoming notification from the server,
175204// forwarding it to the appropriate subscribers.
176- func (m * Manager ) handleNotification (notification * swapserverrpc.
205+ func (m * Manager ) handleNotification (ntfn * swapserverrpc.
177206 SubscribeNotificationsResponse ) {
178207
179- switch notification .Notification .(type ) {
180- case * swapserverrpc.SubscribeNotificationsResponse_ReservationNotification :
208+ switch ntfn .Notification .(type ) {
209+ case * swapserverrpc.SubscribeNotificationsResponse_ReservationNotification : // nolint: lll
181210 // We'll forward the reservation notification to all subscribers.
182- reservationNtfn := notification .GetReservationNotification ()
211+ reservationNtfn := ntfn .GetReservationNotification ()
183212 m .Lock ()
184213 defer m .Unlock ()
185214
@@ -189,10 +218,24 @@ func (m *Manager) handleNotification(notification *swapserverrpc.
189218
190219 recvChan <- reservationNtfn
191220 }
221+ case * swapserverrpc.SubscribeNotificationsResponse_StaticLoopInSweep : // nolint: lll
222+ // We'll forward the static loop in sweep request to all
223+ // subscribers.
224+ staticLoopInSweepRequestNtfn := ntfn .
225+ GetStaticLoopInSweep ()
226+ m .Lock ()
227+ defer m .Unlock ()
228+
229+ for _ , sub := range m .subscribers [NotificationTypeStaticLoopInSweepRequest ] { // nolint: lll
230+ recvChan := sub .recvChan .(chan * swapserverrpc.
231+ ServerStaticLoopInSweepNotification )
232+
233+ recvChan <- staticLoopInSweepRequestNtfn
234+ }
192235
193236 default :
194237 log .Warnf ("Received unknown notification type: %v" ,
195- notification )
238+ ntfn )
196239 }
197240}
198241
0 commit comments