Skip to content

Commit 497a86c

Browse files
committed
refactor: readability
1 parent 5f9ec0c commit 497a86c

File tree

2 files changed

+20
-22
lines changed

2 files changed

+20
-22
lines changed

src/kafka.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -205,29 +205,27 @@ async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Resu
205205
}
206206

207207
pub async fn setup_integration() {
208-
tokio::task::spawn(async move {
209-
let (consumer, stream_name) = match setup_consumer() {
210-
Ok(c) => c,
211-
Err(err) => {
212-
match err {
213-
KafkaError::DoNotPrintError => {
214-
debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
215-
}
216-
_ => {
217-
error!("{err}");
218-
}
208+
let (consumer, stream_name) = match setup_consumer() {
209+
Ok(c) => c,
210+
Err(err) => {
211+
match err {
212+
KafkaError::DoNotPrintError => {
213+
debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
214+
}
215+
_ => {
216+
error!("{err}");
219217
}
220-
return;
221218
}
222-
};
219+
return;
220+
}
221+
};
223222

224-
info!("Setup kafka integration for {stream_name}");
225-
let mut stream = consumer.stream();
223+
info!("Setup kafka integration for {stream_name}");
224+
let mut stream = consumer.stream();
226225

227-
while let Ok(curr) = stream.next().await.unwrap() {
228-
if let Err(err) = ingest_message(&stream_name, curr).await {
229-
error!("Unable to ingest incoming kafka message- {err}"),
230-
}
226+
while let Ok(curr) = stream.next().await.unwrap() {
227+
if let Err(err) = ingest_message(&stream_name, curr).await {
228+
error!("Unable to ingest incoming kafka message- {err}")
231229
}
232-
});
230+
}
233231
}

src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ async fn main() -> anyhow::Result<()> {
4747
metadata.set_global();
4848

4949
// load kafka server
50-
if CONFIG.parseable.mode.ne(&Mode::Query) {
51-
kafka::setup_integration().await;
50+
if CONFIG.parseable.mode != Mode::Query {
51+
tokio::task::spawn(kafka::setup_integration());
5252
}
5353

5454
server.init().await?;

0 commit comments

Comments
 (0)