@@ -9,6 +9,7 @@ use rdkafka::admin::{
99} ;
1010use rdkafka:: error:: KafkaError ;
1111use std:: time:: Duration ;
12+ use rdkafka_sys:: RDKafkaErrorCode ;
1213
1314#[ path = "utils/mod.rs" ]
1415mod utils;
@@ -66,43 +67,6 @@ async fn test_topics() {
6667 . await
6768 . expect ( "could not create consumer client" ) ;
6869
69- // // Verify that incorrect replication configurations are ignored when
70- // // creating partitions.
71- // {
72- // let name = rand_test_topic("test_topics");
73- // let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));
74- //
75- // let res = admin_client
76- // .create_topics(vec![&topic], &opts)
77- // .await
78- // .expect("topic creation failed");
79- // assert_eq!(res, &[Ok(name.clone())]);
80- // let _ = fetch_metadata(&name);
81- //
82- // // This partition specification is obviously garbage, and so trips
83- // // a client-side error.
84- // let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0], &[0]]);
85- // let res = admin_client.create_partitions(&[partitions], &opts).await;
86- // assert_eq!(
87- // res,
88- // Err(KafkaError::AdminOpCreation(format!(
89- // "partition assignment for topic '{}' assigns 3 partition(s), \
90- // which is more than the requested total number of partitions (2)",
91- // name
92- // )))
93- // );
94- //
95- // // Only the server knows that this partition specification is garbage.
96- // let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0]]);
97- // let res = admin_client
98- // .create_partitions(&[partitions], &opts)
99- // .await
100- // .expect("partition creation failed");
101- // assert_eq!(
102- // res,
103- // &[Err((name, RDKafkaErrorCode::InvalidReplicaAssignment))],
104- // );
105- // }
10670 //
10771 // // Verify that deleting a non-existent topic fails.
10872 // {
@@ -324,7 +288,7 @@ pub async fn test_topic_create_and_delete() {
324288/// Verify that incorrect replication configurations are ignored when
325289/// creating topics.
326290#[ tokio:: test]
327- pub async fn test_incorect_replication_factors_are_ignored ( ) {
291+ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_topics ( ) {
328292 // Get Kafka container context.
329293 let kafka_context_result = KafkaContext :: shared ( ) . await ;
330294 let Ok ( kafka_context) = kafka_context_result else {
@@ -360,3 +324,67 @@ pub async fn test_incorect_replication_factors_are_ignored() {
360324 res,
361325 )
362326}
327+
328+ /// Verify that incorrect replication configurations are ignored when
329+ /// creating partitions.
330+ #[ tokio:: test]
331+ pub async fn test_incorrect_replication_factors_are_ignored_when_creating_partitions ( ) {
332+ // Get Kafka container context.
333+ let kafka_context_result = KafkaContext :: shared ( ) . await ;
334+ let Ok ( kafka_context) = kafka_context_result else {
335+ panic ! (
336+ "could not create kafka context: {}" ,
337+ kafka_context_result. unwrap_err( )
338+ ) ;
339+ } ;
340+
341+ let admin_client_result =
342+ utils:: admin:: create_admin_client ( & kafka_context. bootstrap_servers ) . await ;
343+ let Ok ( admin_client) = admin_client_result else {
344+ panic ! (
345+ "could not create admin client: {}" ,
346+ admin_client_result. unwrap_err( )
347+ ) ;
348+ } ;
349+ let opts = AdminOptions :: new ( ) . operation_timeout ( Some ( Duration :: from_secs ( 30 ) ) ) ;
350+
351+ // Create consumer client
352+ let consumer_client =
353+ utils:: consumer:: create_unsubscribed_base_consumer ( & kafka_context. bootstrap_servers )
354+ . await
355+ . expect ( "could not create consumer client" ) ;
356+
357+ let name = rand_test_topic ( "test_topics" ) ;
358+ let topic = NewTopic :: new ( & name, 1 , TopicReplication :: Fixed ( 1 ) ) ;
359+
360+ let res = admin_client
361+ . create_topics ( vec ! [ & topic] , & opts)
362+ . await
363+ . expect ( "topic creation failed" ) ;
364+ assert_eq ! ( res, & [ Ok ( name. clone( ) ) ] ) ;
365+ let _ = utils:: consumer:: fetch_consumer_metadata ( & consumer_client, & name) ;
366+
367+ // This partition specification is obviously garbage, and so trips
368+ // a client-side error.
369+ let partitions = NewPartitions :: new ( & name, 2 ) . assign ( & [ & [ 0 ] , & [ 0 ] , & [ 0 ] ] ) ;
370+ let res = admin_client. create_partitions ( & [ partitions] , & opts) . await ;
371+ assert_eq ! (
372+ res,
373+ Err ( KafkaError :: AdminOpCreation ( format!(
374+ "partition assignment for topic '{}' assigns 3 partition(s), \
375+ which is more than the requested total number of partitions (2)",
376+ name
377+ ) ) )
378+ ) ;
379+
380+ // Only the server knows that this partition specification is garbage.
381+ let partitions = NewPartitions :: new ( & name, 2 ) . assign ( & [ & [ 0 ] , & [ 0 ] ] ) ;
382+ let res = admin_client
383+ . create_partitions ( & [ partitions] , & opts)
384+ . await
385+ . expect ( "partition creation failed" ) ;
386+ assert_eq ! (
387+ res,
388+ & [ Err ( ( name, RDKafkaErrorCode :: InvalidReplicaAssignment ) ) ] ,
389+ ) ;
390+ }
0 commit comments