From 5f3e9402af836508565a3c09e43ed39aba069389 Mon Sep 17 00:00:00 2001 From: John Yang Date: Wed, 4 Dec 2024 15:17:01 -0800 Subject: [PATCH] feat(kafka): Trigger graceful shutdown on actor failure --- src/consumer/kafka.rs | 110 ++++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 47 deletions(-) diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index 0c03e2ca..c63d5a59 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -32,7 +32,7 @@ use tokio::{ mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }, - task::{self, JoinSet}, + task::{self, JoinError, JoinSet}, time::{self, sleep, MissedTickBehavior}, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -75,7 +75,7 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>) let guard = elegant_departure::get_shutdown_guard(); tokio::spawn(async move { let _ = guard.wait().await; - info!("Cancellation token received, shutting down consumer"); + info!("Cancellation token received, shutting down consumer..."); let (rendezvous_sender, _) = sync_channel(0); let _ = event_sender.send((Event::Shutdown, rendezvous_sender)); }); @@ -189,6 +189,10 @@ impl ActorHandles { } } } + + async fn join_next(&mut self) -> Option, JoinError>> { + self.join_set.join_next().await + } } #[macro_export] @@ -333,54 +337,66 @@ pub async fn handle_events( let mut state = ConsumerState::Ready; while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state { - let Some((event, _rendezvous_guard)) = events_stream.next().await else { - unreachable!("Unexpected end to event stream") - }; - info!("Received event: {:?}", event); - state = match (state, event) { - (ConsumerState::Ready, Event::Assign(tpl)) => { - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) - } - (ConsumerState::Ready, Event::Revoke(_)) => { - unreachable!("Got partition revocation before the consumer has started") - } - (ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped, - (ConsumerState::Consuming(actor_handles, mut tpl), Event::Assign(mut assigned_tpl)) => { - assert!( - tpl.is_disjoint(&assigned_tpl), - "Newly assigned TPL should be disjoint from TPL we're consuming from" - ); - tpl.append(&mut assigned_tpl); - debug!( - "{} additional topic partitions added after assignment", - assigned_tpl.len() - ); - actor_handles.shutdown(CALLBACK_DURATION).await; - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) + select! { + res = match state { + ConsumerState::Consuming(ref mut handles, _) => Either::Left(handles.join_next()), + _ => Either::Right(future::pending::<_>()), + } => { + error!("Actor exited unexpectedly with {:?}, shutting down...", res); + drop(elegant_departure::shutdown()); } - (ConsumerState::Consuming(actor_handles, mut tpl), Event::Revoke(revoked_tpl)) => { - assert!( - tpl.is_subset(&revoked_tpl), - "Revoked TPL should be a subset of TPL we're consuming from" - ); - tpl.retain(|e| !revoked_tpl.contains(e)); - debug!("{} topic partitions remaining after revocation", tpl.len()); - actor_handles.shutdown(CALLBACK_DURATION).await; - if tpl.is_empty() { - ConsumerState::Ready - } else { - ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) + + event = events_stream.next() => { + let Some((event, _rendezvous_guard)) = event else { + unreachable!("Unexpected end to event stream"); + }; + info!("Received event: {:?}", event); + state = match (state, event) { + (ConsumerState::Ready, Event::Assign(assigned)) => { + ConsumerState::Consuming(spawn_actors(consumer.clone(), &assigned), assigned) + } + (ConsumerState::Ready, Event::Revoke(_)) => { + unreachable!("Got partition revocation before the consumer has started") + } + (ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped, + (ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => { + assert!( + tpl.is_disjoint(&assigned), + "Newly assigned TPL should be disjoint from TPL we're consuming from" + ); + tpl.append(&mut assigned); + debug!( + "{} additional topic partitions added after assignment", + assigned.len() + ); + handles.shutdown(CALLBACK_DURATION).await; + ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) + } + (ConsumerState::Consuming(handles, mut tpl), Event::Revoke(revoked)) => { + assert!( + tpl.is_subset(&revoked), + "Revoked TPL should be a subset of TPL we're consuming from" + ); + tpl.retain(|e| !revoked.contains(e)); + debug!("{} topic partitions remaining after revocation", tpl.len()); + handles.shutdown(CALLBACK_DURATION).await; + if tpl.is_empty() { + ConsumerState::Ready + } else { + ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) + } + } + (ConsumerState::Consuming(handles, _), Event::Shutdown) => { + handles.shutdown(CALLBACK_DURATION).await; + debug!("Signaling shutdown to client..."); + shutdown_client.take(); + ConsumerState::Stopped + } + (ConsumerState::Stopped, _) => { + unreachable!("Got event after consumer has stopped") + } } } - (ConsumerState::Consuming(actor_handles, _), Event::Shutdown) => { - actor_handles.shutdown(CALLBACK_DURATION).await; - debug!("Signaling shutdown to client..."); - shutdown_client.take(); - ConsumerState::Stopped - } - (ConsumerState::Stopped, _) => { - unreachable!("Got event after consumer has stopped") - } } } debug!("Shutdown complete");