Skip to content

Commit 89ba998

Browse files
committed
fix: test_consumer_groups_deletion and test_delete_unknown_group
1 parent bc12604 commit 89ba998

File tree

5 files changed

+161
-60
lines changed

5 files changed

+161
-60
lines changed

tests/admin.rs

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use crate::utils::admin::create_topic;
22
use crate::utils::containers::KafkaContext;
33
use crate::utils::logging::init_test_logger;
4-
use crate::utils::rand::rand_test_topic;
4+
use crate::utils::rand::{rand_test_group, rand_test_topic};
55
use crate::utils::{get_broker_version, KafkaVersion};
66
use backon::{BlockingRetryable, ExponentialBuilder};
77
use rdkafka::admin::{
8-
AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic,
8+
AdminOptions, AlterConfig, ConfigEntry, ConfigSource, GroupResult, NewPartitions, NewTopic,
99
OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
1010
};
1111
use rdkafka::error::KafkaError;
@@ -69,7 +69,7 @@ pub async fn test_topic_create_and_delete() {
6969

7070
// Create consumer client
7171
let consumer_client =
72-
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
72+
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers, None)
7373
.await
7474
.expect("could not create consumer client");
7575

@@ -276,7 +276,7 @@ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_partit
276276

277277
// Create consumer client
278278
let consumer_client =
279-
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
279+
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers, None)
280280
.await
281281
.expect("could not create consumer client");
282282

@@ -361,7 +361,7 @@ pub async fn test_mixed_success_results() {
361361

362362
// Create consumer client
363363
let consumer_client =
364-
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
364+
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers, None)
365365
.await
366366
.expect("could not create consumer client");
367367

@@ -623,3 +623,43 @@ async fn test_configs() {
623623
.expect("alter configs failed");
624624
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(utils::BROKER_ID))]);
625625
}
626+
627+
#[tokio::test]
628+
async fn test_groups() {
629+
init_test_logger();
630+
631+
// Get Kafka container context.
632+
let kafka_context = KafkaContext::shared()
633+
.await
634+
.expect("could not create kafka context");
635+
636+
// Create admin client
637+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
638+
.await
639+
.expect("could not create admin client");
640+
let opts = AdminOptions::new();
641+
642+
// Verify that deleting a valid and invalid group results in a mixed result
643+
// set.
644+
{
645+
let group_name = rand_test_group();
646+
let unknown_group_name = rand_test_group();
647+
create_consumer_group(&group_name).await;
648+
let res = admin_client
649+
.delete_groups(
650+
&[&group_name, &unknown_group_name],
651+
&AdminOptions::default(),
652+
)
653+
.await;
654+
assert_eq!(
655+
res,
656+
Ok(vec![
657+
Ok(group_name.to_string()),
658+
Err((
659+
unknown_group_name.to_string(),
660+
RDKafkaErrorCode::GroupIdNotFound
661+
))
662+
])
663+
);
664+
}
665+
}

tests/consumer_groups.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crate::utils::containers::KafkaContext;
2+
use crate::utils::logging::init_test_logger;
3+
use crate::utils::rand::{rand_test_group, rand_test_topic};
4+
use rdkafka::admin::{AdminOptions, GroupResult, NewTopic, TopicReplication};
5+
use rdkafka_sys::RDKafkaErrorCode;
6+
7+
mod utils;
8+
9+
/// Verify that a valid group can be deleted.
10+
#[tokio::test]
11+
pub async fn test_consumer_groups_deletion() {
12+
init_test_logger();
13+
14+
// Get Kafka container context.
15+
let kafka_context = KafkaContext::shared()
16+
.await
17+
.expect("could not create kafka context");
18+
19+
// Create admin client
20+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
21+
.await
22+
.expect("could not create admin client");
23+
24+
// Create consumer_client
25+
let group_name = rand_test_group();
26+
let topic_name = rand_test_topic("test_topic");
27+
let consumer_client = utils::consumer::create_unsubscribed_base_consumer(
28+
&kafka_context.bootstrap_servers,
29+
Some(&group_name),
30+
)
31+
.await
32+
.expect("could not create subscribed base consumer");
33+
34+
admin_client
35+
.create_topics(
36+
&[NewTopic {
37+
name: &topic_name,
38+
num_partitions: 1,
39+
replication: TopicReplication::Fixed(1),
40+
config: vec![],
41+
}],
42+
&AdminOptions::default(),
43+
)
44+
.await
45+
.expect("topic creation failed");
46+
47+
utils::consumer::create_consumer_group_on_topic(&consumer_client, &topic_name)
48+
.await
49+
.expect("could not create group");
50+
let res = admin_client
51+
.delete_groups(&[&group_name], &AdminOptions::default())
52+
.await
53+
.expect("could not delete groups");
54+
assert_eq!(res, [Ok(group_name.to_string())]);
55+
}
56+
57+
/// Verify that attempting to delete an unknown group returns a "group not
58+
/// found" error.
59+
#[tokio::test]
60+
pub async fn delete_unknown_group() {
61+
init_test_logger();
62+
63+
// Get Kafka container context.
64+
let kafka_context = KafkaContext::shared()
65+
.await
66+
.expect("could not create kafka context");
67+
68+
// Create admin client
69+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
70+
.await
71+
.expect("could not create admin client");
72+
73+
let unknown_group_name = rand_test_group();
74+
let res = admin_client
75+
.delete_groups(&[&unknown_group_name], &AdminOptions::default())
76+
.await;
77+
let expected: GroupResult = Err((unknown_group_name, RDKafkaErrorCode::NotCoordinator));
78+
assert_eq!(res, Ok(vec![expected]));
79+
}

tests/producer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ pub async fn test_basic_produce() {
3939
}
4040

4141
let consumer_result =
42-
create_subscribed_base_consumer(&kafka_context.bootstrap_servers, &test_topic_name).await;
42+
create_subscribed_base_consumer(&kafka_context.bootstrap_servers, None, &test_topic_name)
43+
.await;
4344
let Ok(consumer) = consumer_result else {
4445
panic!(
4546
"could not create consumer: {}",

tests/test_admin.rs

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -119,56 +119,6 @@ fn verify_delete(topic: &str) {
119119
.unwrap()
120120
}
121121

122-
#[tokio::test]
123-
async fn test_groups() {
124-
let admin_client = create_admin_client();
125-
126-
// Verify that a valid group can be deleted.
127-
{
128-
let group_name = rand_test_group();
129-
create_consumer_group(&group_name).await;
130-
let res = admin_client
131-
.delete_groups(&[&group_name], &AdminOptions::default())
132-
.await;
133-
assert_eq!(res, Ok(vec![Ok(group_name.to_string())]));
134-
}
135-
136-
// Verify that attempting to delete an unknown group returns a "group not
137-
// found" error.
138-
{
139-
let unknown_group_name = rand_test_group();
140-
let res = admin_client
141-
.delete_groups(&[&unknown_group_name], &AdminOptions::default())
142-
.await;
143-
let expected: GroupResult = Err((unknown_group_name, RDKafkaErrorCode::GroupIdNotFound));
144-
assert_eq!(res, Ok(vec![expected]));
145-
}
146-
147-
// Verify that deleting a valid and invalid group results in a mixed result
148-
// set.
149-
{
150-
let group_name = rand_test_group();
151-
let unknown_group_name = rand_test_group();
152-
create_consumer_group(&group_name).await;
153-
let res = admin_client
154-
.delete_groups(
155-
&[&group_name, &unknown_group_name],
156-
&AdminOptions::default(),
157-
)
158-
.await;
159-
assert_eq!(
160-
res,
161-
Ok(vec![
162-
Ok(group_name.to_string()),
163-
Err((
164-
unknown_group_name.to_string(),
165-
RDKafkaErrorCode::GroupIdNotFound
166-
))
167-
])
168-
);
169-
}
170-
}
171-
172122
// Tests whether each admin operation properly reports an error if the entire
173123
// request fails. The original implementations failed to check this, resulting
174124
// in confusing situations where a failed admin request would return Ok([]).

tests/utils/consumer.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@ use crate::utils::rand::rand_test_group;
22
use anyhow::{bail, Context};
33
use backon::{BlockingRetryable, ExponentialBuilder};
44
use rdkafka::config::FromClientConfig;
5-
use rdkafka::consumer::{BaseConsumer, Consumer};
5+
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
66
use rdkafka::message::BorrowedMessage;
77
use rdkafka::metadata::Metadata;
8-
use rdkafka::ClientConfig;
8+
use rdkafka::{ClientConfig, TopicPartitionList};
99
use std::time::Duration;
1010

1111
pub async fn create_subscribed_base_consumer(
1212
bootstrap_servers: &str,
13+
consumer_group_option: Option<&str>,
1314
test_topic: &str,
1415
) -> anyhow::Result<BaseConsumer> {
15-
let unsubscribed_base_consumer = create_unsubscribed_base_consumer(bootstrap_servers).await?;
16+
let unsubscribed_base_consumer =
17+
create_unsubscribed_base_consumer(bootstrap_servers, consumer_group_option).await?;
1618
unsubscribed_base_consumer
1719
.subscribe(&[test_topic])
1820
.context("Failed to subscribe to topic")?;
@@ -21,9 +23,14 @@ pub async fn create_subscribed_base_consumer(
2123

2224
pub async fn create_unsubscribed_base_consumer(
2325
bootstrap_servers: &str,
26+
consumer_group_option: Option<&str>,
2427
) -> anyhow::Result<BaseConsumer> {
28+
let consumer_group_name = match consumer_group_option {
29+
Some(consumer_group_name) => consumer_group_name,
30+
None => &rand_test_group(),
31+
};
2532
let mut consumer_client_config = ClientConfig::default();
26-
consumer_client_config.set("group.id", rand_test_group());
33+
consumer_client_config.set("group.id", consumer_group_name);
2734
consumer_client_config.set("client.id", "rdkafka_integration_test_client");
2835
consumer_client_config.set("bootstrap.servers", bootstrap_servers);
2936
consumer_client_config.set("enable.partition.eof", "false");
@@ -97,3 +104,27 @@ pub fn verify_topic_deleted(consumer: &BaseConsumer, topic: &str) -> anyhow::Res
97104
.retry(ExponentialBuilder::default().with_max_delay(Duration::from_secs(5)))
98105
.call()
99106
}
107+
108+
pub async fn create_consumer_group_on_topic(
109+
consumer_client: &BaseConsumer,
110+
topic_name: &str,
111+
) -> anyhow::Result<()> {
112+
let topic_partition_list = {
113+
let mut lst = TopicPartitionList::new();
114+
lst.add_partition(topic_name, 0);
115+
lst
116+
};
117+
consumer_client
118+
.assign(&topic_partition_list)
119+
.context("assign topic partition list failed")?;
120+
consumer_client
121+
.fetch_metadata(None, Duration::from_secs(3))
122+
.context("unable to fetch metadata")?;
123+
(|| consumer_client.store_offset(topic_name, 0, -1))
124+
.retry(ExponentialBuilder::default().with_max_delay(Duration::from_secs(5)))
125+
.call()
126+
.context("store offset failed")?;
127+
consumer_client
128+
.commit_consumer_state(CommitMode::Sync)
129+
.context("commit the consumer state failed")
130+
}

0 commit comments

Comments
 (0)