Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 63 additions & 47 deletions src/consumer/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
task::{self, JoinSet},
task::{self, JoinError, JoinSet},
time::{self, sleep, MissedTickBehavior},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -75,7 +75,7 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
let guard = elegant_departure::get_shutdown_guard();
tokio::spawn(async move {
let _ = guard.wait().await;
info!("Cancellation token received, shutting down consumer");
info!("Cancellation token received, shutting down consumer...");
let (rendezvous_sender, _) = sync_channel(0);
let _ = event_sender.send((Event::Shutdown, rendezvous_sender));
});
Expand Down Expand Up @@ -189,6 +189,10 @@ impl ActorHandles {
}
}
}

async fn join_next(&mut self) -> Option<Result<Result<(), Error>, JoinError>> {
self.join_set.join_next().await
}
}

#[macro_export]
Expand Down Expand Up @@ -333,54 +337,66 @@ pub async fn handle_events(
let mut state = ConsumerState::Ready;

while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state {
let Some((event, _rendezvous_guard)) = events_stream.next().await else {
unreachable!("Unexpected end to event stream")
};
info!("Received event: {:?}", event);
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
unreachable!("Got partition revocation before the consumer has started")
}
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Assign(mut assigned_tpl)) => {
assert!(
tpl.is_disjoint(&assigned_tpl),
"Newly assigned TPL should be disjoint from TPL we're consuming from"
);
tpl.append(&mut assigned_tpl);
debug!(
"{} additional topic partitions added after assignment",
assigned_tpl.len()
);
actor_handles.shutdown(CALLBACK_DURATION).await;
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
select! {
res = match state {
ConsumerState::Consuming(ref mut handles, _) => Either::Left(handles.join_next()),
_ => Either::Right(future::pending::<_>()),
} => {
error!("Actor exited unexpectedly with {:?}, shutting down...", res);
drop(elegant_departure::shutdown());
}
(ConsumerState::Consuming(actor_handles, mut tpl), Event::Revoke(revoked_tpl)) => {
assert!(
tpl.is_subset(&revoked_tpl),
"Revoked TPL should be a subset of TPL we're consuming from"
);
tpl.retain(|e| !revoked_tpl.contains(e));
debug!("{} topic partitions remaining after revocation", tpl.len());
actor_handles.shutdown(CALLBACK_DURATION).await;
if tpl.is_empty() {
ConsumerState::Ready
} else {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)

event = events_stream.next() => {
let Some((event, _rendezvous_guard)) = event else {
unreachable!("Unexpected end to event stream");
};
info!("Received event: {:?}", event);
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(assigned)) => {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &assigned), assigned)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
unreachable!("Got partition revocation before the consumer has started")
}
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
(ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => {
assert!(
tpl.is_disjoint(&assigned),
"Newly assigned TPL should be disjoint from TPL we're consuming from"
);
tpl.append(&mut assigned);
debug!(
"{} additional topic partitions added after assignment",
assigned.len()
);
handles.shutdown(CALLBACK_DURATION).await;
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Consuming(handles, mut tpl), Event::Revoke(revoked)) => {
assert!(
tpl.is_subset(&revoked),
"Revoked TPL should be a subset of TPL we're consuming from"
);
tpl.retain(|e| !revoked.contains(e));
debug!("{} topic partitions remaining after revocation", tpl.len());
handles.shutdown(CALLBACK_DURATION).await;
if tpl.is_empty() {
ConsumerState::Ready
} else {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
}
(ConsumerState::Consuming(handles, _), Event::Shutdown) => {
handles.shutdown(CALLBACK_DURATION).await;
debug!("Signaling shutdown to client...");
shutdown_client.take();
ConsumerState::Stopped
}
(ConsumerState::Stopped, _) => {
unreachable!("Got event after consumer has stopped")
}
}
}
(ConsumerState::Consuming(actor_handles, _), Event::Shutdown) => {
actor_handles.shutdown(CALLBACK_DURATION).await;
debug!("Signaling shutdown to client...");
shutdown_client.take();
ConsumerState::Stopped
}
(ConsumerState::Stopped, _) => {
unreachable!("Got event after consumer has stopped")
}
}
}
debug!("Shutdown complete");
Expand Down
Loading