@@ -2,6 +2,12 @@ use crate::utils::admin::create_topic;
 use crate::utils::containers::KafkaContext;
 use crate::utils::logging::init_test_logger;
 use crate::utils::rand::rand_test_topic;
+use crate::utils::{get_broker_version, KafkaVersion};
+use rdkafka::admin::{
+    AdminOptions, ConfigEntry, ConfigSource, NewPartitions, NewTopic, ResourceSpecifier,
+    TopicReplication,
+};
+use std::time::Duration;
 
 #[path = "utils/mod.rs"]
 mod utils;
@@ -37,3 +43,290 @@ pub async fn test_topic_creation() {
         );
     };
 }
+
+#[tokio::test]
+async fn test_topics() {
+    init_test_logger();
+
+    // Get Kafka container context.
+    let kafka_context = KafkaContext::shared()
+        .await
+        .expect("could not create kafka context");
+
+    // Create admin client
+    let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
+        .await
+        .expect("could not create admin client");
+    let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30)));
+
+    // Create consumer client
+    let consumer_client =
+        utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
+            .await
+            .expect("could not create consumer client");
+
+    // // Verify that incorrect replication configurations are ignored when
+    // // creating topics.
+    // {
+    //     let topic = NewTopic::new("ignored", 1, TopicReplication::Variable(&[&[0], &[0]]));
+    //     let res = admin_client.create_topics(&[topic], &opts).await;
+    //     assert_eq!(
+    //         Err(KafkaError::AdminOpCreation(
+    //             "replication configuration for topic 'ignored' assigns 2 partition(s), \
+    //              which does not match the specified number of partitions (1)"
+    //                 .into()
+    //         )),
+    //         res,
+    //     )
+    // }
+    //
+    // // Verify that incorrect replication configurations are ignored when
+    // // creating partitions.
+    // {
+    //     let name = rand_test_topic("test_topics");
+    //     let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));
+    //
+    //     let res = admin_client
+    //         .create_topics(vec![&topic], &opts)
+    //         .await
+    //         .expect("topic creation failed");
+    //     assert_eq!(res, &[Ok(name.clone())]);
+    //     let _ = fetch_metadata(&name);
+    //
+    //     // This partition specification is obviously garbage, and so trips
+    //     // a client-side error.
+    //     let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0], &[0]]);
+    //     let res = admin_client.create_partitions(&[partitions], &opts).await;
+    //     assert_eq!(
+    //         res,
+    //         Err(KafkaError::AdminOpCreation(format!(
+    //             "partition assignment for topic '{}' assigns 3 partition(s), \
+    //              which is more than the requested total number of partitions (2)",
+    //             name
+    //         )))
+    //     );
+    //
+    //     // Only the server knows that this partition specification is garbage.
+    //     let partitions = NewPartitions::new(&name, 2).assign(&[&[0], &[0]]);
+    //     let res = admin_client
+    //         .create_partitions(&[partitions], &opts)
+    //         .await
+    //         .expect("partition creation failed");
+    //     assert_eq!(
+    //         res,
+    //         &[Err((name, RDKafkaErrorCode::InvalidReplicaAssignment))],
+    //     );
+    // }
+    //
+    // // Verify that deleting a non-existent topic fails.
+    // {
+    //     let name = rand_test_topic("test_topics");
+    //     let res = admin_client
+    //         .delete_topics(&[&name], &opts)
+    //         .await
+    //         .expect("delete topics failed");
+    //     assert_eq!(
+    //         res,
+    //         &[Err((name, RDKafkaErrorCode::UnknownTopicOrPartition))]
+    //     );
+    // }
+    //
+    // // Verify that mixed-success operations properly report the successful and
+    // // failing operators.
+    // {
+    //     let name1 = rand_test_topic("test_topics");
+    //     let name2 = rand_test_topic("test_topics");
+    //
+    //     let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
+    //     let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
+    //
+    //     let res = admin_client
+    //         .create_topics(vec![&topic1], &opts)
+    //         .await
+    //         .expect("topic creation failed");
+    //     assert_eq!(res, &[Ok(name1.clone())]);
+    //     let _ = fetch_metadata(&name1);
+    //
+    //     let res = admin_client
+    //         .create_topics(vec![&topic1, &topic2], &opts)
+    //         .await
+    //         .expect("topic creation failed");
+    //     assert_eq!(
+    //         res,
+    //         &[
+    //             Err((name1.clone(), RDKafkaErrorCode::TopicAlreadyExists)),
+    //             Ok(name2.clone())
+    //         ]
+    //     );
+    //     let _ = fetch_metadata(&name2);
+    //
+    //     let res = admin_client
+    //         .delete_topics(&[&name1], &opts)
+    //         .await
+    //         .expect("topic deletion failed");
+    //     assert_eq!(res, &[Ok(name1.clone())]);
+    //     verify_delete(&name1);
+    //
+    //     let res = admin_client
+    //         .delete_topics(&[&name2, &name1], &opts)
+    //         .await
+    //         .expect("topic deletion failed");
+    //     assert_eq!(
+    //         res,
+    //         &[
+    //             Ok(name2.clone()),
+    //             Err((name1.clone(), RDKafkaErrorCode::UnknownTopicOrPartition))
+    //         ]
+    //     );
+    // }
+}
+
+#[tokio::test]
+pub async fn test_topic_create_and_delete() {
+    // Get Kafka container context.
+    let kafka_context = KafkaContext::shared()
+        .await
+        .expect("could not create kafka context");
+
+    // Create admin client
+    let admin_client = utils::admin::create_admin_client(&kafka_context.bootstrap_servers)
+        .await
+        .expect("could not create admin client");
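+    // Give the broker up to 30 seconds to complete each admin operation before replying.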
+    let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30)));
+
+    // Create consumer client
+    let consumer_client =
+        utils::consumer::create_unsubscribed_base_consumer(&kafka_context.bootstrap_servers)
+            .await
+            .expect("could not create consumer client");
+
+    let topic_name_1 = rand_test_topic("test_topics");
+    let topic_name_2 = rand_test_topic("test_topics");
+    let topic1 = NewTopic::new(&topic_name_1, 1, TopicReplication::Fixed(1))
+        .set("max.message.bytes", "1234");
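+    // Topic 2 uses an explicit replica assignment: three partitions, each placed on broker 1.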
+    let topic2 = NewTopic {
+        name: &topic_name_2,
+        num_partitions: 3,
+        replication: TopicReplication::Variable(&[&[1], &[1], &[1]]),
+        config: Vec::new(),
+    };
+
+    // Topics created
+    let topic_results = admin_client
+        .create_topics(&[topic1, topic2], &opts)
+        .await
+        .expect("topic creation failed");
+    assert_eq!(
+        topic_results,
+        &[Ok(topic_name_1.clone()), Ok(topic_name_2.clone())]
+    );
+
+    // Verify metadata
+    let metadata1 = utils::consumer::fetch_consumer_metadata(&consumer_client, &topic_name_1)
+        .expect(&format!("failed to fetch metadata for {}", &topic_name_1));
+    let metadata2 = utils::consumer::fetch_consumer_metadata(&consumer_client, &topic_name_2)
+        .expect(&format!("failed to fetch metadata for {}", topic_name_2));
+    assert_eq!(1, metadata1.topics().len());
+    assert_eq!(1, metadata2.topics().len());
+    let metadata_topic1 = &metadata1.topics()[0];
+    let metadata_topic2 = &metadata2.topics()[0];
+    assert_eq!(&topic_name_1, metadata_topic1.name());
+    assert_eq!(&topic_name_2, metadata_topic2.name());
+    assert_eq!(1, metadata_topic1.partitions().len());
+    assert_eq!(3, metadata_topic2.partitions().len());
+
+    // Verifying topic configurations
+    let config_resource_results = admin_client
+        .describe_configs(
+            &[
+                ResourceSpecifier::Topic(&topic_name_1),
+                ResourceSpecifier::Topic(&topic_name_2),
+            ],
+            &opts,
+        )
+        .await
+        .expect("could not describe configs");
+    let topic_config1 = &config_resource_results[0]
+        .as_ref()
+        .expect(&format!("failed to describe config for {}", &topic_name_1));
+    let topic_config2 = &config_resource_results[1]
+        .as_ref()
+        .expect(&format!("failed to describe config for {}", &topic_name_2));
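+    // Topic 1 overrode max.message.bytes, so it should surface as a per-topic (dynamic) config;
+    // topic 2 left it unset, so it should report the broker default.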
+    let mut expected_entry1 = ConfigEntry {
+        name: "max.message.bytes".into(),
+        value: Some("1234".into()),
+        source: ConfigSource::DynamicTopic,
+        is_read_only: false,
+        is_default: false,
+        is_sensitive: false,
+    };
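+    // Brokers at or below 2.3.0 report the older default of 1000012 bytes; newer brokers report 1048588.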
+    let default_max_msg_bytes = if get_broker_version(&kafka_context) <= KafkaVersion(2, 3, 0, 0) {
+        "1000012"
+    } else {
+        "1048588"
+    };
+    let expected_entry2 = ConfigEntry {
+        name: "max.message.bytes".into(),
+        value: Some(default_max_msg_bytes.into()),
+        source: ConfigSource::Default,
+        is_read_only: false,
+        is_default: true,
+        is_sensitive: false,
+    };
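+    // Brokers older than 1.1.0 don't report where a config value came from, so the source shows up as Unknown.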
+    if get_broker_version(&kafka_context) < KafkaVersion(1, 1, 0, 0) {
+        expected_entry1.source = ConfigSource::Unknown;
+    }
+    assert_eq!(
+        Some(&expected_entry1),
+        topic_config1.get("max.message.bytes")
+    );
+    assert_eq!(
+        Some(&expected_entry2),
+        topic_config2.get("max.message.bytes")
+    );
+    let config_entries1 = topic_config1.entry_map();
+    let config_entries2 = topic_config2.entry_map();
+    assert_eq!(topic_config1.entries.len(), config_entries1.len());
+    assert_eq!(topic_config2.entries.len(), config_entries2.len());
+    assert_eq!(
+        Some(&&expected_entry1),
+        config_entries1.get("max.message.bytes")
+    );
+    assert_eq!(
+        Some(&&expected_entry2),
+        config_entries2.get("max.message.bytes")
+    );
+
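+    // Grow topic 1 from one partition to five in total.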
+    let partitions1 = NewPartitions::new(&topic_name_1, 5);
+    let res = admin_client
+        .create_partitions(&[partitions1], &opts)
+        .await
+        .expect("partition creation failed");
+    assert_eq!(res, &[Ok(topic_name_1.clone())]);
+
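+    // Poll metadata until the new partition count is visible; the change is not instantaneous.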
+    let mut tries = 0;
+    loop {
+        let metadata = utils::consumer::fetch_consumer_metadata(&consumer_client, &topic_name_1)
+            .expect(&format!("failed to fetch metadata for {}", &topic_name_1));
+        let topic = &metadata.topics()[0];
+        let n = topic.partitions().len();
+        if n == 5 {
+            break;
+        } else if tries >= 5 {
+            panic!("topic has {} partitions, but expected {}", n, 5);
+        } else {
+            tries += 1;
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+    }
+
+    let res = admin_client
+        .delete_topics(&[&topic_name_1, &topic_name_2], &opts)
+        .await
+        .expect("topic deletion failed");
+    assert_eq!(res, &[Ok(topic_name_1.clone()), Ok(topic_name_2.clone())]);
+    utils::consumer::verify_topic_deleted(&consumer_client, &topic_name_1)
+        .expect(&format!("failed to verify deletion of {}", &topic_name_1));
+    utils::consumer::verify_topic_deleted(&consumer_client, &topic_name_2)
+        .expect(&format!("failed to verify deletion of {}", &topic_name_2));
+}