Skip to content

Commit 5684312

Browse files
committed
fix: test_produce_consume_base_assign
1 parent 8c9ab9b commit 5684312

File tree

8 files changed

+192
-26
lines changed

8 files changed

+192
-26
lines changed

tests/admin.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ use crate::utils::logging::init_test_logger;
44
use crate::utils::rand::{rand_test_group, rand_test_topic};
55
use crate::utils::{get_broker_version, KafkaVersion};
66
use backon::{BlockingRetryable, ExponentialBuilder};
7-
use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, GroupResult, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier, TopicReplication};
7+
use rdkafka::admin::{
8+
AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, GroupResult, NewPartitions,
9+
NewTopic, OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
10+
};
11+
use rdkafka::client::DefaultClientContext;
812
use rdkafka::error::KafkaError;
913
use rdkafka::producer::{FutureRecord, Producer};
1014
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
1115
use rdkafka_sys::RDKafkaErrorCode;
1216
use std::time::Duration;
13-
use rdkafka::client::DefaultClientContext;
1417

1518
#[path = "utils/mod.rs"]
1619
mod utils;
@@ -666,4 +669,4 @@ async fn test_event_errors() {
666669
res,
667670
Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut))
668671
);
669-
}
672+
}

tests/test_high_consumers.rs renamed to tests/stream_consumers.rs

Lines changed: 116 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ use rdkafka::error::KafkaError;
1515
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
1616
use rdkafka::util::current_time_millis;
1717
use rdkafka::{Message, Timestamp};
18+
use rdkafka::admin::AdminOptions;
1819
use rdkafka_sys::types::RDKafkaConfRes;
1920

21+
use crate::utils::containers::KafkaContext;
22+
use crate::utils::logging::init_test_logger;
2023
use crate::utils::rand::*;
2124
use crate::utils::*;
25+
use crate::utils::admin::new_topic_vec;
2226

2327
mod utils;
2428

@@ -71,27 +75,52 @@ async fn test_invalid_max_poll_interval() {
7175
// All produced messages should be consumed.
7276
#[tokio::test(flavor = "multi_thread")]
7377
async fn test_produce_consume_base() {
74-
let _r = env_logger::try_init();
78+
init_test_logger();
79+
80+
// Get Kafka container context.
81+
let kafka_context = KafkaContext::shared()
82+
.await
83+
.expect("could not create kafka context");
7584

85+
let producer = producer::future_producer::create_producer(&kafka_context.bootstrap_servers)
86+
.await
87+
.expect("Could not create Future producer");
88+
89+
let num_of_messages_to_send = 100usize;
7690
let start_time = current_time_millis();
7791
let topic_name = rand_test_topic("test_produce_consume_base");
78-
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
79-
let consumer = create_stream_consumer(&rand_test_group(), None);
80-
consumer.subscribe(&[topic_name.as_str()]).unwrap();
92+
let message_map = topics::populate_topic_using_future_producer(
93+
&producer,
94+
&topic_name,
95+
num_of_messages_to_send,
96+
None,
97+
)
98+
.await
99+
.expect("Could not populate topic using Future producer");
100+
// let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
101+
let consumer = utils::consumer::stream_consumer::create_stream_consumer(
102+
&kafka_context.bootstrap_servers,
103+
Some(&rand_test_group()),
104+
)
105+
.await
106+
.expect("could not create stream consumer");
107+
consumer
108+
.subscribe(&[topic_name.as_str()])
109+
.expect("could not subscribe to kafka topic");
81110

82-
let _consumer_future = consumer
111+
consumer
83112
.stream()
84-
.take(100)
113+
.take(num_of_messages_to_send)
85114
.for_each(|message| {
86115
match message {
87116
Ok(m) => {
88117
let id = message_map[&(m.partition(), m.offset())];
89118
match m.timestamp() {
90119
Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time),
91-
_ => panic!("Expected createtime for message timestamp"),
120+
_ => panic!("Expected create time for message timestamp"),
92121
};
93-
assert_eq!(m.payload_view::<str>().unwrap().unwrap(), value_fn(id));
94-
assert_eq!(m.key_view::<str>().unwrap().unwrap(), key_fn(id));
122+
assert_eq!(m.payload_view::<str>().unwrap().unwrap(), id.to_string());
123+
assert_eq!(m.key_view::<str>().unwrap().unwrap(), id.to_string());
95124
assert_eq!(m.topic(), topic_name.as_str());
96125
}
97126
Err(e) => panic!("Error receiving message: {:?}", e),
@@ -107,13 +136,39 @@ async fn test_produce_consume_base() {
107136
/// waker slot.
108137
#[tokio::test(flavor = "multi_thread")]
109138
async fn test_produce_consume_base_concurrent() {
110-
let _r = env_logger::try_init();
139+
init_test_logger();
111140

112-
let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
113-
populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
141+
// Get Kafka container context.
142+
let kafka_context = KafkaContext::shared()
143+
.await
144+
.expect("could not create kafka context");
114145

115-
let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None));
116-
consumer.subscribe(&[topic_name.as_str()]).unwrap();
146+
let producer = producer::future_producer::create_producer(&kafka_context.bootstrap_servers)
147+
.await
148+
.expect("Could not create Future producer");
149+
150+
let num_of_messages_to_send = 100usize;
151+
let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
152+
topics::populate_topic_using_future_producer(
153+
&producer,
154+
&topic_name,
155+
num_of_messages_to_send,
156+
None,
157+
)
158+
.await
159+
.expect("Could not populate topic using Future producer");
160+
// let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
161+
let consumer = Arc::new(
162+
consumer::stream_consumer::create_stream_consumer(
163+
&kafka_context.bootstrap_servers,
164+
Some(&rand_test_group()),
165+
)
166+
.await
167+
.expect("could not create stream consumer"),
168+
);
169+
consumer
170+
.subscribe(&[topic_name.as_str()])
171+
.expect("could not subscribe to kafka topic");
117172

118173
let mk_task = || {
119174
let consumer = consumer.clone();
@@ -137,13 +192,55 @@ async fn test_produce_consume_base_concurrent() {
137192
// All produced messages should be consumed.
138193
#[tokio::test(flavor = "multi_thread")]
139194
async fn test_produce_consume_base_assign() {
140-
let _r = env_logger::try_init();
195+
init_test_logger();
196+
197+
// Get Kafka container context.
198+
let kafka_context = KafkaContext::shared()
199+
.await
200+
.expect("could not create kafka context");
141201

142202
let topic_name = rand_test_topic("test_produce_consume_base_assign");
143-
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
144-
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
145-
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
146-
let consumer = create_stream_consumer(&rand_test_group(), None);
203+
let admin_client = admin::create_admin_client(&kafka_context.bootstrap_servers)
204+
.await
205+
.expect("Could not create admin client");
206+
admin_client.create_topics(&new_topic_vec(&topic_name, Some(3)), &AdminOptions::default()).await.expect("could not create topics");
207+
208+
let producer = producer::future_producer::create_producer(&kafka_context.bootstrap_servers)
209+
.await
210+
.expect("Could not create Future producer");
211+
212+
let num_of_messages_to_send = 10usize;
213+
topics::populate_topic_using_future_producer(
214+
&producer,
215+
&topic_name,
216+
num_of_messages_to_send,
217+
Some(0),
218+
)
219+
.await
220+
.expect("Could not populate topic using Future producer");
221+
topics::populate_topic_using_future_producer(
222+
&producer,
223+
&topic_name,
224+
num_of_messages_to_send,
225+
Some(1),
226+
)
227+
.await
228+
.expect("Could not populate topic using Future producer");
229+
topics::populate_topic_using_future_producer(
230+
&producer,
231+
&topic_name,
232+
num_of_messages_to_send,
233+
Some(2),
234+
)
235+
.await
236+
.expect("Could not populate topic using Future producer");
237+
238+
let consumer = utils::consumer::stream_consumer::create_stream_consumer(
239+
&kafka_context.bootstrap_servers,
240+
Some(&rand_test_group()),
241+
)
242+
.await
243+
.expect("could not create stream consumer");
147244
let mut tpl = TopicPartitionList::new();
148245
tpl.add_partition_offset(&topic_name, 0, Offset::Beginning)
149246
.unwrap();

tests/utils/admin.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub async fn create_topic(
1717
topic_name: &'_ str,
1818
) -> anyhow::Result<String> {
1919
let topic_results = admin_client
20-
.create_topics(&new_topic_vec(&topic_name), &AdminOptions::default())
20+
.create_topics(&new_topic_vec(&topic_name, None), &AdminOptions::default())
2121
.await
2222
.context("error creating topics")?;
2323
for topic_result in topic_results {
@@ -28,7 +28,7 @@ pub async fn create_topic(
2828
Ok(topic_name.to_string())
2929
}
3030

31-
pub fn new_topic_vec(topic_name: &'_ str) -> Vec<NewTopic<'_>> {
32-
let new_topic = NewTopic::new(&topic_name, 1, TopicReplication::Fixed(1));
31+
pub fn new_topic_vec(topic_name: &'_ str, num_partitions: Option<i32>) -> Vec<NewTopic<'_>> {
32+
let new_topic = NewTopic::new(&topic_name, num_partitions.unwrap_or(1), TopicReplication::Fixed(1));
3333
vec![new_topic]
3434
}

tests/utils/consumer.rs renamed to tests/utils/consumer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod stream_consumer;
2+
13
use crate::utils::rand::rand_test_group;
24
use anyhow::{bail, Context};
35
use backon::{BlockingRetryable, ExponentialBuilder};
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use anyhow::Context;
2+
use rdkafka::ClientConfig;
3+
use rdkafka::config::FromClientConfig;
4+
use rdkafka::consumer::StreamConsumer;
5+
6+
pub async fn create_stream_consumer(
7+
bootstrap_server: &str,
8+
consumer_group_option: Option<&str>,
9+
) -> anyhow::Result<StreamConsumer> {
10+
let mut client_config = ClientConfig::default();
11+
client_config.set("bootstrap.servers", bootstrap_server);
12+
client_config.set("auto.offset.reset", "earliest");
13+
if let Some(group) = consumer_group_option {
14+
client_config.set("group.id", group);
15+
}
16+
17+
let stream_consumer = StreamConsumer::from_config(&client_config).context("failed to create stream consumer")?;
18+
Ok(stream_consumer)
19+
}

tests/utils/containers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44
use testcontainers_modules::kafka::apache::Kafka;
55
use testcontainers_modules::testcontainers::core::ContainerPort;
66
use testcontainers_modules::testcontainers::runners::AsyncRunner;
7-
use testcontainers_modules::testcontainers::{ContainerAsync, Image, ImageExt};
7+
use testcontainers_modules::testcontainers::{ContainerAsync, Image};
88
use tokio::sync::OnceCell;
99

1010
pub struct KafkaContext {

tests/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod containers;
66
pub mod logging;
77
pub mod producer;
88
pub mod rand;
9+
pub mod topics;
910

1011
use std::collections::HashMap;
1112
use std::env::{self};

tests/utils/topics.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use rdkafka::producer::{FutureProducer, FutureRecord};
2+
use std::collections::HashMap;
3+
use std::time::Duration;
4+
5+
pub type PartitionOffset = (i32, i64);
6+
pub type MessageId = usize;
7+
8+
pub async fn populate_topic_using_future_producer(
9+
producer: &FutureProducer,
10+
topic_name: &str,
11+
num_messages: usize,
12+
partition: Option<i32>,
13+
) -> anyhow::Result<HashMap<PartitionOffset, MessageId>> {
14+
let message_send_futures = (0..num_messages)
15+
.map(|id| {
16+
let future = async move {
17+
producer
18+
.send(
19+
FutureRecord {
20+
topic: topic_name,
21+
payload: Some(&id.to_string()),
22+
key: Some(&id.to_string()),
23+
partition,
24+
timestamp: None,
25+
headers: None,
26+
},
27+
Duration::from_secs(1),
28+
)
29+
.await
30+
};
31+
(id, future)
32+
})
33+
.collect::<Vec<_>>();
34+
35+
let mut message_map = HashMap::<PartitionOffset, MessageId>::new();
36+
for (id, future) in message_send_futures {
37+
match future.await {
38+
Ok(delivered) => message_map.insert((delivered.partition, delivered.offset), id),
39+
Err((kafka_error, _message)) => panic!("Delivery failed: {}", kafka_error),
40+
};
41+
}
42+
43+
Ok(message_map)
44+
}

0 commit comments

Comments
 (0)