Skip to content

Commit 70b900a

Browse files
committed
fix map actor shutdown logging
1 parent 31a5209 commit 70b900a

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

src/consumer/kafka.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ pub async fn map<T>(
424424
biased;
425425

426426
_ = shutdown.cancelled() => {
427+
debug!("Receive shutdown signal, shutting down...");
427428
break;
428429
}
429430

@@ -434,12 +435,16 @@ pub async fn map<T>(
434435
let msg = Arc::new(msg.detach()?);
435436
match transform(msg.clone()) {
436437
Ok(transformed) => {
437-
ok.send((
438-
iter::once(Arc::try_unwrap(msg).expect("msg should only have a single strong ref")),
438+
if ok.send((
439+
iter::once(
440+
Arc::try_unwrap(msg)
441+
.expect("msg should only have a single strong ref"),
442+
),
439443
transformed,
440-
))
441-
.await
442-
.map_err(|err| anyhow!("{}", err))?;
444+
)).await.is_err() {
445+
debug!("Receive half of ok channel is closed, shutting down...");
446+
break;
447+
}
443448
}
444449
Err(e) => {
445450
error!(

0 commit comments

Comments
 (0)