11/*
2- * Copyright 2020 the original author or authors.
2+ * Copyright 2020-2022 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1919import static org .assertj .core .api .Assertions .assertThat ;
2020import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
2121import static org .mockito .ArgumentMatchers .any ;
22+ import static org .mockito .BDDMockito .given ;
23+ import static org .mockito .BDDMockito .willAnswer ;
2224import static org .mockito .Mockito .mock ;
2325import static org .mockito .Mockito .times ;
2426import static org .mockito .Mockito .verify ;
2931import java .util .HashMap ;
3032import java .util .List ;
3133import java .util .Map ;
34+ import java .util .concurrent .atomic .AtomicBoolean ;
3235
3336import org .apache .kafka .clients .consumer .Consumer ;
3437import org .apache .kafka .clients .consumer .ConsumerRecord ;
3538import org .apache .kafka .clients .consumer .ConsumerRecords ;
3639import org .apache .kafka .common .TopicPartition ;
3740import org .junit .jupiter .api .Test ;
3841
42+ import org .springframework .kafka .KafkaException ;
3943import org .springframework .util .backoff .FixedBackOff ;
4044
4145/**
@@ -62,6 +66,7 @@ void recover() {
6266 ConsumerRecords <?, ?> records = new ConsumerRecords <>(map );
6367 Consumer <?, ?> consumer = mock (Consumer .class );
6468 MessageListenerContainer container = mock (MessageListenerContainer .class );
69+ given (container .isRunning ()).willReturn (true );
6570 eh .handle (new RuntimeException (), records , consumer , container , () -> {
6671 this .invoked ++;
6772 throw new RuntimeException ();
@@ -90,6 +95,7 @@ void successOnRetry() {
9095 ConsumerRecords <?, ?> records = new ConsumerRecords <>(map );
9196 Consumer <?, ?> consumer = mock (Consumer .class );
9297 MessageListenerContainer container = mock (MessageListenerContainer .class );
98+ given (container .isRunning ()).willReturn (true );
9399 eh .handle (new RuntimeException (), records , consumer , container , () -> this .invoked ++);
94100 assertThat (this .invoked ).isEqualTo (1 );
95101 assertThat (recovered ).hasSize (0 );
@@ -116,6 +122,7 @@ void recoveryFails() {
116122 ConsumerRecords <?, ?> records = new ConsumerRecords <>(map );
117123 Consumer <?, ?> consumer = mock (Consumer .class );
118124 MessageListenerContainer container = mock (MessageListenerContainer .class );
125+ given (container .isRunning ()).willReturn (true );
119126 assertThatExceptionOfType (RuntimeException .class ).isThrownBy (() ->
120127 eh .handle (new RuntimeException (), records , consumer , container , () -> {
121128 this .invoked ++;
@@ -131,4 +138,31 @@ void recoveryFails() {
131138 verify (consumer ).seek (new TopicPartition ("foo" , 1 ), 0L );
132139 }
133140
141+ @ Test
142+ void exitOnContainerStop () {
143+ this .invoked = 0 ;
144+ List <ConsumerRecord <?, ?>> recovered = new ArrayList <>();
145+ RetryingBatchErrorHandler eh = new RetryingBatchErrorHandler (new FixedBackOff (0 , 99999 ), (cr , ex ) -> {
146+ recovered .add (cr );
147+ });
148+ Map <TopicPartition , List <ConsumerRecord <Object , Object >>> map = new HashMap <>();
149+ map .put (new TopicPartition ("foo" , 0 ),
150+ Collections .singletonList (new ConsumerRecord <>("foo" , 0 , 0L , "foo" , "bar" )));
151+ map .put (new TopicPartition ("foo" , 1 ),
152+ Collections .singletonList (new ConsumerRecord <>("foo" , 1 , 0L , "foo" , "bar" )));
153+ ConsumerRecords <?, ?> records = new ConsumerRecords <>(map );
154+ Consumer <?, ?> consumer = mock (Consumer .class );
155+ MessageListenerContainer container = mock (MessageListenerContainer .class );
156+ AtomicBoolean stopped = new AtomicBoolean (true );
157+ willAnswer (inv -> stopped .get ()).given (container ).isRunning ();
158+ assertThatExceptionOfType (KafkaException .class ).isThrownBy (() ->
159+ eh .handle (new RuntimeException (), records , consumer , container , () -> {
160+ this .invoked ++;
161+ stopped .set (false );
162+ throw new RuntimeException ();
163+ })
164+ ).withMessage ("Container stopped during retries" );
165+ assertThat (this .invoked ).isEqualTo (1 );
166+ }
167+
134168}
0 commit comments