1616
1717package org .springframework .kafka .listener ;
1818
19+ import java .util .Arrays ;
1920import java .util .Collections ;
2021import java .util .Map ;
2122import java .util .concurrent .CompletableFuture ;
4950 * @param <V> the value type
5051 *
5152 * @author Soby Chacko
53+ * @since 4.0
5254 */
5355public class ShareKafkaMessageListenerContainer <K , V >
5456 extends AbstractShareKafkaMessageListenerContainer <K , V > {
5557
58+ private static final int POLL_TIMEOUT = 1000 ;
59+
5660 @ Nullable
5761 private String clientId ;
5862
@@ -166,11 +170,12 @@ private class ShareListenerConsumer implements Runnable {
166170 this .consumer = ShareKafkaMessageListenerContainer .this .shareConsumerFactory .createShareConsumer (
167171 ShareKafkaMessageListenerContainer .this .getGroupId (),
168172 ShareKafkaMessageListenerContainer .this .getClientId ());
173+
169174 this .genericListener = listener ;
170175 this .clientId = ShareKafkaMessageListenerContainer .this .getClientId ();
171176 // Subscribe to topics, just like in the test
172177 ContainerProperties containerProperties = getContainerProperties ();
173- this .consumer .subscribe (java . util . Arrays .asList (containerProperties .getTopics ()));
178+ this .consumer .subscribe (Arrays .asList (containerProperties .getTopics ()));
174179 }
175180
176181 @ Nullable
@@ -184,7 +189,7 @@ public void run() {
184189 Throwable exitThrowable = null ;
185190 while (isRunning ()) {
186191 try {
187- var records = this .consumer .poll (java .time .Duration .ofMillis (1000 ));
192+ var records = this .consumer .poll (java .time .Duration .ofMillis (POLL_TIMEOUT ));
188193 if (records != null && records .count () > 0 ) {
189194 for (var record : records ) {
190195 @ SuppressWarnings ("unchecked" )
@@ -199,6 +204,7 @@ public void run() {
199204 }
200205 catch (Error e ) {
201206 this .logger .error (e , "Stopping share consumer due to an Error" );
207+ wrapUp ();
202208 throw e ;
203209 }
204210 catch (Exception e ) {
@@ -213,15 +219,19 @@ public void run() {
213219 if (exitThrowable != null ) {
214220 this .logger .error (exitThrowable , "ShareListenerConsumer exiting due to error" );
215221 }
216- this .consumer .close ();
217- this .logger .info (() -> this .consumerGroupId + ": Consumer stopped" );
222+ wrapUp ();
218223 }
219224
220225 protected void initialize () {
221226 publishConsumerStartingEvent ();
222227 publishConsumerStartedEvent ();
223228 }
224229
230+ private void wrapUp (){
231+ this .consumer .close ();
232+ this .logger .info (() -> this .consumerGroupId + ": Consumer stopped" );
233+ }
234+
225235 @ Override
226236 public String toString () {
227237 return "ShareKafkaMessageListenerContainer.ShareListenerConsumer ["
0 commit comments