@@ -52,16 +52,17 @@ void testASimpleProducerSmokeTest() {
5252        props .put (ProducerConfig .ACKS_CONFIG , "all" );
5353        props .put (ProducerConfig .ENABLE_IDEMPOTENCE_CONFIG , "true" );
5454
55-         KafkaProducer <Integer , String > producer  = new  KafkaProducer <>(props ); 
56-         producer .initTransactions ();
57-         producer .beginTransaction ();
55+         try ( KafkaProducer <Integer , String > producer  = new  KafkaProducer <>(props )) { 
56+              producer .initTransactions ();
57+              producer .beginTransaction ();
5858
59-         IntStream .range (0 , 10 ).forEach (i  -> {
60-             producer .send (new  ProducerRecord <>(topicName , i , "Hello, World!" ));
61-         });
62-         producer .commitTransaction ();
59+              IntStream .range (0 , 10 ).forEach (i  -> {
60+                  producer .send (new  ProducerRecord <>(topicName , i , "Hello, World!" ));
61+              });
62+              producer .commitTransaction ();
6363
64-         assertProduced (10 , topicName );
64+             assertProduced (10 , topicName );
65+         }
6566    }
6667
6768    @ Test 
@@ -84,14 +85,15 @@ void testSinkSavesAvroDataToMongoDB() {
8485        producerProps .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , "io.confluent.kafka.serializers.KafkaAvroSerializer" );
8586        producerProps .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , "io.confluent.kafka.serializers.KafkaAvroSerializer" );
8687        producerProps .put (KafkaAvroSerializerConfig .SCHEMA_REGISTRY_URL_CONFIG , KAFKA .schemaRegistryUrl ());
87-         KafkaProducer <String , TweetMsg > producer  = new  KafkaProducer <>(producerProps );
8888
89-         producer .initTransactions ();
90-         producer .beginTransaction ();
91-         tweets .forEach (tweet  -> producer .send (new  ProducerRecord <>(topicName , tweet )));
92-         producer .commitTransaction ();
89+         try (KafkaProducer <String , TweetMsg > producer  = new  KafkaProducer <>(producerProps )) {
90+             producer .initTransactions ();
91+             producer .beginTransaction ();
92+             tweets .forEach (tweet  -> producer .send (new  ProducerRecord <>(topicName , tweet )));
93+             producer .commitTransaction ();
9394
94-         assertProduced (100 , topicName );
95-         assertEquals (100 , getCollection ().countDocuments ());
95+             assertProduced (100 , topicName );
96+             assertEquals (100 , getCollection ().countDocuments ());
97+         }
9698    }
9799}
0 commit comments