@@ -10,7 +10,7 @@ use tokio::{
1010 sync:: { mpsc, oneshot} ,
1111} ;
1212use tokio_stream:: wrappers:: LinesStream ;
13- use tracing:: { debug, error, info} ;
13+ use tracing:: { debug, error, info, warn , Instrument , Span } ;
1414
1515use crate :: credentials:: Credentials ;
1616use crate :: http_client:: HttpClient ;
@@ -106,36 +106,43 @@ pub struct ListenerActor {
106106
107107impl ListenerActor {
108108 pub async fn run_loop ( mut self ) {
109- let mut commands_rx = self . commands_rx . take ( ) . unwrap ( ) ;
110- loop {
111- select ! {
112- _ = self . run_supervised_loop( ) => {
113- // the supervised loop cannot fail. If it finished, don't restart.
114- break ;
115- } ,
116- cmd = commands_rx. recv( ) => {
117- match cmd {
118- Some ( ListenerCommand :: Restart ) => {
119- info!( "Received restart command" ) ;
120- continue ;
121- }
122- Some ( ListenerCommand :: Shutdown ) => {
123- info!( "Received shutdown command" ) ;
124- break ;
125- }
126- Some ( ListenerCommand :: GetState ( tx) ) => {
127- info!( "Received get state command" ) ;
128- let state = self . state. clone( ) ;
129- let _ = tx. send( state) ;
130- }
131- None => {
132- error!( "Channel closed for ListenerActor" ) ;
133- break ;
109+ let span = tracing:: info_span!( "listener_loop" , topic = %self . config. topic) ;
110+ async {
111+ let mut commands_rx = self . commands_rx . take ( ) . unwrap ( ) ;
112+ loop {
113+ select ! {
114+ _ = self . run_supervised_loop( ) => {
115+ info!( "supervised loop ended" ) ;
116+ break ;
117+ } ,
118+ cmd = commands_rx. recv( ) => {
119+ match cmd {
120+ Some ( ListenerCommand :: Restart ) => {
121+ info!( "restarting listener" ) ;
122+ continue ;
123+ }
124+ Some ( ListenerCommand :: Shutdown ) => {
125+ info!( "shutting down listener" ) ;
126+ break ;
127+ }
128+ Some ( ListenerCommand :: GetState ( tx) ) => {
129+ debug!( "getting listener state" ) ;
130+ let state = self . state. clone( ) ;
131+ if tx. send( state) . is_err( ) {
132+ warn!( "failed to send state - receiver dropped" ) ;
133+ }
134+ }
135+ None => {
136+ error!( "command channel closed" ) ;
137+ break ;
138+ }
134139 }
135140 }
136141 }
137142 }
138143 }
144+ . instrument ( span)
145+ . await ;
139146 }
140147
141148 async fn set_state ( & mut self , state : ConnectionState ) {
@@ -146,88 +153,106 @@ impl ListenerActor {
146153 . unwrap ( ) ;
147154 }
148155 async fn run_supervised_loop ( & mut self ) {
149- dbg ! ( "supervised" ) ;
150- let retrier = || {
151- crate :: retry:: WaitExponentialRandom :: builder ( )
152- . min ( Duration :: from_secs ( 1 ) )
153- . max ( Duration :: from_secs ( 5 * 60 ) )
154- . build ( )
155- } ;
156- let mut retry = retrier ( ) ;
157- loop {
158- let start_time = std:: time:: Instant :: now ( ) ;
159-
160- if let Err ( e) = self . recv_and_forward_loop ( ) . await {
161- let uptime = std:: time:: Instant :: now ( ) . duration_since ( start_time) ;
162- // Reset retry delay to minimum if uptime was decent enough
163- if uptime > Duration :: from_secs ( 60 * 4 ) {
164- retry = retrier ( ) ;
156+ let span = tracing:: info_span!( "supervised_loop" ) ;
157+ async {
158+ let retrier = || {
159+ crate :: retry:: WaitExponentialRandom :: builder ( )
160+ . min ( Duration :: from_secs ( 1 ) )
161+ . max ( Duration :: from_secs ( 5 * 60 ) )
162+ . build ( )
163+ } ;
164+ let mut retry = retrier ( ) ;
165+ loop {
166+ let start_time = std:: time:: Instant :: now ( ) ;
167+
168+ if let Err ( e) = self . recv_and_forward_loop ( ) . await {
169+ let uptime = std:: time:: Instant :: now ( ) . duration_since ( start_time) ;
170+ // Reset retry delay to minimum if uptime was decent enough
171+ if uptime > Duration :: from_secs ( 60 * 4 ) {
172+ debug ! ( "resetting retry delay due to sufficient uptime" ) ;
173+ retry = retrier ( ) ;
174+ }
175+ error ! ( error = ?e, "connection error" ) ;
176+ self . set_state ( ConnectionState :: Reconnecting {
177+ retry_count : retry. count ( ) ,
178+ delay : retry. next_delay ( ) ,
179+ error : Some ( Arc :: new ( e) ) ,
180+ } )
181+ . await ;
182+ info ! ( delay = ?retry. next_delay( ) , "waiting before reconnect attempt" ) ;
183+ retry. wait ( ) . await ;
184+ } else {
185+ break ;
165186 }
166- error ! ( error = ?e) ;
167- self . set_state ( ConnectionState :: Reconnecting {
168- retry_count : retry. count ( ) ,
169- delay : retry. next_delay ( ) ,
170- error : Some ( Arc :: new ( e) ) ,
171- } )
172- . await ;
173- info ! ( delay = ?retry. next_delay( ) , "restarting" ) ;
174- retry. wait ( ) . await ;
175- } else {
176- break ;
177187 }
178188 }
189+ . instrument ( span)
190+ . await ;
179191 }
180192
181193 async fn recv_and_forward_loop ( & mut self ) -> anyhow:: Result < ( ) > {
182- let creds = self . config . credentials . get ( & self . config . endpoint ) ;
183- let req = topic_request (
184- & self . config . http_client ,
185- & self . config . endpoint ,
186- & self . config . topic ,
187- self . config . since ,
188- creds. as_ref ( ) . map ( |x| x. username . as_str ( ) ) ,
189- creds. as_ref ( ) . map ( |x| x. password . as_str ( ) ) ,
190- ) ;
191- let res = self . config . http_client . execute ( req?) . await ?;
192- let res = res. error_for_status ( ) ?;
193- let reader = tokio_util:: io:: StreamReader :: new (
194- res. bytes_stream ( )
195- . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e. to_string ( ) ) ) ,
194+ let span = tracing:: info_span!( "receive_loop" ,
195+ endpoint = %self . config. endpoint,
196+ topic = %self . config. topic,
197+ since = %self . config. since
196198 ) ;
197- let stream = response_lines ( reader) . await ?;
198- tokio:: pin!( stream) ;
199-
200- self . set_state ( ConnectionState :: Connected ) . await ;
201-
202- info ! ( topic = %& self . config. topic, "listening" ) ;
203- while let Some ( msg) = stream. next ( ) . await {
204- let msg = msg?;
205-
206- let min_msg = serde_json:: from_str :: < models:: MinMessage > ( & msg)
207- . map_err ( |e| Error :: InvalidMinMessage ( msg. to_string ( ) , e) ) ?;
208- self . config . since = min_msg. time . max ( self . config . since ) ;
209-
210- let event = serde_json:: from_str ( & msg)
211- . map_err ( |e| Error :: InvalidMessage ( msg. to_string ( ) , e) ) ?;
212-
213- match event {
214- ServerEvent :: Message ( msg) => {
215- debug ! ( "message event" ) ;
216- self . event_tx
217- . send ( ListenerEvent :: Message ( msg) )
218- . await
219- . unwrap ( ) ;
220- }
221- ServerEvent :: KeepAlive { .. } => {
222- debug ! ( "keepalive event" ) ;
223- }
224- ServerEvent :: Open { .. } => {
225- debug ! ( "open event" ) ;
199+ async {
200+ let creds = self . config . credentials . get ( & self . config . endpoint ) ;
201+ debug ! ( "creating request" ) ;
202+ let req = topic_request (
203+ & self . config . http_client ,
204+ & self . config . endpoint ,
205+ & self . config . topic ,
206+ self . config . since ,
207+ creds. as_ref ( ) . map ( |x| x. username . as_str ( ) ) ,
208+ creds. as_ref ( ) . map ( |x| x. password . as_str ( ) ) ,
209+ ) ;
210+
211+ debug ! ( "executing request" ) ;
212+ let res = self . config . http_client . execute ( req?) . await ?;
213+ let res = res. error_for_status ( ) ?;
214+ let reader = tokio_util:: io:: StreamReader :: new (
215+ res. bytes_stream ( )
216+ . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e. to_string ( ) ) ) ,
217+ ) ;
218+ let stream = response_lines ( reader) . await ?;
219+ tokio:: pin!( stream) ;
220+
221+ self . set_state ( ConnectionState :: Connected ) . await ;
222+ info ! ( "connection established" ) ;
223+
224+ info ! ( topic = %& self . config. topic, "listening" ) ;
225+ while let Some ( msg) = stream. next ( ) . await {
226+ let msg = msg?;
227+
228+ let min_msg = serde_json:: from_str :: < models:: MinMessage > ( & msg)
229+ . map_err ( |e| Error :: InvalidMinMessage ( msg. to_string ( ) , e) ) ?;
230+ self . config . since = min_msg. time . max ( self . config . since ) ;
231+
232+ let event = serde_json:: from_str ( & msg)
233+ . map_err ( |e| Error :: InvalidMessage ( msg. to_string ( ) , e) ) ?;
234+
235+ match event {
236+ ServerEvent :: Message ( msg) => {
237+ debug ! ( id = %msg. id, "forwarding message" ) ;
238+ self . event_tx
239+ . send ( ListenerEvent :: Message ( msg) )
240+ . await
241+ . unwrap ( ) ;
242+ }
243+ ServerEvent :: KeepAlive { id, .. } => {
244+ debug ! ( id = %id, "received keepalive" ) ;
245+ }
246+ ServerEvent :: Open { id, .. } => {
247+ debug ! ( id = %id, "received open event" ) ;
248+ }
226249 }
227250 }
228- }
229251
230- Ok ( ( ) )
252+ Ok ( ( ) )
253+ }
254+ . instrument ( span)
255+ . await
231256 }
232257}
233258
@@ -323,13 +348,6 @@ mod tests {
323348 ListenerEvent :: ConnectionStateChanged ( ConnectionState :: Connected { .. } ) ,
324349 ]
325350 ) ) ;
326-
327- // assert!(matches!(
328- // listener,
329- // ListenerEvent::Error { .. },
330- // ListenerEvent::Disconnected { .. },
331- // ListenerEvent::Connected { .. },
332- // ));
333351 } ) ;
334352 local_set. await ;
335353 }
@@ -372,26 +390,4 @@ mod tests {
372390 } ) ;
373391 local_set. await ;
374392 }
375-
376- #[ tokio:: test]
377- async fn integration_connects_sends_receives_simple ( ) {
378- let local_set = LocalSet :: new ( ) ;
379- local_set. spawn_local ( async {
380- let http_client = HttpClient :: new ( reqwest:: Client :: new ( ) ) ;
381- let credentials = Credentials :: new_nullable ( vec ! [ ] ) . await . unwrap ( ) ;
382-
383- let config = ListenerConfig {
384- http_client,
385- credentials,
386- endpoint : "http://localhost:8000" . to_string ( ) ,
387- topic : "test" . to_string ( ) ,
388- since : 0 ,
389- } ;
390-
391- let listener = ListenerHandle :: new ( config. clone ( ) ) ;
392-
393- // assert_event_matches!(listener, ListenerEvent::Connected { .. },);
394- } ) ;
395- local_set. await ;
396- }
397393}
0 commit comments