Skip to content

Commit 552a338

Browse files
committed
fix(consumer): Fix green thread deadlock when running consumer in green thread
1 parent ec2cda5 commit 552a338

File tree

2 files changed

+17
-14
lines changed

2 files changed

+17
-14
lines changed

src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl Default for Config {
7676
Self {
7777
sentry_dsn: None,
7878
sentry_env: None,
79-
log_level: LogLevel::Info,
79+
log_level: LogLevel::Debug,
8080
log_format: LogFormat::Text,
8181
grpc_port: 50051,
8282
statsd_addr: "127.0.0.1:8126".parse().unwrap(),
@@ -159,7 +159,7 @@ mod tests {
159159
};
160160
assert_eq!(config.sentry_dsn, None);
161161
assert_eq!(config.sentry_env, None);
162-
assert_eq!(config.log_level, LogLevel::Info);
162+
assert_eq!(config.log_level, LogLevel::Debug);
163163
assert_eq!(config.log_format, LogFormat::Text);
164164
assert_eq!(config.grpc_port, 50051);
165165
assert_eq!(config.kafka_topic, "task-worker");

src/consumer/kafka.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::{anyhow, Error};
22
use futures::{
3+
executor::block_on,
34
future::{self},
45
pin_mut, Stream, StreamExt,
56
};
@@ -30,7 +31,7 @@ use tokio::{
3031
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
3132
oneshot,
3233
},
33-
task::JoinSet,
34+
task::{spawn_blocking, JoinSet},
3435
time::{self, sleep, MissedTickBehavior},
3536
};
3637
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -84,18 +85,20 @@ pub fn handle_consumer_client(
8485
consumer: Arc<StreamConsumer<KafkaContext>>,
8586
shutdown: oneshot::Receiver<()>,
8687
) {
87-
tokio::spawn(async move {
88-
select! {
89-
biased;
90-
_ = shutdown => {
91-
debug!("Received shutdown signal, commiting state in sync mode...");
92-
let _ = consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync);
93-
}
94-
msg = consumer.recv() => {
95-
error!("Got unexpected message from consumer client: {:?}", msg);
88+
spawn_blocking(|| {
89+
block_on(async move {
90+
select! {
91+
biased;
92+
_ = shutdown => {
93+
debug!("Received shutdown signal, commiting state in sync mode...");
94+
let _ = consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync);
95+
}
96+
msg = consumer.recv() => {
97+
error!("Got unexpected message from consumer client: {:?}", msg);
98+
}
9699
}
97-
}
98-
debug!("Shutdown complete");
100+
debug!("Shutdown complete");
101+
})
99102
});
100103
}
101104

0 commit comments

Comments
 (0)