@@ -7,6 +7,7 @@ use rdkafka::admin::{
77 AdminOptions , ConfigEntry , ConfigSource , NewPartitions , NewTopic , ResourceSpecifier ,
88 TopicReplication ,
99} ;
10+ use rdkafka:: error:: KafkaError ;
1011use std:: time:: Duration ;
1112
1213#[ path = "utils/mod.rs" ]
@@ -65,21 +66,6 @@ async fn test_topics() {
6566 . await
6667 . expect ( "could not create consumer client" ) ;
6768
68- // // Verify that incorrect replication configurations are ignored when
69- // // creating topics.
70- // {
71- // let topic = NewTopic::new("ignored", 1, TopicReplication::Variable(&[&[0], &[0]]));
72- // let res = admin_client.create_topics(&[topic], &opts).await;
73- // assert_eq!(
74- // Err(KafkaError::AdminOpCreation(
75- // "replication configuration for topic 'ignored' assigns 2 partition(s), \
76- // which does not match the specified number of partitions (1)"
77- // .into()
78- // )),
79- // res,
80- // )
81- // }
82- //
8369 // // Verify that incorrect replication configurations are ignored when
8470 // // creating partitions.
8571 // {
@@ -207,7 +193,11 @@ pub async fn test_topic_create_and_delete() {
207193 let topic2 = NewTopic {
208194 name : & topic_name_2,
209195 num_partitions : 3 ,
210- replication : TopicReplication :: Variable ( & [ & [ 1 ] , & [ 1 ] , & [ 1 ] ] ) ,
196+ replication : TopicReplication :: Variable ( & [
197+ & [ utils:: BROKER_ID ] ,
198+ & [ utils:: BROKER_ID ] ,
199+ & [ utils:: BROKER_ID ] ,
200+ ] ) ,
211201 config : Vec :: new ( ) ,
212202 } ;
213203
@@ -330,3 +320,43 @@ pub async fn test_topic_create_and_delete() {
330320 utils:: consumer:: verify_topic_deleted ( & consumer_client, & topic_name_2)
331321 . expect ( & format ! ( "could not delete topic for {}" , & topic_name_2) ) ;
332322}
323+
324+ /// Verify that incorrect replication configurations are ignored when
325+ /// creating topics.
326+ #[ tokio:: test]
327+ pub async fn test_incorect_replication_factors_are_ignored ( ) {
328+ // Get Kafka container context.
329+ let kafka_context_result = KafkaContext :: shared ( ) . await ;
330+ let Ok ( kafka_context) = kafka_context_result else {
331+ panic ! (
332+ "could not create kafka context: {}" ,
333+ kafka_context_result. unwrap_err( )
334+ ) ;
335+ } ;
336+ let test_topic_name = rand_test_topic ( "testing-topic" ) ;
337+
338+ let admin_client_result =
339+ utils:: admin:: create_admin_client ( & kafka_context. bootstrap_servers ) . await ;
340+ let Ok ( admin_client) = admin_client_result else {
341+ panic ! (
342+ "could not create admin client: {}" ,
343+ admin_client_result. unwrap_err( )
344+ ) ;
345+ } ;
346+ let opts = AdminOptions :: new ( ) . operation_timeout ( Some ( Duration :: from_secs ( 30 ) ) ) ;
347+
348+ let topic = NewTopic :: new (
349+ "ignored" ,
350+ 1 ,
351+ TopicReplication :: Variable ( & [ & [ utils:: BROKER_ID ] , & [ utils:: BROKER_ID ] ] ) ,
352+ ) ;
353+ let res = admin_client. create_topics ( & [ topic] , & opts) . await ;
354+ assert_eq ! (
355+ Err ( KafkaError :: AdminOpCreation (
356+ "replication configuration for topic 'ignored' assigns 2 partition(s), \
357+ which does not match the specified number of partitions (1)"
358+ . into( )
359+ ) ) ,
360+ res,
361+ )
362+ }
0 commit comments