Skip to content

Commit f16e83b

Browse files
committed
basic producer test works.
1 parent 92711d4 commit f16e83b

File tree

8 files changed

+163
-84
lines changed

8 files changed

+163
-84
lines changed

tests/admin.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rdkafka::admin::{AdminOptions, NewTopic, TopicReplication};
1+
use crate::utils::admin::create_topic;
22
use crate::utils::containers::KafkaContext;
33
use crate::utils::logging::init_test_logger;
44
use crate::utils::rand::rand_test_topic;
@@ -20,29 +20,20 @@ pub async fn test_topic_creation() {
2020
};
2121
let test_topic_name = rand_test_topic("testing-topic");
2222

23-
let admin_client_result = utils::admin::create_admin(&kafka_context.bootstrap_servers).await;
23+
let admin_client_result =
24+
utils::admin::create_admin_client(&kafka_context.bootstrap_servers).await;
2425
let Ok(admin_client) = admin_client_result else {
2526
panic!(
2627
"could not create admin client: {}",
2728
admin_client_result.unwrap_err()
2829
);
2930
};
3031

31-
let new_topic = NewTopic::new(&test_topic_name, 1, TopicReplication::Fixed(1));
32-
let admin_opts = AdminOptions::new();
33-
let create_topics_result = admin_client
34-
.create_topics(vec![&new_topic], &admin_opts)
35-
.await;
36-
let Ok(create_topics) = create_topics_result else {
32+
let create_topic_result = create_topic(&admin_client, &test_topic_name).await;
33+
if create_topic_result.is_err() {
3734
panic!(
38-
"could not create new topics: {}",
39-
create_topics_result.unwrap_err()
35+
"could not create topic: {}",
36+
create_topic_result.unwrap_err()
4037
);
4138
};
42-
43-
for topic_result in create_topics {
44-
if topic_result.is_err() {
45-
panic!("failed to create topic: {:?}", topic_result.unwrap_err());
46-
};
47-
}
4839
}

tests/producer.rs

Lines changed: 33 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,19 @@
1-
use crate::utils::consumer;
1+
use crate::utils::admin::{create_admin_client, create_topic};
2+
use crate::utils::consumer::{create_consumer, poll_x_times_for_messages};
23
use crate::utils::containers::KafkaContext;
34
use crate::utils::logging::init_test_logger;
4-
use crate::utils::producer::create_base_producer;
5-
use crate::utils::rand::{rand_test_group, rand_test_topic};
6-
use anyhow::Context;
7-
use rdkafka::config::FromClientConfig;
8-
use rdkafka::consumer::BaseConsumer;
9-
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
10-
use rdkafka::util::Timeout;
11-
use rdkafka::{ClientConfig, Message};
12-
use std::time::Duration;
5+
use crate::utils::producer::{create_producer, poll_and_flush};
6+
use crate::utils::rand::rand_test_topic;
7+
use rdkafka::producer::BaseRecord;
8+
use rdkafka::Message;
139

10+
#[path = "utils/mod.rs"]
1411
mod utils;
1512

1613
#[tokio::test]
1714
pub async fn test_basic_produce() {
1815
init_test_logger();
1916

20-
// Get Kafka container context.
2117
let kafka_context_result = KafkaContext::shared().await;
2218
let Ok(kafka_context) = kafka_context_result else {
2319
panic!(
@@ -27,13 +23,29 @@ pub async fn test_basic_produce() {
2723
};
2824
let test_topic_name = rand_test_topic("testing-topic");
2925

26+
let admin_client_result = create_admin_client(&kafka_context.bootstrap_servers).await;
27+
let Ok(admin_client) = admin_client_result else {
28+
panic!(
29+
"could not create admin client: {}",
30+
admin_client_result.unwrap_err()
31+
);
32+
};
33+
let create_topic_result = create_topic(&admin_client, &test_topic_name).await;
34+
if create_topic_result.is_err() {
35+
panic!(
36+
"could not create topic: {}",
37+
create_topic_result.unwrap_err()
38+
);
39+
}
40+
3041
let consumer_result = create_consumer(&kafka_context.bootstrap_servers, &test_topic_name).await;
3142
let Ok(consumer) = consumer_result else {
3243
panic!(
3344
"could not create consumer: {}",
3445
consumer_result.unwrap_err()
3546
);
3647
};
48+
3749
let create_producer_result = create_producer(&kafka_context.bootstrap_servers).await;
3850
let Ok(base_producer) = create_producer_result else {
3951
panic!(
@@ -50,20 +62,19 @@ pub async fn test_basic_produce() {
5062
if send_result.is_err() {
5163
panic!("could not produce record: {:?}", send_result.unwrap_err());
5264
}
53-
let flush_result = base_producer.flush(Timeout::After(Duration::from_secs(10)));
54-
if let Err(flush_error) = flush_result {
55-
panic!("timed out waiting for producer flush: {}", flush_error);
65+
if poll_and_flush(&base_producer).is_err() {
66+
panic!("could not poll and flush base producer")
5667
}
5768

58-
let Some(next_message_result) = consumer.poll(Duration::from_secs(2)) else {
59-
panic!("there is no next message on the topic: {}", test_topic_name);
60-
};
61-
let Ok(borrowed_next_message) = next_message_result else {
62-
panic!(
63-
"could not get next message from based_consumer: {}",
64-
next_message_result.unwrap_err()
65-
);
69+
let messages_result = poll_x_times_for_messages(&consumer, 10).await;
70+
let Ok(messages) = messages_result else {
71+
panic!("could not get messages from consumer");
6672
};
73+
if messages.len() != 1 {
74+
panic!("expected exactly one message");
75+
}
76+
let borrowed_next_message = messages.get(0).unwrap();
77+
6778
let owned_next_message = borrowed_next_message.detach();
6879
let Some(message_payload) = owned_next_message.payload() else {
6980
panic!("message payload is empty");
@@ -75,42 +86,3 @@ pub async fn test_basic_produce() {
7586

7687
assert!(message_string.contains("content"));
7788
}
78-
79-
async fn create_consumer(
80-
bootstrap_servers: &str,
81-
test_topic: &str,
82-
) -> anyhow::Result<BaseConsumer> {
83-
let mut consumer_client_config = ClientConfig::default();
84-
consumer_client_config.set("group.id", rand_test_group());
85-
consumer_client_config.set("client.id", "rdkafka_integration_test_client");
86-
consumer_client_config.set("bootstrap.servers", bootstrap_servers);
87-
consumer_client_config.set("enable.partition.eof", "false");
88-
consumer_client_config.set("session.timeout.ms", "6000");
89-
consumer_client_config.set("enable.auto.commit", "false");
90-
consumer_client_config.set("debug", "all");
91-
consumer_client_config.set("auto.offset.reset", "earliest");
92-
93-
let base_consumer_result =
94-
consumer::create_subscribed_base(consumer_client_config, &[&test_topic]).await;
95-
let Ok(base_consumer) = base_consumer_result else {
96-
panic!(
97-
"could not create base consumer: {}",
98-
base_consumer_result.unwrap_err()
99-
)
100-
};
101-
102-
Ok(base_consumer)
103-
}
104-
105-
async fn create_producer(bootstrap_servers: &str) -> anyhow::Result<BaseProducer> {
106-
let mut producer_client_config = ClientConfig::default();
107-
producer_client_config.set("bootstrap.servers", bootstrap_servers);
108-
let base_producer_result = create_base_producer(&producer_client_config);
109-
let Ok(base_producer) = base_producer_result else {
110-
panic!(
111-
"could not create based_producer: {}",
112-
base_producer_result.unwrap_err()
113-
);
114-
};
115-
Ok(base_producer)
116-
}

tests/test_low_consumers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use rdkafka::{ClientConfig, Message, Timestamp};
1616
use crate::utils::rand::*;
1717
use crate::utils::*;
1818

19-
mod admin;
2019
mod utils;
2120

2221
fn create_base_consumer(

tests/utils/admin.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,34 @@
1-
use anyhow::Context;
2-
use rdkafka::admin::AdminClient;
1+
use anyhow::{bail, Context};
2+
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
33
use rdkafka::client::DefaultClientContext;
44
use rdkafka::config::FromClientConfig;
55
use rdkafka::ClientConfig;
66

7-
pub async fn create_admin(
7+
pub async fn create_admin_client(
88
bootstrap_servers: &str,
99
) -> anyhow::Result<AdminClient<DefaultClientContext>> {
1010
let mut admin_client_config = ClientConfig::default();
1111
admin_client_config.set("bootstrap.servers", bootstrap_servers);
1212
AdminClient::from_config(&admin_client_config).context("error creating admin client")
1313
}
14+
15+
pub async fn create_topic(
16+
admin_client: &AdminClient<DefaultClientContext>,
17+
topic_name: &'_ str,
18+
) -> anyhow::Result<String> {
19+
let topic_results = admin_client
20+
.create_topics(&new_topic_vec(&topic_name), &AdminOptions::default())
21+
.await
22+
.context("error creating topics")?;
23+
for topic_result in topic_results {
24+
if topic_result.is_err() {
25+
bail!("failed to create topic: {:?}", topic_result.unwrap_err());
26+
};
27+
}
28+
Ok(topic_name.to_string())
29+
}
30+
31+
pub fn new_topic_vec(topic_name: &'_ str) -> Vec<NewTopic<'_>> {
32+
let new_topic = NewTopic::new(&topic_name, 1, TopicReplication::Fixed(1));
33+
vec![new_topic]
34+
}

tests/utils/consumer.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,37 @@
1+
use crate::utils::consumer;
2+
use crate::utils::rand::rand_test_group;
13
use anyhow::Context;
24
use rdkafka::config::FromClientConfig;
35
use rdkafka::consumer::{BaseConsumer, Consumer};
6+
use rdkafka::message::BorrowedMessage;
47
use rdkafka::ClientConfig;
8+
use std::time::Duration;
9+
10+
pub async fn create_consumer(
11+
bootstrap_servers: &str,
12+
test_topic: &str,
13+
) -> anyhow::Result<BaseConsumer> {
14+
let mut consumer_client_config = ClientConfig::default();
15+
consumer_client_config.set("group.id", rand_test_group());
16+
consumer_client_config.set("client.id", "rdkafka_integration_test_client");
17+
consumer_client_config.set("bootstrap.servers", bootstrap_servers);
18+
consumer_client_config.set("enable.partition.eof", "false");
19+
consumer_client_config.set("session.timeout.ms", "6000");
20+
consumer_client_config.set("enable.auto.commit", "false");
21+
consumer_client_config.set("debug", "all");
22+
consumer_client_config.set("auto.offset.reset", "earliest");
23+
24+
let base_consumer_result =
25+
consumer::create_subscribed_base(consumer_client_config, &[&test_topic]).await;
26+
let Ok(base_consumer) = base_consumer_result else {
27+
panic!(
28+
"could not create base consumer: {}",
29+
base_consumer_result.unwrap_err()
30+
)
31+
};
32+
33+
Ok(base_consumer)
34+
}
535

636
pub async fn create_subscribed_base(
737
client_config: ClientConfig,
@@ -15,3 +45,27 @@ pub async fn create_subscribed_base(
1545

1646
Ok(base_consumer)
1747
}
48+
49+
pub async fn poll_x_times_for_messages(
50+
consumer: &BaseConsumer,
51+
times_to_poll: i32,
52+
) -> anyhow::Result<Vec<BorrowedMessage<'_>>> {
53+
let mut borrowed_messages: Vec<BorrowedMessage> = Vec::new();
54+
55+
for _ in 0..times_to_poll {
56+
let Some(next_message_result) = consumer.poll(Duration::from_secs(2)) else {
57+
continue;
58+
};
59+
60+
let Ok(borrowed_next_message) = next_message_result else {
61+
panic!(
62+
"could not get next message from based_consumer: {}",
63+
next_message_result.unwrap_err()
64+
);
65+
};
66+
borrowed_messages.push(borrowed_next_message);
67+
tokio::time::sleep(Duration::from_millis(100)).await;
68+
}
69+
70+
Ok(borrowed_messages)
71+
}

tests/utils/containers.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@ impl KafkaContext {
2828
.context("Failed to initialize Kafka shared instance")
2929
.map(Arc::clone)
3030
}
31+
32+
pub async fn std_out(&self) -> anyhow::Result<String> {
33+
let std_out_byte_vec = self
34+
.kafka_node
35+
.stdout_to_vec()
36+
.await
37+
.context("Failed to get stdout")?;
38+
Ok(String::from_utf8(std_out_byte_vec)?)
39+
}
40+
41+
pub async fn std_err(&self) -> anyhow::Result<String> {
42+
let std_err_byte_vec = self
43+
.kafka_node
44+
.stderr_to_vec()
45+
.await
46+
.context("Failed to get stderr")?;
47+
Ok(String::from_utf8(std_err_byte_vec)?)
48+
}
3149
}
3250

3351
async fn init() -> anyhow::Result<Arc<KafkaContext>> {

tests/utils/producer.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
1+
use anyhow::Context;
12
use rdkafka::config::FromClientConfig;
2-
use rdkafka::producer::BaseProducer;
3+
use rdkafka::producer::{BaseProducer, Producer};
4+
use rdkafka::util::Timeout;
35
use rdkafka::ClientConfig;
6+
use std::time::Duration;
7+
8+
pub async fn create_producer(bootstrap_servers: &str) -> anyhow::Result<BaseProducer> {
9+
let mut producer_client_config = ClientConfig::default();
10+
producer_client_config.set("bootstrap.servers", bootstrap_servers);
11+
let base_producer_result = create_base_producer(&producer_client_config);
12+
let Ok(base_producer) = base_producer_result else {
13+
panic!(
14+
"could not create based_producer: {}",
15+
base_producer_result.unwrap_err()
16+
);
17+
};
18+
Ok(base_producer)
19+
}
420

521
pub fn create_base_producer(config: &ClientConfig) -> anyhow::Result<BaseProducer> {
622
let base_producer_result = BaseProducer::from_config(config);
@@ -10,6 +26,14 @@ pub fn create_base_producer(config: &ClientConfig) -> anyhow::Result<BaseProduce
1026
base_producer_result.unwrap_err()
1127
)
1228
};
13-
1429
Ok(base_producer)
1530
}
31+
32+
pub fn poll_and_flush(base_producer: &BaseProducer) -> anyhow::Result<()> {
33+
for _ in 0..5 {
34+
base_producer.poll(Duration::from_millis(100));
35+
}
36+
base_producer
37+
.flush(Timeout::After(Duration::from_secs(10)))
38+
.context("flush failed")
39+
}

tests/utils/rand.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use rand::distr::{Alphanumeric, SampleString};
22

33
pub fn rand_test_topic(test_name: &str) -> String {
44
let id = Alphanumeric.sample_string(&mut rand::rng(), 10);
5-
format!("__{}_{}", test_name, id)
5+
format!("{}_{}", test_name, id)
66
}
77

88
pub fn rand_test_group() -> String {

0 commit comments

Comments
 (0)