Skip to content

Commit 455693a

Browse files
committed
feat(kafka): Trigger graceful shutdown on actor failure
1 parent 7552a27 commit 455693a

File tree

1 file changed

+62
-47
lines changed

1 file changed

+62
-47
lines changed

src/consumer/kafka.rs

Lines changed: 62 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use tokio::{
3232
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
3333
oneshot,
3434
},
35-
task::{self, JoinSet},
35+
task::{self, JoinError, JoinSet},
3636
time::{self, sleep, MissedTickBehavior},
3737
};
3838
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -75,7 +75,7 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
7575
let guard = elegant_departure::get_shutdown_guard();
7676
tokio::spawn(async move {
7777
let _ = guard.wait().await;
78-
info!("Cancellation token received, shutting down consumer");
78+
info!("Cancellation token received, shutting down consumer...");
7979
let (rendezvous_sender, _) = sync_channel(0);
8080
let _ = event_sender.send((Event::Shutdown, rendezvous_sender));
8181
});
@@ -188,6 +188,10 @@ impl ActorHandles {
188188
}
189189
}
190190
}
191+
192+
async fn join_next(&mut self) -> Option<Result<Result<(), Error>, JoinError>> {
193+
self.join_set.join_next().await
194+
}
191195
}
192196

193197
#[macro_export]
@@ -332,54 +336,65 @@ pub async fn handle_events(
332336
let mut state = ConsumerState::Ready;
333337

334338
while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state {
335-
let Some((event, _rendezvous_guard)) = events_stream.next().await else {
336-
unreachable!("Unexpected end to event stream")
337-
};
338-
info!("Received event: {:?}", event);
339-
state = match (state, event) {
340-
(ConsumerState::Ready, Event::Assign(tpl)) => {
341-
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
342-
}
343-
(ConsumerState::Ready, Event::Revoke(_)) => {
344-
unreachable!("Got partition revocation before the consumer has started")
345-
}
346-
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
347-
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Assign(mut assigned_tpl)) => {
348-
assert!(
349-
tpl.is_disjoint(&assigned_tpl),
350-
"Newly assigned TPL should be disjoint from TPL we're consuming from"
351-
);
352-
tpl.append(&mut assigned_tpl);
353-
debug!(
354-
"{} additional topic partitions added after assignment",
355-
assigned_tpl.len()
356-
);
357-
actor_handles.shutdown(CALLBACK_DURATION).await;
358-
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
339+
select! {
340+
_ = match state {
341+
ConsumerState::Consuming(ref mut handles, _) => Either::Left(handles.join_next()),
342+
_ => Either::Right(future::pending::<_>())
343+
} => {
344+
error!("Actor exited unexpectedly, shutting down...");
345+
drop(elegant_departure::shutdown());
359346
}
360-
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Revoke(revoked_tpl)) => {
361-
assert!(
362-
tpl.is_subset(&revoked_tpl),
363-
"Revoked TPL should be a subset of TPL we're consuming from"
364-
);
365-
tpl.retain(|e| !revoked_tpl.contains(e));
366-
debug!("{} topic partitions remaining after revocation", tpl.len());
367-
actor_handles.shutdown(CALLBACK_DURATION).await;
368-
if tpl.is_empty() {
369-
ConsumerState::Ready
370-
} else {
371-
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
347+
event = events_stream.next() => {
348+
let Some((event, _rendezvous_guard)) = event else {
349+
unreachable!("Unexpected end to event stream");
350+
};
351+
info!("Received event: {:?}", event);
352+
state = match (state, event) {
353+
(ConsumerState::Ready, Event::Assign(tpl)) => {
354+
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
355+
}
356+
(ConsumerState::Ready, Event::Revoke(_)) => {
357+
unreachable!("Got partition revocation before the consumer has started")
358+
}
359+
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
360+
(ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => {
361+
assert!(
362+
tpl.is_disjoint(&assigned),
363+
"Newly assigned TPL should be disjoint from TPL we're consuming from"
364+
);
365+
tpl.append(&mut assigned);
366+
debug!(
367+
"{} additional topic partitions added after assignment",
368+
assigned.len()
369+
);
370+
handles.shutdown(CALLBACK_DURATION).await;
371+
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
372+
}
373+
(ConsumerState::Consuming(handles, mut tpl), Event::Revoke(revoked_tpl)) => {
374+
assert!(
375+
tpl.is_subset(&revoked_tpl),
376+
"Revoked TPL should be a subset of TPL we're consuming from"
377+
);
378+
tpl.retain(|e| !revoked_tpl.contains(e));
379+
debug!("{} topic partitions remaining after revocation", tpl.len());
380+
handles.shutdown(CALLBACK_DURATION).await;
381+
if tpl.is_empty() {
382+
ConsumerState::Ready
383+
} else {
384+
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
385+
}
386+
}
387+
(ConsumerState::Consuming(handles, _), Event::Shutdown) => {
388+
handles.shutdown(CALLBACK_DURATION).await;
389+
debug!("Signaling shutdown to client...");
390+
shutdown_client.take();
391+
ConsumerState::Stopped
392+
}
393+
(ConsumerState::Stopped, _) => {
394+
unreachable!("Got event after consumer has stopped")
395+
}
372396
}
373397
}
374-
(ConsumerState::Consuming(actor_handles, _), Event::Shutdown) => {
375-
actor_handles.shutdown(CALLBACK_DURATION).await;
376-
debug!("Signaling shutdown to client...");
377-
shutdown_client.take();
378-
ConsumerState::Stopped
379-
}
380-
(ConsumerState::Stopped, _) => {
381-
unreachable!("Got event after consumer has stopped")
382-
}
383398
}
384399
}
385400
debug!("Shutdown complete");

0 commit comments

Comments
 (0)