Skip to content

Commit 1cf7042

Browse files
committed
Various updates. KafkaContext is now shared.
1 parent bef9136 commit 1cf7042

File tree

6 files changed

+127
-19
lines changed

6 files changed

+127
-19
lines changed

src/consumer/base_consumer.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Low-level consumers.
22
33
use std::ffi::{CStr, CString};
4+
use std::fmt;
45
use std::mem::ManuallyDrop;
56
use std::os::raw::c_void;
67
use std::ptr;
@@ -41,6 +42,20 @@ where
4142
nonempty_callback: Option<Box<Box<dyn Fn() + Send + Sync>>>,
4243
}
4344

45+
impl<C> fmt::Debug for BaseConsumer<C>
46+
where
47+
C: ConsumerContext,
48+
{
49+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50+
f.debug_struct("BaseConsumer")
51+
.field("native_ptr", &self.client.native_ptr())
52+
.field("queue", &self.queue)
53+
.field("group_id", &self.group_id)
54+
.field("has_nonempty_callback", &self.nonempty_callback.is_some())
55+
.finish()
56+
}
57+
}
58+
4459
impl FromClientConfig for BaseConsumer {
4560
fn from_config(config: &ClientConfig) -> KafkaResult<BaseConsumer> {
4661
BaseConsumer::from_config_and_context(config, DefaultConsumerContext)

tests/producer.rs

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
use rdkafka::ClientConfig;
1+
use crate::utils::consumer;
22
use crate::utils::containers::KafkaContext;
33
use crate::utils::logging::init_test_logger;
44
use crate::utils::producer::create_base_producer;
5-
use crate::utils::rand::rand_test_topic;
6-
use rdkafka::producer::BaseRecord;
5+
use crate::utils::rand::{rand_test_group, rand_test_topic};
6+
use rdkafka::consumer::{BaseConsumer, Consumer};
7+
use rdkafka::error::KafkaError;
8+
use rdkafka::producer::{BaseRecord, Producer};
9+
use rdkafka::util::Timeout;
10+
use rdkafka::{ClientConfig, Message};
11+
use std::time::Duration;
712

813
mod utils;
914

@@ -21,27 +26,83 @@ pub async fn test_basic_produce() {
2126

2227
let bootstrap_servers_result = _kafka_context.bootstrap_servers().await;
2328
let Ok(bootstrap_servers) = bootstrap_servers_result else {
24-
panic!("could not create bootstrap servers: {}", bootstrap_servers_result.unwrap_err());
29+
panic!(
30+
"could not create bootstrap servers: {}",
31+
bootstrap_servers_result.unwrap_err()
32+
);
2533
};
26-
let mut client_config = ClientConfig::default();
27-
client_config.set("bootstrap.servers", bootstrap_servers);
34+
let test_topic = rand_test_topic("testing-topic");
2835

29-
let base_producer_result = create_base_producer(&client_config);
36+
create_consumer(bootstrap_servers, test_topic);
37+
38+
let mut producer_client_config = ClientConfig::default();
39+
producer_client_config.set("bootstrap.servers", &bootstrap_servers);
40+
41+
let base_producer_result = create_base_producer(&producer_client_config);
3042
let Ok(base_producer) = base_producer_result else {
3143
panic!(
3244
"could not create based_producer: {}",
3345
base_producer_result.unwrap_err()
3446
);
3547
};
3648

37-
let test_topic = rand_test_topic("testing-topic");
3849
let record = BaseRecord::to(&test_topic) // destination topic
3950
.key(&[1, 2, 3, 4]) // message key
40-
.payload("content") // message payload
41-
.partition(5);
51+
.payload("content"); // message payload
4252

4353
let send_result = base_producer.send(record);
4454
if send_result.is_err() {
4555
panic!("could not produce record: {:?}", send_result.unwrap_err());
4656
}
57+
58+
let flush_result = base_producer.flush(Timeout::After(Duration::from_secs(10)));
59+
if let Err(flush_error) = flush_result {
60+
panic!("timed out waiting for producer flush: {}", flush_error);
61+
}
62+
63+
let Some(next_message_result) = base_consumer.poll(Duration::from_secs(2)) else {
64+
panic!("there is no next message on the topic: {}", test_topic);
65+
};
66+
let Ok(borrowed_next_message) = next_message_result else {
67+
panic!(
68+
"could not get next message from based_consumer: {}",
69+
next_message_result.unwrap_err()
70+
);
71+
};
72+
let owned_next_message = borrowed_next_message.detach();
73+
let Some(message_payload) = owned_next_message.payload() else {
74+
panic!("message payload is empty");
75+
};
76+
let message_string_result = String::from_utf8(message_payload.to_vec());
77+
let Ok(message_string) = message_string_result else {
78+
panic!("message payload is not valid UTF-8");
79+
};
80+
81+
assert!(message_string.contains("content"));
82+
}
83+
84+
async fn create_consumer(
85+
bootstrap_servers: String,
86+
test_topic: String,
87+
) -> anyhow::Result<BaseConsumer> {
88+
let mut consumer_client_config = ClientConfig::default();
89+
consumer_client_config.set("group.id", rand_test_group());
90+
consumer_client_config.set("client.id", "rdkafka_integration_test_client");
91+
consumer_client_config.set("bootstrap.servers", &bootstrap_servers);
92+
consumer_client_config.set("enable.partition.eof", "false");
93+
consumer_client_config.set("session.timeout.ms", "6000");
94+
consumer_client_config.set("enable.auto.commit", "false");
95+
consumer_client_config.set("debug", "all");
96+
consumer_client_config.set("auto.offset.reset", "earliest");
97+
98+
let base_consumer_result =
99+
consumer::create_subscribed_base(consumer_client_config, &[&test_topic]).await;
100+
let Ok(base_consumer) = base_consumer_result else {
101+
panic!(
102+
"could not create base consumer: {}",
103+
base_consumer_result.unwrap_err()
104+
)
105+
};
106+
107+
unimplemented!("unimplemented");
47108
}

tests/utils/consumer.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use anyhow::Context;
2+
use rdkafka::config::FromClientConfig;
3+
use rdkafka::consumer::{BaseConsumer, Consumer};
4+
use rdkafka::ClientConfig;
5+
6+
pub async fn create_subscribed_base(
7+
client_config: ClientConfig,
8+
topics: &[&str],
9+
) -> anyhow::Result<BaseConsumer> {
10+
let base_consumer =
11+
BaseConsumer::from_config(&client_config).context("Failed to create consumer")?;
12+
base_consumer
13+
.subscribe(topics)
14+
.context("Failed to subscribe to topic")?;
15+
16+
Ok(base_consumer)
17+
}

tests/utils/containers.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use anyhow::Context;
22
use std::fmt::Debug;
3+
use std::sync::Arc;
34
use testcontainers_modules::kafka::apache::Kafka;
45
use testcontainers_modules::testcontainers::core::ContainerPort;
56
use testcontainers_modules::testcontainers::runners::AsyncRunner;
67
use testcontainers_modules::testcontainers::ContainerAsync;
8+
use tokio::sync::OnceCell;
79

810
pub struct KafkaContext {
911
kafka_node: ContainerAsync<Kafka>,
@@ -16,13 +18,21 @@ impl Debug for KafkaContext {
1618
}
1719

1820
impl KafkaContext {
19-
pub async fn new() -> anyhow::Result<Self> {
20-
let kafka_node = Kafka::default()
21-
.start()
22-
.await
23-
.context("Failed to start Kafka")?;
21+
pub async fn shared() -> anyhow::Result<Arc<Self>> {
22+
static INSTANCE: OnceCell<Arc<KafkaContext>> = OnceCell::const_new();
23+
24+
INSTANCE
25+
.get_or_try_init(|| async {
26+
let kafka_node = Kafka::default()
27+
.start()
28+
.await
29+
.context("Failed to start Kafka")?;
2430

25-
Ok(Self { kafka_node })
31+
Ok::<Arc<KafkaContext>, anyhow::Error>(Arc::new(KafkaContext { kafka_node }))
32+
})
33+
.await
34+
.context("Failed to initialize Kafka shared instance")
35+
.map(Arc::clone)
2636
}
2737

2838
pub async fn bootstrap_servers(&self) -> anyhow::Result<String> {
@@ -41,7 +51,7 @@ impl KafkaContext {
4151

4252
#[tokio::test]
4353
pub async fn test_kafka_context_works() {
44-
let kafka_context_result = KafkaContext::new().await;
54+
let kafka_context_result = KafkaContext::shared().await;
4555
let Ok(kafka_context) = kafka_context_result else {
4656
panic!(
4757
"Failed to get Kafka context: {}",

tests/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![allow(dead_code)]
22

3+
pub mod consumer;
34
pub mod containers;
45
pub mod logging;
56
pub mod producer;

tests/utils/producer.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
use rdkafka::ClientConfig;
21
use rdkafka::config::FromClientConfig;
32
use rdkafka::producer::BaseProducer;
3+
use rdkafka::ClientConfig;
44

55
pub fn create_base_producer(config: &ClientConfig) -> anyhow::Result<BaseProducer> {
66
let base_producer_result = BaseProducer::from_config(config);
77
let Ok(base_producer) = base_producer_result else {
8-
anyhow::bail!("error creating base producer: {}", base_producer_result.unwrap_err())
8+
anyhow::bail!(
9+
"error creating base producer: {}",
10+
base_producer_result.unwrap_err()
11+
)
912
};
13+
1014
Ok(base_producer)
1115
}

0 commit comments

Comments
 (0)