@@ -32,7 +32,7 @@ use tokio::{
3232 mpsc:: { self , unbounded_channel, UnboundedReceiver , UnboundedSender } ,
3333 oneshot,
3434 } ,
35- task:: { self , JoinSet } ,
35+ task:: { self , JoinError , JoinSet } ,
3636 time:: { self , sleep, MissedTickBehavior } ,
3737} ;
3838use tokio_stream:: wrappers:: UnboundedReceiverStream ;
@@ -75,7 +75,7 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
7575 let guard = elegant_departure:: get_shutdown_guard ( ) ;
7676 tokio:: spawn ( async move {
7777 let _ = guard. wait ( ) . await ;
78- info ! ( "Cancellation token received, shutting down consumer" ) ;
78+ info ! ( "Cancellation token received, shutting down consumer... " ) ;
7979 let ( rendezvous_sender, _) = sync_channel ( 0 ) ;
8080 let _ = event_sender. send ( ( Event :: Shutdown , rendezvous_sender) ) ;
8181 } ) ;
@@ -189,6 +189,10 @@ impl ActorHandles {
189189 }
190190 }
191191 }
192+
193+ async fn join_next ( & mut self ) -> Option < Result < Result < ( ) , Error > , JoinError > > {
194+ self . join_set . join_next ( ) . await
195+ }
192196}
193197
194198#[ macro_export]
@@ -333,54 +337,65 @@ pub async fn handle_events(
333337 let mut state = ConsumerState :: Ready ;
334338
335339 while let ConsumerState :: Ready { .. } | ConsumerState :: Consuming { .. } = state {
336- let Some ( ( event, _rendezvous_guard) ) = events_stream. next ( ) . await else {
337- unreachable ! ( "Unexpected end to event stream" )
338- } ;
339- info ! ( "Received event: {:?}" , event) ;
340- state = match ( state, event) {
341- ( ConsumerState :: Ready , Event :: Assign ( tpl) ) => {
342- ConsumerState :: Consuming ( spawn_actors ( consumer. clone ( ) , & tpl) , tpl)
343- }
344- ( ConsumerState :: Ready , Event :: Revoke ( _) ) => {
345- unreachable ! ( "Got partition revocation before the consumer has started" )
346- }
347- ( ConsumerState :: Ready , Event :: Shutdown ) => ConsumerState :: Stopped ,
348- ( ConsumerState :: Consuming ( actor_handles, mut tpl) , Event :: Assign ( mut assigned_tpl) ) => {
349- assert ! (
350- tpl. is_disjoint( & assigned_tpl) ,
351- "Newly assigned TPL should be disjoint from TPL we're consuming from"
352- ) ;
353- tpl. append ( & mut assigned_tpl) ;
354- debug ! (
355- "{} additional topic partitions added after assignment" ,
356- assigned_tpl. len( )
357- ) ;
358- actor_handles. shutdown ( CALLBACK_DURATION ) . await ;
359- ConsumerState :: Consuming ( spawn_actors ( consumer. clone ( ) , & tpl) , tpl)
340+ select ! {
341+ res = match state {
342+ ConsumerState :: Consuming ( ref mut handles, _) => Either :: Left ( handles. join_next( ) ) ,
343+ _ => Either :: Right ( future:: pending:: <_>( ) )
344+ } => {
345+ error!( "Actor exited unexpectedly with {:?}, shutting down..." , res) ;
346+ drop( elegant_departure:: shutdown( ) ) ;
360347 }
361- ( ConsumerState :: Consuming ( actor_handles, mut tpl) , Event :: Revoke ( revoked_tpl) ) => {
362- assert ! (
363- tpl. is_subset( & revoked_tpl) ,
364- "Revoked TPL should be a subset of TPL we're consuming from"
365- ) ;
366- tpl. retain ( |e| !revoked_tpl. contains ( e) ) ;
367- debug ! ( "{} topic partitions remaining after revocation" , tpl. len( ) ) ;
368- actor_handles. shutdown ( CALLBACK_DURATION ) . await ;
369- if tpl. is_empty ( ) {
370- ConsumerState :: Ready
371- } else {
372- ConsumerState :: Consuming ( spawn_actors ( consumer. clone ( ) , & tpl) , tpl)
348+ event = events_stream. next( ) => {
349+ let Some ( ( event, _rendezvous_guard) ) = event else {
350+ unreachable!( "Unexpected end to event stream" ) ;
351+ } ;
352+ info!( "Received event: {:?}" , event) ;
353+ state = match ( state, event) {
354+ ( ConsumerState :: Ready , Event :: Assign ( tpl) ) => {
355+ ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
356+ }
357+ ( ConsumerState :: Ready , Event :: Revoke ( _) ) => {
358+ unreachable!( "Got partition revocation before the consumer has started" )
359+ }
360+ ( ConsumerState :: Ready , Event :: Shutdown ) => ConsumerState :: Stopped ,
361+ ( ConsumerState :: Consuming ( handles, mut tpl) , Event :: Assign ( mut assigned) ) => {
362+ assert!(
363+ tpl. is_disjoint( & assigned) ,
364+ "Newly assigned TPL should be disjoint from TPL we're consuming from"
365+ ) ;
366+ tpl. append( & mut assigned) ;
367+ debug!(
368+ "{} additional topic partitions added after assignment" ,
369+ assigned. len( )
370+ ) ;
371+ handles. shutdown( CALLBACK_DURATION ) . await ;
372+ ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
373+ }
374+ ( ConsumerState :: Consuming ( handles, mut tpl) , Event :: Revoke ( revoked) ) => {
375+ assert!(
376+ tpl. is_subset( & revoked) ,
377+ "Revoked TPL should be a subset of TPL we're consuming from"
378+ ) ;
379+ tpl. retain( |e| !revoked. contains( e) ) ;
380+ debug!( "{} topic partitions remaining after revocation" , tpl. len( ) ) ;
381+ handles. shutdown( CALLBACK_DURATION ) . await ;
382+ if tpl. is_empty( ) {
383+ ConsumerState :: Ready
384+ } else {
385+ ConsumerState :: Consuming ( spawn_actors( consumer. clone( ) , & tpl) , tpl)
386+ }
387+ }
388+ ( ConsumerState :: Consuming ( handles, _) , Event :: Shutdown ) => {
389+ handles. shutdown( CALLBACK_DURATION ) . await ;
390+ debug!( "Signaling shutdown to client..." ) ;
391+ shutdown_client. take( ) ;
392+ ConsumerState :: Stopped
393+ }
394+ ( ConsumerState :: Stopped , _) => {
395+ unreachable!( "Got event after consumer has stopped" )
396+ }
373397 }
374398 }
375- ( ConsumerState :: Consuming ( actor_handles, _) , Event :: Shutdown ) => {
376- actor_handles. shutdown ( CALLBACK_DURATION ) . await ;
377- debug ! ( "Signaling shutdown to client..." ) ;
378- shutdown_client. take ( ) ;
379- ConsumerState :: Stopped
380- }
381- ( ConsumerState :: Stopped , _) => {
382- unreachable ! ( "Got event after consumer has stopped" )
383- }
384399 }
385400 }
386401 debug ! ( "Shutdown complete" ) ;
0 commit comments