2020import org .apache .kafka .clients .admin .Admin ;
2121import org .apache .kafka .clients .admin .AdminClientConfig ;
2222import org .apache .kafka .clients .admin .FenceProducersOptions ;
23- import org .apache .kafka .clients .producer .KafkaProducer ;
23+ import org .apache .kafka .clients .producer .Producer ;
2424import org .apache .kafka .clients .producer .ProducerConfig ;
2525import org .apache .kafka .clients .producer .ProducerRecord ;
2626import org .apache .kafka .common .errors .ApiException ;
2727import org .apache .kafka .common .errors .InvalidProducerEpochException ;
2828import org .apache .kafka .common .errors .ProducerFencedException ;
2929import org .apache .kafka .common .errors .TimeoutException ;
30- import org .apache .kafka .common .serialization .ByteArraySerializer ;
3130import org .apache .kafka .common .test .api .ClusterConfigProperty ;
3231import org .apache .kafka .common .test .api .ClusterInstance ;
3332import org .apache .kafka .common .test .api .ClusterTest ;
4241import java .util .Collections ;
4342import java .util .HashMap ;
4443import java .util .Map ;
45- import java .util .Properties ;
4644import java .util .concurrent .ExecutionException ;
4745
4846import static org .junit .jupiter .api .Assertions .assertInstanceOf ;
@@ -68,22 +66,16 @@ public class AdminFenceProducersTest {
6866 this .clusterInstance = clusterInstance ;
6967 }
7068
71- private KafkaProducer <byte [], byte []> createProducer () {
72- Properties config = new Properties ();
73- config .put (ProducerConfig .BOOTSTRAP_SERVERS_CONFIG , clusterInstance .bootstrapServers ());
74- config .put (ProducerConfig .TRANSACTIONAL_ID_CONFIG , TXN_ID );
75- config .put (ProducerConfig .TRANSACTION_TIMEOUT_CONFIG , "2000" );
76- config .put (ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG , ByteArraySerializer .class .getName ());
77- config .put (ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG , ByteArraySerializer .class .getName ());
78-
79- return new KafkaProducer <>(config );
69+ private Producer <byte [], byte []> createProducer () {
70+ return clusterInstance .producer (Map .of (ProducerConfig .TRANSACTIONAL_ID_CONFIG , TXN_ID ,
71+ ProducerConfig .TRANSACTION_TIMEOUT_CONFIG , "2000" ));
8072 }
8173
8274 @ ClusterTest
8375 void testFenceAfterProducerCommit () throws Exception {
8476 clusterInstance .createTopic (TOPIC_NAME , 1 , (short ) 1 );
8577
86- try (KafkaProducer <byte [], byte []> producer = createProducer ();
78+ try (Producer <byte [], byte []> producer = createProducer ();
8779 Admin adminClient = clusterInstance .admin ()) {
8880 producer .initTransactions ();
8981 producer .beginTransaction ();
@@ -125,7 +117,7 @@ void testFenceProducerTimeoutMs() {
125117 void testFenceBeforeProducerCommit () throws Exception {
126118 clusterInstance .createTopic (TOPIC_NAME , 1 , (short ) 1 );
127119
128- try (KafkaProducer <byte [], byte []> producer = createProducer ();
120+ try (Producer <byte [], byte []> producer = createProducer ();
129121 Admin adminClient = clusterInstance .admin ()) {
130122
131123 producer .initTransactions ();
0 commit comments