Skip to content

Commit 722deb2

Browse files
committed
fix: test_mixed_success_results
1 parent 87f159b commit 722deb2

File tree

2 files changed

+96
-65
lines changed

2 files changed

+96
-65
lines changed

tests/admin.rs

Lines changed: 96 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use rdkafka::admin::{
88
TopicReplication,
99
};
1010
use rdkafka::error::KafkaError;
11-
use std::time::Duration;
1211
use rdkafka_sys::RDKafkaErrorCode;
12+
use std::time::Duration;
1313

1414
#[path = "utils/mod.rs"]
1515
mod utils;
@@ -66,69 +66,6 @@ async fn test_topics() {
6666
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
6767
.await
6868
.expect("could not create consumer client");
69-
70-
//
71-
// // Verify that deleting a non-existent topic fails.
72-
// {
73-
// let name = rand_test_topic("test_topics");
74-
// let res = admin_client
75-
// .delete_topics(&[&name], &opts)
76-
// .await
77-
// .expect("delete topics failed");
78-
// assert_eq!(
79-
// res,
80-
// &[Err((name, RDKafkaErrorCode::UnknownTopicOrPartition))]
81-
// );
82-
// }
83-
//
84-
// // Verify that mixed-success operations properly report the successful and
85-
// // failing operators.
86-
// {
87-
// let name1 = rand_test_topic("test_topics");
88-
// let name2 = rand_test_topic("test_topics");
89-
//
90-
// let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
91-
// let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
92-
//
93-
// let res = admin_client
94-
// .create_topics(vec![&topic1], &opts)
95-
// .await
96-
// .expect("topic creation failed");
97-
// assert_eq!(res, &[Ok(name1.clone())]);
98-
// let _ = fetch_metadata(&name1);
99-
//
100-
// let res = admin_client
101-
// .create_topics(vec![&topic1, &topic2], &opts)
102-
// .await
103-
// .expect("topic creation failed");
104-
// assert_eq!(
105-
// res,
106-
// &[
107-
// Err((name1.clone(), RDKafkaErrorCode::TopicAlreadyExists)),
108-
// Ok(name2.clone())
109-
// ]
110-
// );
111-
// let _ = fetch_metadata(&name2);
112-
//
113-
// let res = admin_client
114-
// .delete_topics(&[&name1], &opts)
115-
// .await
116-
// .expect("topic deletion failed");
117-
// assert_eq!(res, &[Ok(name1.clone())]);
118-
// verify_delete(&name1);
119-
//
120-
// let res = admin_client
121-
// .delete_topics(&[&name2, &name1], &opts)
122-
// .await
123-
// .expect("topic deletion failed");
124-
// assert_eq!(
125-
// res,
126-
// &[
127-
// Ok(name2.clone()),
128-
// Err((name1.clone(), RDKafkaErrorCode::UnknownTopicOrPartition))
129-
// ]
130-
// );
131-
// }
13269
}
13370

13471
#[tokio::test]
@@ -388,3 +325,98 @@ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_partit
388325
&[Err((name, RDKafkaErrorCode::InvalidReplicaAssignment))],
389326
);
390327
}
328+
329+
/// Verify that deleting a non-existent topic fails.
330+
#[tokio::test]
331+
pub async fn test_delete_nonexistent_topics() {
332+
// Get Kafka container context.
333+
let kafka_context = KafkaContext::shared()
334+
.await
335+
.expect("could not create kafka context");
336+
337+
// Create admin client
338+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
339+
.await
340+
.expect("could not create admin client");
341+
let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30)));
342+
343+
let name = rand_test_topic("test_topics");
344+
let res = admin_client
345+
.delete_topics(&[&name], &opts)
346+
.await
347+
.expect("delete topics failed");
348+
assert_eq!(
349+
res,
350+
&[Err((name, RDKafkaErrorCode::UnknownTopicOrPartition))]
351+
);
352+
}
353+
354+
/// Verify that mixed-success operations properly report the successful and
355+
/// failing operators.
356+
#[tokio::test]
357+
pub async fn test_mixed_success_results() {
358+
// Get Kafka container context.
359+
let kafka_context = KafkaContext::shared()
360+
.await
361+
.expect("could not create kafka context");
362+
363+
// Create admin client
364+
let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
365+
.await
366+
.expect("could not create admin client");
367+
let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30)));
368+
369+
// Create consumer client
370+
let consumer_client =
371+
utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
372+
.await
373+
.expect("could not create consumer client");
374+
375+
let name1 = rand_test_topic("test_topics");
376+
let name2 = rand_test_topic("test_topics");
377+
378+
let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
379+
let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
380+
381+
let res = admin_client
382+
.create_topics(vec![&topic1], &opts)
383+
.await
384+
.expect("topic creation failed");
385+
assert_eq!(res, &[Ok(name1.clone())]);
386+
let _ = utils::consumer::fetch_consumer_metadata(&consumer_client, &name1)
387+
.expect(&format!("could not fetch consumer metadata for {}", name1));
388+
389+
let res = admin_client
390+
.create_topics(vec![&topic1, &topic2], &opts)
391+
.await
392+
.expect("topic creation failed");
393+
assert_eq!(
394+
res,
395+
&[
396+
Err((name1.clone(), RDKafkaErrorCode::TopicAlreadyExists)),
397+
Ok(name2.clone())
398+
]
399+
);
400+
let _ = utils::consumer::fetch_consumer_metadata(&consumer_client, &name2)
401+
.expect(&format!("could not fetch consumer metadata for {}", name2));
402+
403+
let res = admin_client
404+
.delete_topics(&[&name1], &opts)
405+
.await
406+
.expect("topic deletion failed");
407+
assert_eq!(res, &[Ok(name1.clone())]);
408+
utils::consumer::verify_topic_deleted(&consumer_client, &name1)
409+
.expect(&format!("could not verify topic \"{}\" was deleted", name1));
410+
411+
let res = admin_client
412+
.delete_topics(&[&name2, &name1], &opts)
413+
.await
414+
.expect("topic deletion failed");
415+
assert_eq!(
416+
res,
417+
&[
418+
Ok(name2.clone()),
419+
Err((name1.clone(), RDKafkaErrorCode::UnknownTopicOrPartition))
420+
]
421+
);
422+
}

tests/utils/consumer.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::utils::consumer;
21
use crate::utils::rand::rand_test_group;
32
use anyhow::{bail, Context};
43
use backon::{BlockingRetryable, ExponentialBuilder};

0 commit comments

Comments
 (0)