Skip to content

Commit caef930

Browse files
committed
feat(kafka): Trigger graceful shutdown on actor failure
1 parent b221083 commit caef930

File tree

1 file changed

+63
-47
lines changed

1 file changed

+63
-47
lines changed

src/consumer/kafka.rs

Lines changed: 63 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ use tokio::{
3232
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
3333
oneshot,
3434
},
35-
task::{self, JoinSet},
35+
task::{self, JoinError, JoinSet},
3636
time::{self, sleep, MissedTickBehavior},
3737
};
3838
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -75,7 +75,7 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
7575
let guard = elegant_departure::get_shutdown_guard();
7676
tokio::spawn(async move {
7777
let _ = guard.wait().await;
78-
info!("Cancellation token received, shutting down consumer");
78+
info!("Cancellation token received, shutting down consumer...");
7979
let (rendezvous_sender, _) = sync_channel(0);
8080
let _ = event_sender.send((Event::Shutdown, rendezvous_sender));
8181
});
@@ -189,6 +189,10 @@ impl ActorHandles {
189189
}
190190
}
191191
}
192+
193+
async fn join_next(&mut self) -> Option<Result<Result<(), Error>, JoinError>> {
194+
self.join_set.join_next().await
195+
}
192196
}
193197

194198
#[macro_export]
@@ -333,54 +337,66 @@ pub async fn handle_events(
333337
let mut state = ConsumerState::Ready;
334338

335339
while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state {
336-
let Some((event, _rendezvous_guard)) = events_stream.next().await else {
337-
unreachable!("Unexpected end to event stream")
338-
};
339-
info!("Received event: {:?}", event);
340-
state = match (state, event) {
341-
(ConsumerState::Ready, Event::Assign(tpl)) => {
342-
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
343-
}
344-
(ConsumerState::Ready, Event::Revoke(_)) => {
345-
unreachable!("Got partition revocation before the consumer has started")
346-
}
347-
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
348-
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Assign(mut assigned_tpl)) => {
349-
assert!(
350-
tpl.is_disjoint(&assigned_tpl),
351-
"Newly assigned TPL should be disjoint from TPL we're consuming from"
352-
);
353-
tpl.append(&mut assigned_tpl);
354-
debug!(
355-
"{} additional topic partitions added after assignment",
356-
assigned_tpl.len()
357-
);
358-
actor_handles.shutdown(CALLBACK_DURATION).await;
359-
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
340+
select! {
341+
res = match state {
342+
ConsumerState::Consuming(ref mut handles, _) => Either::Left(handles.join_next()),
343+
_ => Either::Right(future::pending::<_>()),
344+
} => {
345+
error!("Actor exited unexpectedly with {:?}, shutting down...", res);
346+
drop(elegant_departure::shutdown());
360347
}
361-
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Revoke(revoked_tpl)) => {
362-
assert!(
363-
tpl.is_subset(&revoked_tpl),
364-
"Revoked TPL should be a subset of TPL we're consuming from"
365-
);
366-
tpl.retain(|e| !revoked_tpl.contains(e));
367-
debug!("{} topic partitions remaining after revocation", tpl.len());
368-
actor_handles.shutdown(CALLBACK_DURATION).await;
369-
if tpl.is_empty() {
370-
ConsumerState::Ready
371-
} else {
372-
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
348+
349+
event = events_stream.next() => {
350+
let Some((event, _rendezvous_guard)) = event else {
351+
unreachable!("Unexpected end to event stream");
352+
};
353+
info!("Received event: {:?}", event);
354+
state = match (state, event) {
355+
(ConsumerState::Ready, Event::Assign(tpl)) => {
356+
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
357+
}
358+
(ConsumerState::Ready, Event::Revoke(_)) => {
359+
unreachable!("Got partition revocation before the consumer has started")
360+
}
361+
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
362+
(ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => {
363+
assert!(
364+
tpl.is_disjoint(&assigned),
365+
"Newly assigned TPL should be disjoint from TPL we're consuming from"
366+
);
367+
tpl.append(&mut assigned);
368+
debug!(
369+
"{} additional topic partitions added after assignment",
370+
assigned.len()
371+
);
372+
handles.shutdown(CALLBACK_DURATION).await;
373+
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
374+
}
375+
(ConsumerState::Consuming(handles, mut tpl), Event::Revoke(revoked)) => {
376+
assert!(
377+
tpl.is_subset(&revoked),
378+
"Revoked TPL should be a subset of TPL we're consuming from"
379+
);
380+
tpl.retain(|e| !revoked.contains(e));
381+
debug!("{} topic partitions remaining after revocation", tpl.len());
382+
handles.shutdown(CALLBACK_DURATION).await;
383+
if tpl.is_empty() {
384+
ConsumerState::Ready
385+
} else {
386+
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
387+
}
388+
}
389+
(ConsumerState::Consuming(handles, _), Event::Shutdown) => {
390+
handles.shutdown(CALLBACK_DURATION).await;
391+
debug!("Signaling shutdown to client...");
392+
shutdown_client.take();
393+
ConsumerState::Stopped
394+
}
395+
(ConsumerState::Stopped, _) => {
396+
unreachable!("Got event after consumer has stopped")
397+
}
373398
}
374399
}
375-
(ConsumerState::Consuming(actor_handles, _), Event::Shutdown) => {
376-
actor_handles.shutdown(CALLBACK_DURATION).await;
377-
debug!("Signaling shutdown to client...");
378-
shutdown_client.take();
379-
ConsumerState::Stopped
380-
}
381-
(ConsumerState::Stopped, _) => {
382-
unreachable!("Got event after consumer has stopped")
383-
}
384400
}
385401
}
386402
debug!("Shutdown complete");

0 commit comments

Comments
 (0)