2424import java .util .concurrent .TimeUnit ;
2525import java .util .function .Function ;
2626
27- import org .testcontainers .shaded .com .google .common .collect .ImmutableMap ;
28-
2927import static org .awaitility .Awaitility .await ;
3028
3129public class KafkaUtils {
3230 private static final Logger LOGGER = LogManager .getLogger (KafkaUtils .class );
3331
34- /**
35- * Creates kafka topic
36- *
37- * @param topicName the topic name
38- * @param bootstrapServer kafka bootstrap server list
39- */
40- public static void createTopic (String topicName , String bootstrapServer ) {
41- createTopic (topicName , 1 , bootstrapServer );
42- }
43-
4432 public static void createTopic (String topicName , int numOfPartitions , String bootstrapServers ) {
4533 try {
4634 getAdminClient (bootstrapServers , (client -> {
@@ -60,28 +48,24 @@ public static void createTopic(String topicName, int numOfPartitions, String boo
6048
6149 public static boolean checkTopicExistence (String topicName , String bootstrapServers ) {
6250 return getAdminClient (bootstrapServers , (client -> {
63- Map <String , KafkaFuture <TopicDescription >> topics = client .describeTopics (List .of (topicName )).values ();
51+ Map <String , KafkaFuture <TopicDescription >> topics = client .describeTopics (List .of (topicName )).topicNameValues ();
6452
6553 try {
6654 return topics .containsKey (topicName ) && topics .get (topicName ).get ().name ().equals (topicName );
67- } catch (InterruptedException e ) {
68- LOGGER .error ("error on checkTopicExistence" , e );
69- return false ;
70- } catch (ExecutionException e ) {
55+ } catch (InterruptedException | ExecutionException e ) {
7156 LOGGER .error ("error on checkTopicExistence" , e );
7257 return false ;
7358 }
7459 }));
7560 }
7661
7762 private static <Rep > Rep getAdminClient (String bootstrapServer , Function <AdminClient , Rep > function ) {
78- AdminClient adminClient = KafkaAdminClient .create (
79- ImmutableMap .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServer , AdminClientConfig .CLIENT_ID_CONFIG , "test" )
80- );
81- try {
63+ try (
64+ AdminClient adminClient = KafkaAdminClient .create (
65+ Map .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServer , AdminClientConfig .CLIENT_ID_CONFIG , "test" )
66+ )
67+ ) {
8268 return function .apply (adminClient );
83- } finally {
84- adminClient .close ();
8569 }
8670 }
8771}
0 commit comments