Skip to content

Commit 92711d4

Browse files
committed
admin. test topic creation.
1 parent 1cf7042 commit 92711d4

File tree

6 files changed

+125
-57
lines changed

6 files changed

+125
-57
lines changed

tests/admin.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use rdkafka::admin::{AdminOptions, NewTopic, TopicReplication};
2+
use crate::utils::containers::KafkaContext;
3+
use crate::utils::logging::init_test_logger;
4+
use crate::utils::rand::rand_test_topic;
5+
6+
#[path = "utils/mod.rs"]
7+
mod utils;
8+
9+
#[tokio::test]
10+
pub async fn test_topic_creation() {
11+
init_test_logger();
12+
13+
// Get Kafka container context.
14+
let kafka_context_result = KafkaContext::shared().await;
15+
let Ok(kafka_context) = kafka_context_result else {
16+
panic!(
17+
"could not create kafka context: {}",
18+
kafka_context_result.unwrap_err()
19+
);
20+
};
21+
let test_topic_name = rand_test_topic("testing-topic");
22+
23+
let admin_client_result = utils::admin::create_admin(&kafka_context.bootstrap_servers).await;
24+
let Ok(admin_client) = admin_client_result else {
25+
panic!(
26+
"could not create admin client: {}",
27+
admin_client_result.unwrap_err()
28+
);
29+
};
30+
31+
let new_topic = NewTopic::new(&test_topic_name, 1, TopicReplication::Fixed(1));
32+
let admin_opts = AdminOptions::new();
33+
let create_topics_result = admin_client
34+
.create_topics(vec![&new_topic], &admin_opts)
35+
.await;
36+
let Ok(create_topics) = create_topics_result else {
37+
panic!(
38+
"could not create new topics: {}",
39+
create_topics_result.unwrap_err()
40+
);
41+
};
42+
43+
for topic_result in create_topics {
44+
if topic_result.is_err() {
45+
panic!("failed to create topic: {:?}", topic_result.unwrap_err());
46+
};
47+
}
48+
}

tests/producer.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use crate::utils::containers::KafkaContext;
33
use crate::utils::logging::init_test_logger;
44
use crate::utils::producer::create_base_producer;
55
use crate::utils::rand::{rand_test_group, rand_test_topic};
6-
use rdkafka::consumer::{BaseConsumer, Consumer};
7-
use rdkafka::error::KafkaError;
8-
use rdkafka::producer::{BaseRecord, Producer};
6+
use anyhow::Context;
7+
use rdkafka::config::FromClientConfig;
8+
use rdkafka::consumer::BaseConsumer;
9+
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
910
use rdkafka::util::Timeout;
1011
use rdkafka::{ClientConfig, Message};
1112
use std::time::Duration;
@@ -16,52 +17,46 @@ mod utils;
1617
pub async fn test_basic_produce() {
1718
init_test_logger();
1819

19-
let kafka_context_result = KafkaContext::new().await;
20-
let Ok(_kafka_context) = kafka_context_result else {
20+
// Get Kafka container context.
21+
let kafka_context_result = KafkaContext::shared().await;
22+
let Ok(kafka_context) = kafka_context_result else {
2123
panic!(
2224
"could not create kafka context: {}",
2325
kafka_context_result.unwrap_err()
2426
);
2527
};
28+
let test_topic_name = rand_test_topic("testing-topic");
2629

27-
let bootstrap_servers_result = _kafka_context.bootstrap_servers().await;
28-
let Ok(bootstrap_servers) = bootstrap_servers_result else {
30+
let consumer_result = create_consumer(&kafka_context.bootstrap_servers, &test_topic_name).await;
31+
let Ok(consumer) = consumer_result else {
2932
panic!(
30-
"could not create bootstrap servers: {}",
31-
bootstrap_servers_result.unwrap_err()
33+
"could not create consumer: {}",
34+
consumer_result.unwrap_err()
3235
);
3336
};
34-
let test_topic = rand_test_topic("testing-topic");
35-
36-
create_consumer(bootstrap_servers, test_topic);
37-
38-
let mut producer_client_config = ClientConfig::default();
39-
producer_client_config.set("bootstrap.servers", &bootstrap_servers);
40-
41-
let base_producer_result = create_base_producer(&producer_client_config);
42-
let Ok(base_producer) = base_producer_result else {
37+
let create_producer_result = create_producer(&kafka_context.bootstrap_servers).await;
38+
let Ok(base_producer) = create_producer_result else {
4339
panic!(
44-
"could not create based_producer: {}",
45-
base_producer_result.unwrap_err()
40+
"could not create base producer: {}",
41+
create_producer_result.unwrap_err()
4642
);
4743
};
4844

49-
let record = BaseRecord::to(&test_topic) // destination topic
45+
let record = BaseRecord::to(&test_topic_name) // destination topic
5046
.key(&[1, 2, 3, 4]) // message key
5147
.payload("content"); // message payload
5248

5349
let send_result = base_producer.send(record);
5450
if send_result.is_err() {
5551
panic!("could not produce record: {:?}", send_result.unwrap_err());
5652
}
57-
5853
let flush_result = base_producer.flush(Timeout::After(Duration::from_secs(10)));
5954
if let Err(flush_error) = flush_result {
6055
panic!("timed out waiting for producer flush: {}", flush_error);
6156
}
6257

63-
let Some(next_message_result) = base_consumer.poll(Duration::from_secs(2)) else {
64-
panic!("there is no next message on the topic: {}", test_topic);
58+
let Some(next_message_result) = consumer.poll(Duration::from_secs(2)) else {
59+
panic!("there is no next message on the topic: {}", test_topic_name);
6560
};
6661
let Ok(borrowed_next_message) = next_message_result else {
6762
panic!(
@@ -82,13 +77,13 @@ pub async fn test_basic_produce() {
8277
}
8378

8479
async fn create_consumer(
85-
bootstrap_servers: String,
86-
test_topic: String,
80+
bootstrap_servers: &str,
81+
test_topic: &str,
8782
) -> anyhow::Result<BaseConsumer> {
8883
let mut consumer_client_config = ClientConfig::default();
8984
consumer_client_config.set("group.id", rand_test_group());
9085
consumer_client_config.set("client.id", "rdkafka_integration_test_client");
91-
consumer_client_config.set("bootstrap.servers", &bootstrap_servers);
86+
consumer_client_config.set("bootstrap.servers", bootstrap_servers);
9287
consumer_client_config.set("enable.partition.eof", "false");
9388
consumer_client_config.set("session.timeout.ms", "6000");
9489
consumer_client_config.set("enable.auto.commit", "false");
@@ -104,5 +99,18 @@ async fn create_consumer(
10499
)
105100
};
106101

107-
unimplemented!("unimplemented");
102+
Ok(base_consumer)
103+
}
104+
105+
async fn create_producer(bootstrap_servers: &str) -> anyhow::Result<BaseProducer> {
106+
let mut producer_client_config = ClientConfig::default();
107+
producer_client_config.set("bootstrap.servers", bootstrap_servers);
108+
let base_producer_result = create_base_producer(&producer_client_config);
109+
let Ok(base_producer) = base_producer_result else {
110+
panic!(
111+
"could not create based_producer: {}",
112+
base_producer_result.unwrap_err()
113+
);
114+
};
115+
Ok(base_producer)
108116
}

tests/test_low_consumers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use rdkafka::{ClientConfig, Message, Timestamp};
1616
use crate::utils::rand::*;
1717
use crate::utils::*;
1818

19+
mod admin;
1920
mod utils;
2021

2122
fn create_base_consumer(

tests/utils/admin.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use anyhow::Context;
2+
use rdkafka::admin::AdminClient;
3+
use rdkafka::client::DefaultClientContext;
4+
use rdkafka::config::FromClientConfig;
5+
use rdkafka::ClientConfig;
6+
7+
pub async fn create_admin(
8+
bootstrap_servers: &str,
9+
) -> anyhow::Result<AdminClient<DefaultClientContext>> {
10+
let mut admin_client_config = ClientConfig::default();
11+
admin_client_config.set("bootstrap.servers", bootstrap_servers);
12+
AdminClient::from_config(&admin_client_config).context("error creating admin client")
13+
}

tests/utils/containers.rs

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tokio::sync::OnceCell;
99

1010
pub struct KafkaContext {
1111
kafka_node: ContainerAsync<Kafka>,
12+
pub bootstrap_servers: String,
1213
}
1314

1415
impl Debug for KafkaContext {
@@ -22,31 +23,31 @@ impl KafkaContext {
2223
static INSTANCE: OnceCell<Arc<KafkaContext>> = OnceCell::const_new();
2324

2425
INSTANCE
25-
.get_or_try_init(|| async {
26-
let kafka_node = Kafka::default()
27-
.start()
28-
.await
29-
.context("Failed to start Kafka")?;
30-
31-
Ok::<Arc<KafkaContext>, anyhow::Error>(Arc::new(KafkaContext { kafka_node }))
32-
})
26+
.get_or_try_init(init)
3327
.await
3428
.context("Failed to initialize Kafka shared instance")
3529
.map(Arc::clone)
3630
}
31+
}
3732

38-
pub async fn bootstrap_servers(&self) -> anyhow::Result<String> {
39-
let kafka_host = self
40-
.kafka_node
41-
.get_host()
42-
.await
43-
.context("Failed to get Kafka host")?;
44-
let kafka_port = self
45-
.kafka_node
46-
.get_host_port_ipv4(ContainerPort::Tcp(9092))
47-
.await?;
48-
Ok(format!("{}:{}", kafka_host.to_string(), kafka_port))
49-
}
33+
async fn init() -> anyhow::Result<Arc<KafkaContext>> {
34+
let kafka_node = Kafka::default()
35+
.start()
36+
.await
37+
.context("Failed to start Kafka")?;
38+
39+
let kafka_host = kafka_node
40+
.get_host()
41+
.await
42+
.context("Failed to get Kafka host")?;
43+
let kafka_port = kafka_node
44+
.get_host_port_ipv4(ContainerPort::Tcp(9092))
45+
.await?;
46+
47+
Ok::<Arc<KafkaContext>, anyhow::Error>(Arc::new(KafkaContext {
48+
kafka_node,
49+
bootstrap_servers: format!("{}:{}", kafka_host, kafka_port),
50+
}))
5051
}
5152

5253
#[tokio::test]
@@ -59,13 +60,9 @@ pub async fn test_kafka_context_works() {
5960
);
6061
};
6162

62-
let bootstrap_servers_result = kafka_context.bootstrap_servers().await;
63-
let Ok(bootstrap_servers) = bootstrap_servers_result else {
64-
panic!(
65-
"Failed to get bootstrap servers: {}",
66-
bootstrap_servers_result.unwrap_err()
67-
);
68-
};
69-
70-
assert_ne!(bootstrap_servers.len(), 0, "Bootstrap servers empty");
63+
assert_ne!(
64+
kafka_context.bootstrap_servers.len(),
65+
0,
66+
"Bootstrap servers empty"
67+
);
7168
}

tests/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![allow(dead_code)]
22

3+
pub mod admin;
34
pub mod consumer;
45
pub mod containers;
56
pub mod logging;

0 commit comments

Comments
 (0)