diff --git a/src/config.rs b/src/config.rs index 61fb0c9c..8dcb4753 100644 --- a/src/config.rs +++ b/src/config.rs @@ -76,7 +76,7 @@ impl Default for Config { Self { sentry_dsn: None, sentry_env: None, - log_level: LogLevel::Info, + log_level: LogLevel::Debug, log_format: LogFormat::Text, grpc_port: 50051, statsd_addr: "127.0.0.1:8126".parse().unwrap(), @@ -159,7 +159,7 @@ mod tests { }; assert_eq!(config.sentry_dsn, None); assert_eq!(config.sentry_env, None); - assert_eq!(config.log_level, LogLevel::Info); + assert_eq!(config.log_level, LogLevel::Debug); assert_eq!(config.log_format, LogFormat::Text); assert_eq!(config.grpc_port, 50051); assert_eq!(config.kafka_topic, "task-worker"); diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index 37a71351..0347cfaa 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -25,12 +25,13 @@ use std::{ time::Duration, }; use tokio::{ + runtime::Handle, select, signal, sync::{ mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }, - task::JoinSet, + task::{self, JoinSet}, time::{self, sleep, MissedTickBehavior}, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -84,18 +85,20 @@ pub fn handle_consumer_client( consumer: Arc>, shutdown: oneshot::Receiver<()>, ) { - tokio::spawn(async move { - select! { - biased; - _ = shutdown => { - debug!("Received shutdown signal, commiting state in sync mode..."); - let _ = consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync); - } - msg = consumer.recv() => { - error!("Got unexpected message from consumer client: {:?}", msg); + task::spawn_blocking(|| { + Handle::current().block_on(async move { + select! { + biased; + _ = shutdown => { + debug!("Received shutdown signal, commiting state in sync mode..."); + let _ = consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync); + } + msg = consumer.recv() => { + error!("Got unexpected message from consumer client: {:?}", msg); + } } - } - debug!("Shutdown complete"); + debug!("Shutdown complete"); + }); }); }