7676import org .springframework .kafka .transaction .KafkaTransactionManager ;
7777import org .springframework .messaging .MessageHeaders ;
7878import org .springframework .transaction .TransactionDefinition ;
79+ import org .springframework .transaction .TransactionExecution ;
80+ import org .springframework .transaction .TransactionExecutionListener ;
7981import org .springframework .transaction .annotation .Transactional ;
8082import org .springframework .transaction .support .DefaultTransactionDefinition ;
8183import org .springframework .util .backoff .FixedBackOff ;
@@ -1167,7 +1169,6 @@ void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException {
11671169 DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(consumerProperties );
11681170 ContainerProperties containerProps = new ContainerProperties (topic11 );
11691171 containerProps .setPollTimeout (10_000 );
1170- final var successLatch = new AtomicReference <>(new CountDownLatch (2 ));
11711172 containerProps .setMessageListener (new MessageListener <Integer , String >() {
11721173 @ Transactional ("testSendOffsetOnlyOnActiveTransaction" )
11731174 @ Override
@@ -1177,9 +1178,19 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
11771178
11781179 // init container
11791180 KafkaTransactionManager <Object , Object > tm = new KafkaTransactionManager <>(pf );
1181+ AtomicInteger txCount = new AtomicInteger (0 );
1182+ tm .addListener (new TransactionExecutionListener () {
1183+ @ Override
1184+ public void afterCommit (TransactionExecution transaction , @ Nullable Throwable commitFailure ) {
1185+ txCount .incrementAndGet ();
1186+ TransactionExecutionListener .super .afterCommit (transaction , commitFailure );
1187+ }
1188+ });
11801189 containerProps .setKafkaAwareTransactionManager (tm );
1190+
11811191 KafkaMessageListenerContainer <Integer , String > container = new KafkaMessageListenerContainer <>(cf , containerProps );
11821192 container .setBeanName ("testSendOffsetOnlyOnActiveTransaction" );
1193+ final var interceptorLatch = new AtomicReference <>(new CountDownLatch (1 ));
11831194 container .setRecordInterceptor (new RecordInterceptor <Integer , String >() {
11841195 boolean isFirst = true ;
11851196
@@ -1198,17 +1209,25 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
11981209 public void afterRecord (
11991210 ConsumerRecord <Integer , String > record ,
12001211 Consumer <Integer , String > consumer ) {
1201- successLatch .get ().countDown ();
1212+ interceptorLatch .get ().countDown ();
12021213 }
12031214 });
12041215 container .start ();
12051216
12061217 template .executeInTransaction (t -> {
12071218 template .send (new ProducerRecord <>(topic11 , 0 , 0 , "bar1" ));
1219+ return null ;
1220+ });
1221+ assertThat (interceptorLatch .get ().await (30 , TimeUnit .SECONDS )).isTrue ();
1222+ assertThat (txCount .get ()).isEqualTo (1 );
1223+
1224+ interceptorLatch .set (new CountDownLatch (1 ));
1225+ template .executeInTransaction (t -> {
12081226 template .send (new ProducerRecord <>(topic11 , 0 , 0 , "bar2" ));
12091227 return null ;
12101228 });
1211- assertThat (successLatch .get ().await (30 , TimeUnit .SECONDS )).isTrue ();
1229+ assertThat (interceptorLatch .get ().await (30 , TimeUnit .SECONDS )).isTrue ();
1230+ assertThat (txCount .get ()).isEqualTo (1 );
12121231
12131232 container .stop ();
12141233 pf .destroy ();
0 commit comments