Skip to content

Commit b6d9bc9

Browse files
committed
clear up
1 parent f16e83b commit b6d9bc9

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

tests/producer.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::utils::admin::{create_admin_client, create_topic};
22
use crate::utils::consumer::{create_consumer, poll_x_times_for_messages};
33
use crate::utils::containers::KafkaContext;
44
use crate::utils::logging::init_test_logger;
5-
use crate::utils::producer::{create_producer, poll_and_flush};
5+
use crate::utils::producer::{create_producer, send_record};
66
use crate::utils::rand::rand_test_topic;
77
use rdkafka::producer::BaseRecord;
88
use rdkafka::Message;
@@ -57,13 +57,9 @@ pub async fn test_basic_produce() {
5757
let record = BaseRecord::to(&test_topic_name) // destination topic
5858
.key(&[1, 2, 3, 4]) // message key
5959
.payload("content"); // message payload
60-
61-
let send_result = base_producer.send(record);
62-
if send_result.is_err() {
63-
panic!("could not produce record: {:?}", send_result.unwrap_err());
64-
}
65-
if poll_and_flush(&base_producer).is_err() {
66-
panic!("could not poll and flush base producer")
60+
let send_record_result = send_record(&base_producer, record).await;
61+
if send_record_result.is_err() {
62+
panic!("could not send record: {}", send_record_result.unwrap_err());
6763
}
6864

6965
let messages_result = poll_x_times_for_messages(&consumer, 10).await;

tests/utils/producer.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use anyhow::Context;
1+
use anyhow::{bail, Context};
22
use rdkafka::config::FromClientConfig;
3-
use rdkafka::producer::{BaseProducer, Producer};
3+
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
44
use rdkafka::util::Timeout;
55
use rdkafka::ClientConfig;
66
use std::time::Duration;
@@ -29,6 +29,21 @@ pub fn create_base_producer(config: &ClientConfig) -> anyhow::Result<BaseProduce
2929
Ok(base_producer)
3030
}
3131

32+
pub async fn send_record(
33+
producer: &BaseProducer,
34+
record: BaseRecord<'_, [u8; 4], str>,
35+
) -> anyhow::Result<()> {
36+
let send_result = producer.send(record);
37+
if send_result.is_err() {
38+
bail!("could not produce record: {:?}", send_result.unwrap_err());
39+
}
40+
if poll_and_flush(&producer).is_err() {
41+
bail!("could not poll and flush base producer")
42+
};
43+
44+
Ok(())
45+
}
46+
3247
pub fn poll_and_flush(base_producer: &BaseProducer) -> anyhow::Result<()> {
3348
for _ in 0..5 {
3449
base_producer.poll(Duration::from_millis(100));

0 commit comments

Comments
 (0)