@@ -4,8 +4,9 @@ use crate::{Error, SharedEnv};
44use tokio:: select;
55use tokio:: sync:: { broadcast, mpsc, oneshot} ;
66use tokio:: task:: spawn_local;
7- use tracing:: { error, info, warn} ;
7+ use tracing:: { debug , error, info, trace , warn} ;
88
9+ #[ derive( Debug ) ]
910enum SubscriptionCommand {
1011 GetModel {
1112 resp_tx : oneshot:: Sender < models:: Subscription > ,
@@ -140,6 +141,7 @@ impl SubscriptionActor {
140141 loop {
141142 select ! {
142143 Ok ( event) = self . listener. events. recv( ) => {
144+ debug!( ?event, "received listener event" ) ;
143145 match event {
144146 ListenerEvent :: Message ( msg) => self . handle_msg_event( msg) ,
145147 other => {
@@ -148,14 +150,17 @@ impl SubscriptionActor {
148150 }
149151 }
150152 Some ( command) = self . command_rx. recv( ) => {
153+ trace!( ?command, "processing subscription command" ) ;
151154 match command {
152155 SubscriptionCommand :: GetModel { resp_tx } => {
156+ debug!( "getting subscription model" ) ;
153157 let _ = resp_tx. send( self . model. clone( ) ) ;
154158 }
155159 SubscriptionCommand :: UpdateInfo {
156160 mut new_model,
157161 resp_tx,
158162 } => {
163+ debug!( server=?new_model. server, topic=?new_model. topic, "updating subscription info" ) ;
159164 new_model. server = self . model. server. clone( ) ;
160165 new_model. topic = self . model. topic. clone( ) ;
161166 let res = self . env. db. update_subscription( new_model. clone( ) ) ;
@@ -165,9 +170,11 @@ impl SubscriptionActor {
165170 let _ = resp_tx. send( res. map_err( |e| e. into( ) ) ) ;
166171 }
167172 SubscriptionCommand :: Publish { msg, resp_tx} => {
173+ debug!( topic=?self . model. topic, "publishing message" ) ;
168174 let _ = resp_tx. send( self . publish( msg) . await ) ;
169175 }
170176 SubscriptionCommand :: Attach { resp_tx } => {
177+ debug!( topic=?self . model. topic, "attaching new listener" ) ;
171178 let messages = self
172179 . env
173180 . db
@@ -191,9 +198,11 @@ impl SubscriptionActor {
191198 let _ = resp_tx. send( ( previous_events, self . broadcast_tx. subscribe( ) ) ) ;
192199 }
193200 SubscriptionCommand :: ClearNotifications { resp_tx} => {
201+ debug!( topic=?self . model. topic, "clearing notifications" ) ;
194202 let _ = resp_tx. send( self . env. db. delete_messages( & self . model. server, & self . model. topic) . map_err( |e| anyhow:: anyhow!( e) ) ) ;
195203 }
196204 SubscriptionCommand :: UpdateReadUntil { timestamp, resp_tx } => {
205+ debug!( topic=?self . model. topic, timestamp=timestamp, "updating read until timestamp" ) ;
197206 let res = self . env. db. update_read_until( & self . model. server, & self . model. topic, timestamp) ;
198207 let _ = resp_tx. send( res. map_err( |e| anyhow:: anyhow!( e) ) ) ;
199208 }
@@ -205,35 +214,42 @@ impl SubscriptionActor {
205214
206215 async fn publish ( & self , msg : String ) -> anyhow:: Result < ( ) > {
207216 let server = & self . model . server ;
217+ debug ! ( server=?server, "preparing to publish message" ) ;
208218 let creds = self . env . credentials . get ( server) ;
209219 let mut req = self . env . http_client . post ( server) ;
210220 if let Some ( creds) = creds {
211221 req = req. basic_auth ( creds. username , Some ( creds. password ) ) ;
212222 }
213223
214- info ! ( "sending message" ) ;
224+ info ! ( server=?server , "sending message" ) ;
215225 let res = req. body ( msg) . send ( ) . await ?;
216226 res. error_for_status ( ) ?;
227+ debug ! ( server=?server, "message published successfully" ) ;
217228 Ok ( ( ) )
218229 }
219230 fn handle_msg_event ( & mut self , msg : ReceivedMessage ) {
231+ debug ! ( topic=?self . model. topic, "handling new message" ) ;
220232 // Store in database
221233 let already_stored: bool = {
222234 let json_ev = & serde_json:: to_string ( & msg) . unwrap ( ) ;
223235 match self . env . db . insert_message ( & self . model . server , json_ev) {
224236 Err ( Error :: DuplicateMessage ) => {
225- warn ! ( "Received duplicate message") ;
237+ warn ! ( topic=? self . model . topic , "received duplicate message") ;
226238 true
227239 }
228240 Err ( e) => {
229- error ! ( error = ?e, "Can't store the message" ) ;
241+ error ! ( error=?e, topic=?self . model. topic, "can't store the message" ) ;
242+ false
243+ }
244+ _ => {
245+ debug ! ( topic=?self . model. topic, "message stored successfully" ) ;
230246 false
231247 }
232- _ => false ,
233248 }
234249 } ;
235250
236251 if !already_stored {
252+ debug ! ( topic=?self . model. topic, muted=?self . model. muted, "checking if notification should be shown" ) ;
237253 // Show notification. If this fails, panic
238254 if !{ self . model . muted } {
239255 let notifier = self . env . notifier . clone ( ) ;
@@ -246,11 +262,14 @@ impl SubscriptionActor {
246262 actions : msg. actions . clone ( ) ,
247263 } ;
248264
249- info ! ( "Showing notification") ;
265+ info ! ( topic=? self . model . topic , "showing notification") ;
250266 notifier. send ( n) . unwrap ( ) ;
267+ } else {
268+ debug ! ( topic=?self . model. topic, "notification muted, skipping" ) ;
251269 }
252270
253271 // Forward to app
272+ debug ! ( topic=?self . model. topic, "forwarding message to app" ) ;
254273 let _ = self . broadcast_tx . send ( ListenerEvent :: Message ( msg) ) ;
255274 }
256275 }
0 commit comments