@@ -92,15 +92,21 @@ pub struct SubscriptionManager {
9292
9393 /// Keep the notification listener alive
9494 listener : StoreEventListener ,
95+
96+ logger : Logger ,
9597}
9698
9799impl SubscriptionManager {
98100 pub fn new ( logger : Logger , postgres_url : String , registry : Arc < MetricsRegistry > ) -> Self {
99- let ( listener, store_events) = StoreEventListener :: new ( logger, postgres_url, registry) ;
101+ let logger = logger. new ( o ! ( "component" => "StoreEventListener" ) ) ;
102+
103+ let ( listener, store_events) =
104+ StoreEventListener :: new ( logger. cheap_clone ( ) , postgres_url, registry) ;
100105
101106 let mut manager = SubscriptionManager {
102107 subscriptions : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
103108 listener,
109+ logger,
104110 } ;
105111
106112 // Deal with store subscriptions
@@ -112,6 +118,32 @@ impl SubscriptionManager {
112118 manager
113119 }
114120
121+ async fn broadcast_event (
122+ logger : & Logger ,
123+ subscriptions : & Arc < RwLock < HashMap < usize , Sender < Arc < StoreEvent > > > > > ,
124+ event : StoreEvent ,
125+ ) {
126+ let event = Arc :: new ( event) ;
127+
128+ // Send to `subscriptions`.
129+ {
130+ let senders = subscriptions. read ( ) . unwrap ( ) . clone ( ) ;
131+
132+ // Write change to all matching subscription streams; remove subscriptions
133+ // whose receiving end has been dropped
134+ for ( id, sender) in senders {
135+ if let Err ( e) = sender. send ( event. cheap_clone ( ) ) . await {
136+ error ! (
137+ logger,
138+ "Failed to send store event to subscriber {}: {}" , id, e
139+ ) ;
140+ // Receiver was dropped
141+ subscriptions. write ( ) . unwrap ( ) . remove ( & id) ;
142+ }
143+ }
144+ }
145+ }
146+
115147 /// Receive store events from Postgres and send them to all active
116148 /// subscriptions. Detect stale subscriptions in the process and
117149 /// close them.
@@ -121,24 +153,22 @@ impl SubscriptionManager {
121153 ) {
122154 let subscriptions = self . subscriptions . cheap_clone ( ) ;
123155 let mut store_events = store_events. compat ( ) ;
156+ let logger = self . logger . cheap_clone ( ) ;
124157
125158 // This channel is constantly receiving things and there are locks involved,
126159 // so it's best to use a blocking task.
127160 graph:: spawn_blocking ( async move {
128- while let Some ( Ok ( event) ) = store_events. next ( ) . await {
129- let event = Arc :: new ( event) ;
130-
131- // Send to `subscriptions`.
132- {
133- let senders = subscriptions. read ( ) . unwrap ( ) . clone ( ) ;
134-
135- // Write change to all matching subscription streams; remove subscriptions
136- // whose receiving end has been dropped
137- for ( id, sender) in senders {
138- if sender. send ( event. cheap_clone ( ) ) . await . is_err ( ) {
139- // Receiver was dropped
140- subscriptions. write ( ) . unwrap ( ) . remove ( & id) ;
141- }
161+ loop {
162+ match store_events. next ( ) . await {
163+ Some ( Ok ( event) ) => {
164+ Self :: broadcast_event ( & logger, & subscriptions, event) . await ;
165+ }
166+ Some ( Err ( _) ) => {
167+ error ! ( logger, "Error receiving store event" ) ;
168+ }
169+ None => {
170+ error ! ( logger, "Store event stream ended" ) ;
171+ break ;
142172 }
143173 }
144174 }
@@ -147,6 +177,7 @@ impl SubscriptionManager {
147177
148178 fn periodically_clean_up_stale_subscriptions ( & self ) {
149179 let subscriptions = self . subscriptions . cheap_clone ( ) ;
180+ let logger = self . logger . cheap_clone ( ) ;
150181
151182 // Clean up stale subscriptions every 5s
152183 graph:: spawn ( async move {
@@ -169,6 +200,7 @@ impl SubscriptionManager {
169200
170201 // Remove all stale subscriptions
171202 for id in stale_ids {
203+ warn ! ( logger, "Removing stale subscription {}" , id) ;
172204 subscriptions. remove ( & id) ;
173205 }
174206 }
0 commit comments