11use std:: cell:: RefCell ;
22use std:: sync:: Arc ;
33use std:: thread:: JoinHandle ;
4- use std:: { time:: Duration } ;
4+ use std:: time:: Duration ;
55
66use futures:: { StreamExt , TryStreamExt } ;
77use serde:: { Deserialize , Serialize } ;
@@ -11,7 +11,7 @@ use tokio::sync::RwLock;
1111use tokio:: task:: { self , spawn_local, AbortHandle , LocalSet } ;
1212use tokio:: {
1313 select,
14- sync:: { mpsc, watch , oneshot } ,
14+ sync:: { mpsc, oneshot , watch } ,
1515} ;
1616use tokio_stream:: wrappers:: LinesStream ;
1717use tracing:: { debug, error, info} ;
@@ -36,9 +36,7 @@ pub enum ServerEvent {
3636 topic : String ,
3737 } ,
3838 #[ serde( rename = "message" ) ]
39- Message (
40- models:: Message ,
41- ) ,
39+ Message ( models:: Message ) ,
4240 #[ serde( rename = "keepalive" ) ]
4341 KeepAlive {
4442 id : String ,
@@ -116,9 +114,7 @@ pub struct ListenerActor {
116114}
117115
118116impl ListenerActor {
119- pub fn new (
120- config : ListenerConfig ,
121- ) -> ListenerHandle {
117+ pub fn new ( config : ListenerConfig ) -> ListenerHandle {
122118 let ( event_tx, event_rx) = async_channel:: bounded ( 64 ) ;
123119 let ( commands_tx, commands_rx) = mpsc:: channel ( 1 ) ;
124120
@@ -127,7 +123,6 @@ impl ListenerActor {
127123 // use a new local set to isolate panics
128124 let local_set = LocalSet :: new ( ) ;
129125 local_set. spawn_local ( async move {
130-
131126 let this = Self {
132127 event_tx,
133128 commands_rx : Some ( commands_rx) ,
@@ -149,41 +144,44 @@ impl ListenerActor {
149144 }
150145
151146 pub async fn run_loop ( mut self ) {
152- let mut commands_rx = self . commands_rx . take ( ) . unwrap ( ) ;
153- loop {
154- select ! {
155- _ = self . run_supervised_loop( ) => {
156- // the supervised loop cannot fail. If it finished, don't restart.
157- break ;
158- } ,
159- cmd = commands_rx. recv( ) => {
160- match cmd {
161- Some ( ListenerCommand :: Restart ) => {
162- info!( "Received restart command" ) ;
163- continue ;
164- }
165- Some ( ListenerCommand :: Shutdown ) => {
166- info!( "Received shutdown command" ) ;
167- break ;
168- }
169- Some ( ListenerCommand :: GetState ( tx) ) => {
170- info!( "Received get state command" ) ;
171- let state = self . state. clone( ) ;
172- let _ = tx. send( state) ;
173- }
174- None => {
175- error!( "Channel closed for ListenerActor" ) ;
176- break ;
177- }
147+ let mut commands_rx = self . commands_rx . take ( ) . unwrap ( ) ;
148+ loop {
149+ select ! {
150+ _ = self . run_supervised_loop( ) => {
151+ // the supervised loop cannot fail. If it finished, don't restart.
152+ break ;
153+ } ,
154+ cmd = commands_rx. recv( ) => {
155+ match cmd {
156+ Some ( ListenerCommand :: Restart ) => {
157+ info!( "Received restart command" ) ;
158+ continue ;
159+ }
160+ Some ( ListenerCommand :: Shutdown ) => {
161+ info!( "Received shutdown command" ) ;
162+ break ;
163+ }
164+ Some ( ListenerCommand :: GetState ( tx) ) => {
165+ info!( "Received get state command" ) ;
166+ let state = self . state. clone( ) ;
167+ let _ = tx. send( state) ;
168+ }
169+ None => {
170+ error!( "Channel closed for ListenerActor" ) ;
171+ break ;
178172 }
179173 }
180174 }
181175 }
176+ }
182177 }
183178
184179 async fn set_state ( & mut self , state : ConnectionState ) {
185180 self . state = state. clone ( ) ;
186- self . event_tx . send ( ListenerEvent :: ConnectionStateChanged ( state) ) . await . unwrap ( ) ;
181+ self . event_tx
182+ . send ( ListenerEvent :: ConnectionStateChanged ( state) )
183+ . await
184+ . unwrap ( ) ;
187185 }
188186 async fn run_supervised_loop ( & mut self ) {
189187 dbg ! ( "supervised" ) ;
@@ -208,7 +206,8 @@ impl ListenerActor {
208206 retry_count : retry. count ( ) ,
209207 delay : retry. next_delay ( ) ,
210208 error : Some ( Arc :: new ( e) ) ,
211- } ) . await ;
209+ } )
210+ . await ;
212211 info ! ( delay = ?retry. next_delay( ) , "restarting" ) ;
213212 retry. wait ( ) . await ;
214213 } else {
@@ -236,9 +235,7 @@ impl ListenerActor {
236235 let stream = response_lines ( reader) . await ?;
237236 tokio:: pin!( stream) ;
238237
239- self . set_state (
240- ConnectionState :: Connected ,
241- ) . await ;
238+ self . set_state ( ConnectionState :: Connected ) . await ;
242239
243240 info ! ( topic = %& self . config. topic, "listening" ) ;
244241 while let Some ( msg) = stream. next ( ) . await {
@@ -254,7 +251,10 @@ impl ListenerActor {
254251 match event {
255252 ServerEvent :: Message ( msg) => {
256253 debug ! ( "message event" ) ;
257- self . event_tx . send ( ListenerEvent :: Message ( msg) ) . await . unwrap ( ) ;
254+ self . event_tx
255+ . send ( ListenerEvent :: Message ( msg) )
256+ . await
257+ . unwrap ( ) ;
258258 }
259259 ServerEvent :: KeepAlive { .. } => {
260260 debug ! ( "keepalive event" ) ;
@@ -283,7 +283,10 @@ impl ListenerHandle {
283283 // the response will be sent as an event in self.events
284284 pub async fn request_state ( & self ) -> ConnectionState {
285285 let ( tx, rx) = oneshot:: channel ( ) ;
286- self . commands . send ( ListenerCommand :: GetState ( tx) ) . await . unwrap ( ) ;
286+ self . commands
287+ . send ( ListenerCommand :: GetState ( tx) )
288+ . await
289+ . unwrap ( ) ;
287290 rx. await . unwrap ( )
288291 }
289292}
@@ -336,7 +339,6 @@ mod tests {
336339
337340 let mut listener = ListenerActor :: new ( config. clone ( ) ) ;
338341 let items: Vec < _ > = listener. events . take ( 3 ) . collect ( ) . await ;
339-
340342
341343 dbg ! ( & items) ;
342344 assert ! ( matches!(
@@ -355,7 +357,7 @@ mod tests {
355357 // ListenerEvent::Connected { .. },
356358 // ));
357359 } ) ;
358- local_set. await ;
360+ local_set. await ;
359361 }
360362
361363 #[ tokio:: test]
@@ -400,23 +402,22 @@ mod tests {
400402 #[ tokio:: test]
401403 async fn integration_connects_sends_receives_simple ( ) {
402404 let local_set = LocalSet :: new ( ) ;
403- local_set
404- . spawn_local ( async {
405- let http_client = HttpClient :: new ( reqwest:: Client :: new ( ) ) ;
406- let credentials = Credentials :: new_nullable ( vec ! [ ] ) . await . unwrap ( ) ;
407-
408- let config = ListenerConfig {
409- http_client,
410- credentials,
411- endpoint : "http://localhost:8000" . to_string ( ) ,
412- topic : "test" . to_string ( ) ,
413- since : 0 ,
414- } ;
405+ local_set. spawn_local ( async {
406+ let http_client = HttpClient :: new ( reqwest:: Client :: new ( ) ) ;
407+ let credentials = Credentials :: new_nullable ( vec ! [ ] ) . await . unwrap ( ) ;
408+
409+ let config = ListenerConfig {
410+ http_client,
411+ credentials,
412+ endpoint : "http://localhost:8000" . to_string ( ) ,
413+ topic : "test" . to_string ( ) ,
414+ since : 0 ,
415+ } ;
415416
416- let mut listener = ListenerActor :: new ( config. clone ( ) ) ;
417+ let mut listener = ListenerActor :: new ( config. clone ( ) ) ;
417418
418- // assert_event_matches!(listener, ListenerEvent::Connected { .. },);
419- } ) ;
420- local_set. await ;
419+ // assert_event_matches!(listener, ListenerEvent::Connected { .. },);
420+ } ) ;
421+ local_set. await ;
421422 }
422423}
0 commit comments