11/*
2- * Copyright 2017-2020 the original author or authors.
2+ * Copyright 2017-2021 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
9292 */
9393@ EmbeddedKafka (topics = { KafkaTemplateTransactionTests .STRING_KEY_TOPIC ,
9494 KafkaTemplateTransactionTests .LOCAL_TX_IN_TOPIC }, brokerProperties = {
95- "transaction.state.log.replication.factor=1" , "transaction.state.log.min.isr=1" })
95+ "transaction.state.log.replication.factor=1" , "transaction.state.log.min.isr=1" })
9696public class KafkaTemplateTransactionTests {
9797
9898 public static final String STRING_KEY_TOPIC = "stringKeyTopic" ;
@@ -128,7 +128,7 @@ public void testLocalTransaction() {
128128 new OffsetAndMetadata (singleRecord .offset () + 1L )), "testLocalTx" );
129129 assertThat (KafkaTestUtils .getPropertyValue (
130130 KafkaTestUtils .getPropertyValue (template , "producers" , ThreadLocal .class ).get (),
131- "delegate.transactionManager.transactionalId" )).isEqualTo ("my.transaction.0" );
131+ "delegate.transactionManager.transactionalId" )).isEqualTo ("my.transaction.0" );
132132 return null ;
133133 });
134134 ConsumerRecords <String , String > records = KafkaTestUtils .getRecords (consumer );
@@ -300,14 +300,14 @@ public void testTransactionSynchronizationExceptionOnCommit() {
300300 ResourcelessTransactionManager tm = new ResourcelessTransactionManager ();
301301
302302 assertThatExceptionOfType (ProducerFencedException .class ).isThrownBy (() ->
303- new TransactionTemplate (tm )
304- .execute (s -> {
305- template .sendDefault ("foo" , "bar" );
303+ new TransactionTemplate (tm )
304+ .execute (s -> {
305+ template .sendDefault ("foo" , "bar" );
306306
307- // Mark the mock producer as fenced so it throws when committing the transaction
308- producer .fenceProducer ();
309- return null ;
310- }));
307+ // Mark the mock producer as fenced so it throws when committing the transaction
308+ producer .fenceProducer ();
309+ return null ;
310+ }));
311311
312312 assertThat (producer .transactionCommitted ()).isFalse ();
313313 assertThat (producer .closed ()).isTrue ();
@@ -384,30 +384,30 @@ public void testQuickCloseAfterCommitTimeout() {
384384 DefaultKafkaProducerFactory <String , String > pf =
385385 new DefaultKafkaProducerFactory <String , String >(Collections .emptyMap ()) {
386386
387- @ SuppressWarnings ({ "rawtypes" , "unchecked" })
388- @ Override
389- public Producer <String , String > createProducer (String txIdPrefixArg ) {
390- CloseSafeProducer <String , String > closeSafeProducer = new CloseSafeProducer <>(producer ,
391- (prod , timeout ) -> {
392- prod .closeDelegate (timeout , Collections .emptyList ());
393- return true ;
394- },
395- Duration .ofSeconds (1 ), "factory" , 0 );
396- return closeSafeProducer ;
397- }
398-
399- };
387+ @ SuppressWarnings ({ "rawtypes" , "unchecked" })
388+ @ Override
389+ public Producer <String , String > createProducer (String txIdPrefixArg ) {
390+ CloseSafeProducer <String , String > closeSafeProducer = new CloseSafeProducer <>(producer ,
391+ (prod , timeout ) -> {
392+ prod .closeDelegate (timeout , Collections .emptyList ());
393+ return true ;
394+ },
395+ Duration .ofSeconds (1 ), "factory" , 0 );
396+ return closeSafeProducer ;
397+ }
398+
399+ };
400400 pf .setTransactionIdPrefix ("foo" );
401401
402402 KafkaTemplate <String , String > template = new KafkaTemplate <>(pf );
403403 template .setDefaultTopic (STRING_KEY_TOPIC );
404404
405405 willThrow (new TimeoutException ()).given (producer ).commitTransaction ();
406406 assertThatExceptionOfType (TimeoutException .class )
407- .isThrownBy (() ->
408- template .executeInTransaction (t -> {
409- return null ;
410- }));
407+ .isThrownBy (() ->
408+ template .executeInTransaction (t -> {
409+ return null ;
410+ }));
411411 verify (producer , never ()).abortTransaction ();
412412 verify (producer ).close (Duration .ofMillis (0 ));
413413 }
@@ -420,24 +420,24 @@ void testNormalCloseAfterCommitCacheFull() {
420420 DefaultKafkaProducerFactory <String , String > pf =
421421 new DefaultKafkaProducerFactory <String , String >(Collections .emptyMap ()) {
422422
423- @ SuppressWarnings ("unchecked" )
424- @ Override
425- public Producer <String , String > createProducer (String txIdPrefixArg ) {
426- BlockingQueue <CloseSafeProducer <String , String >> cache = new LinkedBlockingDeque <>(1 );
427- try {
428- cache .put (new CloseSafeProducer <>(mock (Producer .class ), this ::removeProducer ,
429- Duration .ofSeconds (1 ), "factory" , 0 ));
430- }
431- catch (@ SuppressWarnings ("unused" ) InterruptedException e ) {
432- Thread .currentThread ().interrupt ();
433- }
434- KafkaTestUtils .getPropertyValue (this , "cache" , Map .class ).put ("foo" , cache );
435- CloseSafeProducer <String , String > closeSafeProducer = new CloseSafeProducer <>(producer ,
436- this ::cacheReturner , "foo" , Duration .ofSeconds (1 ), "factory" , 0 );
437- return closeSafeProducer ;
438- }
439-
440- };
423+ @ SuppressWarnings ("unchecked" )
424+ @ Override
425+ public Producer <String , String > createProducer (String txIdPrefixArg ) {
426+ BlockingQueue <CloseSafeProducer <String , String >> cache = new LinkedBlockingDeque <>(1 );
427+ try {
428+ cache .put (new CloseSafeProducer <>(mock (Producer .class ), this ::removeProducer ,
429+ Duration .ofSeconds (1 ), "factory" , 0 ));
430+ }
431+ catch (@ SuppressWarnings ("unused" ) InterruptedException e ) {
432+ Thread .currentThread ().interrupt ();
433+ }
434+ KafkaTestUtils .getPropertyValue (this , "cache" , Map .class ).put ("foo" , cache );
435+ CloseSafeProducer <String , String > closeSafeProducer = new CloseSafeProducer <>(producer ,
436+ this ::cacheReturner , "foo" , Duration .ofSeconds (1 ), "factory" , 0 );
437+ return closeSafeProducer ;
438+ }
439+
440+ };
441441 pf .setTransactionIdPrefix ("foo" );
442442
443443 KafkaTemplate <String , String > template = new KafkaTemplate <>(pf );
@@ -577,28 +577,6 @@ void testNonTxWithTx() {
577577 pf .destroy ();
578578 }
579579
580- @ Test
581- void syncCommitFails () {
582- DummyTM tm = new DummyTM ();
583- MockProducer <String , String > producer =
584- new MockProducer <>(true , new StringSerializer (), new StringSerializer ());
585- producer .initTransactions ();
586- producer .commitTransactionException = new IllegalStateException ();
587-
588- @ SuppressWarnings ("unchecked" )
589- ProducerFactory <String , String > pf = mock (ProducerFactory .class );
590- given (pf .transactionCapable ()).willReturn (true );
591- given (pf .createProducer (isNull ())).willReturn (producer );
592-
593- KafkaTemplate <String , String > template = new KafkaTemplate <>(pf );
594- template .setDefaultTopic (STRING_KEY_TOPIC );
595-
596- assertThatExceptionOfType (IllegalStateException .class ).isThrownBy (() ->
597- new TransactionTemplate (tm ).execute (status -> template .sendDefault ("foo" )));
598-
599- assertThat (tm .committed ).isTrue ();
600- }
601-
602580 @ Configuration
603581 @ EnableTransactionManagement
604582 public static class DeclarativeConfig {
0 commit comments