8484import org .springframework .kafka .core .ProducerFactory ;
8585import org .springframework .kafka .core .ProducerFactoryUtils ;
8686import org .springframework .kafka .event .ConsumerStoppedEvent ;
87+ import org .springframework .kafka .event .ConsumerStoppedEvent .Reason ;
8788import org .springframework .kafka .event .ListenerContainerIdleEvent ;
8889import org .springframework .kafka .listener .ContainerProperties .AckMode ;
8990import org .springframework .kafka .listener .ContainerProperties .AssignmentCommitOption ;
@@ -172,15 +173,29 @@ public void testConsumeAndProduceTransactionKTM_BETA() throws Exception {
172173 testConsumeAndProduceTransactionGuts (false , false , AckMode .RECORD , EOSMode .BETA );
173174 }
174175
176+ @ Test
177+ public void testConsumeAndProduceTransactionStopWhenFenced () throws Exception {
178+ testConsumeAndProduceTransactionGuts (false , false , AckMode .RECORD , EOSMode .BETA , true );
179+ }
180+
175181 @ SuppressWarnings ({ "rawtypes" , "unchecked" })
176182 private void testConsumeAndProduceTransactionGuts (boolean chained , boolean handleError , AckMode ackMode ,
177183 EOSMode eosMode ) throws Exception {
178184
185+ testConsumeAndProduceTransactionGuts (chained , handleError , ackMode , eosMode , false );
186+ }
187+
188+ @ SuppressWarnings ({ "rawtypes" , "unchecked" })
189+ private void testConsumeAndProduceTransactionGuts (boolean chained , boolean handleError , AckMode ackMode ,
190+ EOSMode eosMode , boolean stopWhenFenced ) throws Exception {
191+
179192 Consumer consumer = mock (Consumer .class );
193+ AtomicBoolean assigned = new AtomicBoolean ();
180194 final TopicPartition topicPartition = new TopicPartition ("foo" , 0 );
181195 willAnswer (i -> {
182196 ((ConsumerRebalanceListener ) i .getArgument (1 ))
183197 .onPartitionsAssigned (Collections .singletonList (topicPartition ));
198+ assigned .set (true );
184199 return null ;
185200 }).given (consumer ).subscribe (any (Collection .class ), any (ConsumerRebalanceListener .class ));
186201 ConsumerRecords records = new ConsumerRecords (Collections .singletonMap (topicPartition ,
@@ -199,6 +214,14 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
199214 ConsumerFactory cf = mock (ConsumerFactory .class );
200215 willReturn (consumer ).given (cf ).createConsumer ("group" , "" , null , KafkaTestUtils .defaultPropertyOverrides ());
201216 Producer producer = mock (Producer .class );
217+ if (stopWhenFenced ) {
218+ willAnswer (inv -> {
219+ if (assigned .get ()) {
220+ throw new ProducerFencedException ("fenced" );
221+ }
222+ return null ;
223+ }).given (producer ).sendOffsetsToTransaction (any (), any (ConsumerGroupMetadata .class ));
224+ }
202225 given (producer .send (any (), any ())).willReturn (new SettableListenableFuture <>());
203226 final CountDownLatch closeLatch = new CountDownLatch (2 );
204227 willAnswer (i -> {
@@ -224,6 +247,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
224247 props .setTransactionManager (ptm );
225248 props .setAssignmentCommitOption (AssignmentCommitOption .ALWAYS );
226249 props .setEosMode (eosMode );
250+ props .setStopContainerWhenFenced (stopWhenFenced );
227251 ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata ("group" );
228252 given (consumer .groupMetadata ()).willReturn (consumerGroupMetadata );
229253 final KafkaTemplate template = new KafkaTemplate (pf );
@@ -260,6 +284,14 @@ public void onMessage(Object data) {
260284 if (handleError ) {
261285 container .setErrorHandler ((e , data ) -> { });
262286 }
287+ CountDownLatch stopEventLatch = new CountDownLatch (1 );
288+ AtomicReference <ConsumerStoppedEvent > stopEvent = new AtomicReference <>();
289+ container .setApplicationEventPublisher (event -> {
290+ if (event instanceof ConsumerStoppedEvent ) {
291+ stopEvent .set ((ConsumerStoppedEvent ) event );
292+ stopEventLatch .countDown ();
293+ }
294+ });
263295 container .start ();
264296 assertThat (closeLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
265297 InOrder inOrder = inOrder (producer );
@@ -272,27 +304,37 @@ public void onMessage(Object data) {
272304 inOrder .verify (producer ).sendOffsetsToTransaction (Collections .singletonMap (topicPartition ,
273305 new OffsetAndMetadata (0 )), consumerGroupMetadata );
274306 }
275- inOrder .verify (producer ).commitTransaction ();
276- inOrder .verify (producer ).close (any ());
277- inOrder .verify (producer ).beginTransaction ();
278- ArgumentCaptor <ProducerRecord > captor = ArgumentCaptor .forClass (ProducerRecord .class );
279- inOrder .verify (producer ).send (captor .capture (), any (Callback .class ));
280- assertThat (captor .getValue ()).isEqualTo (new ProducerRecord ("bar" , "baz" ));
281- if (eosMode .equals (EOSMode .ALPHA )) {
282- inOrder .verify (producer ).sendOffsetsToTransaction (Collections .singletonMap (topicPartition ,
283- new OffsetAndMetadata (1 )), "group" );
307+ if (stopWhenFenced ) {
308+ assertThat (stopEventLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
309+ assertThat (stopEvent .get ().getReason ()).isEqualTo (Reason .FENCED );
284310 }
285311 else {
286- inOrder .verify (producer ).sendOffsetsToTransaction (Collections .singletonMap (topicPartition ,
287- new OffsetAndMetadata (1 )), consumerGroupMetadata );
312+ inOrder .verify (producer ).commitTransaction ();
313+ inOrder .verify (producer ).close (any ());
314+ inOrder .verify (producer ).beginTransaction ();
315+ ArgumentCaptor <ProducerRecord > captor = ArgumentCaptor .forClass (ProducerRecord .class );
316+ inOrder .verify (producer ).send (captor .capture (), any (Callback .class ));
317+ assertThat (captor .getValue ()).isEqualTo (new ProducerRecord ("bar" , "baz" ));
318+ if (eosMode .equals (EOSMode .ALPHA )) {
319+ inOrder .verify (producer ).sendOffsetsToTransaction (Collections .singletonMap (topicPartition ,
320+ new OffsetAndMetadata (1 )), "group" );
321+ }
322+ else {
323+ inOrder .verify (producer ).sendOffsetsToTransaction (Collections .singletonMap (topicPartition ,
324+ new OffsetAndMetadata (1 )), consumerGroupMetadata );
325+ }
326+ inOrder .verify (producer ).commitTransaction ();
327+ inOrder .verify (producer ).close (any ());
328+ container .stop ();
329+ verify (pf , times (2 )).createProducer (isNull ());
330+ verifyNoMoreInteractions (producer );
331+ assertThat (transactionalIds .get (0 )).isEqualTo ("group.foo.0" );
332+ assertThat (transactionalIds .get (0 )).isEqualTo ("group.foo.0" );
333+ assertThat (stopEventLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
334+ assertThat (stopEvent .get ().getReason ()).isEqualTo (Reason .NORMAL );
288335 }
289- inOrder .verify (producer ).commitTransaction ();
290- inOrder .verify (producer ).close (any ());
291- container .stop ();
292- verify (pf , times (2 )).createProducer (isNull ());
293- verifyNoMoreInteractions (producer );
294- assertThat (transactionalIds .get (0 )).isEqualTo ("group.foo.0" );
295- assertThat (transactionalIds .get (0 )).isEqualTo ("group.foo.0" );
336+ MessageListenerContainer stoppedContainer = stopEvent .get ().getContainer ();
337+ assertThat (stoppedContainer ).isSameAs (container );
296338 }
297339
298340 @ SuppressWarnings ({ "rawtypes" , "unchecked" })
0 commit comments