@@ -53,7 +53,6 @@ impl EventProducer<StoreEvent> for StoreEventListener {
53
53
/// Manage subscriptions to the `StoreEvent` stream. Keep a list of
54
54
/// currently active subscribers and forward new events to each of them
55
55
pub struct SubscriptionManager {
56
- logger : Logger ,
57
56
subscriptions : Arc < RwLock < HashMap < String , Sender < Arc < StoreEvent > > > > > ,
58
57
59
58
/// listen to StoreEvents generated when applying entity operations
@@ -68,7 +67,6 @@ impl SubscriptionManager {
68
67
. expect ( "Failed to listen to entity change events in Postgres" ) ;
69
68
70
69
let manager = SubscriptionManager {
71
- logger,
72
70
subscriptions : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
73
71
listener : Mutex :: new ( listener) ,
74
72
} ;
@@ -91,28 +89,24 @@ impl SubscriptionManager {
91
89
& self ,
92
90
store_events : Box < dyn Stream < Item = StoreEvent , Error = ( ) > + Send > ,
93
91
) {
94
- let logger = self . logger . clone ( ) ;
95
92
let subscriptions = self . subscriptions . clone ( ) ;
96
93
97
94
graph:: spawn (
98
95
store_events
99
96
. for_each ( move |event| {
100
97
let senders = subscriptions. read ( ) . unwrap ( ) . clone ( ) ;
101
- let logger = logger. clone ( ) ;
102
98
let subscriptions = subscriptions. clone ( ) ;
103
99
let event = Arc :: new ( event) ;
104
100
105
101
// Write change to all matching subscription streams; remove subscriptions
106
102
// whose receiving end has been dropped
107
103
stream:: iter_ok :: < _ , ( ) > ( senders) . for_each ( move |( id, sender) | {
108
- let logger = logger. clone ( ) ;
109
104
let subscriptions = subscriptions. clone ( ) ;
110
105
111
106
sender. send ( event. cheap_clone ( ) ) . then ( move |result| {
112
107
match result {
113
108
Err ( _send_error) => {
114
109
// Receiver was dropped
115
- debug ! ( logger, "Unsubscribe" ; "id" => & id) ;
116
110
subscriptions. write ( ) . unwrap ( ) . remove ( & id) ;
117
111
Ok ( ( ) )
118
112
}
@@ -128,7 +122,6 @@ impl SubscriptionManager {
128
122
fn periodically_clean_up_stale_subscriptions ( & self ) {
129
123
use futures03:: stream:: StreamExt ;
130
124
131
- let logger = self . logger . clone ( ) ;
132
125
let subscriptions = self . subscriptions . clone ( ) ;
133
126
134
127
// Clean up stale subscriptions every 5s
@@ -147,7 +140,6 @@ impl SubscriptionManager {
147
140
148
141
// Remove all stale subscriptions
149
142
for id in stale_ids {
150
- debug ! ( logger, "Unsubscribe" ; "id" => & id) ;
151
143
subscriptions. remove ( & id) ;
152
144
}
153
145
@@ -165,10 +157,6 @@ impl SubscriptionManager {
165
157
id = Uuid :: new_v4 ( ) . to_string ( ) ;
166
158
}
167
159
168
- debug ! ( self . logger, "Subscribe" ;
169
- "id" => & id,
170
- "entities" => format!( "{:?}" , entities) ) ;
171
-
172
160
// Prepare the new subscription by creating a channel and a subscription object
173
161
let ( sender, receiver) = channel ( 100 ) ;
174
162
0 commit comments