Skip to content

Commit 9c40625

Browse files
garyrussellartembilan
authored andcommitted
GH-756: Add concurrency, autoStartup to @kl
Resolves #756 Override the defaults configured on the container factory.
1 parent 7982621 commit 9c40625

File tree

8 files changed

+143
-20
lines changed

8 files changed

+143
-20
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,4 +186,19 @@
186186
*/
187187
String beanRef() default "__listener";
188188

189+
/**
190+
* Override the container factory's {@code concurrency} setting for this listener.
191+
* May be a property placeholder or SpEL expression that evaluates to an integer.
192+
* @return the concurrency.
193+
* @since 2.2
194+
*/
195+
String concurrency() default "";
196+
197+
/**
198+
* Set to true or false, to override the default setting in the container factory.
199+
* @return true to auto start, false to not auto start.
200+
* @since 2.2
201+
*/
202+
String autoStartup() default "";
203+
189204
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,14 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
418418
endpoint.setGroup((String) resolvedGroup);
419419
}
420420
}
421+
String concurrency = kafkaListener.concurrency();
422+
if (StringUtils.hasText(concurrency)) {
423+
endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
424+
}
425+
String autoStartup = kafkaListener.autoStartup();
426+
if (StringUtils.hasText(autoStartup)) {
427+
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
428+
}
421429

422430
KafkaListenerContainerFactory<?> factory = null;
423431
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
@@ -640,6 +648,35 @@ private String resolveExpressionAsString(String value, String attribute) {
640648
}
641649
}
642650

651+
private int resolveExpressionAsInteger(String value, String attribute) {
652+
Object resolved = resolveExpression(value);
653+
if (resolved instanceof String) {
654+
return Integer.parseInt((String) resolved);
655+
}
656+
else if (resolved instanceof Number) {
657+
return ((Number) resolved).intValue();
658+
}
659+
else {
660+
throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
661+
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
662+
}
663+
}
664+
665+
private boolean resolveExpressionAsBoolean(String value, String attribute) {
666+
Object resolved = resolveExpression(value);
667+
if (resolved instanceof Boolean) {
668+
return (Boolean) resolved;
669+
}
670+
else if (resolved instanceof String) {
671+
final String s = (String) resolved;
672+
return Boolean.parseBoolean(s);
673+
}
674+
else {
675+
throw new IllegalStateException("The [" + attribute + "] must resolve to a Boolean. "
676+
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
677+
}
678+
}
679+
643680
private Object resolveExpression(String value) {
644681
String resolvedValue = resolve(value);
645682

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
247247
if (endpoint.getId() != null) {
248248
instance.setBeanName(endpoint.getId());
249249
}
250-
251250
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
252251
AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
253252
if (this.recordFilterStrategy != null) {
@@ -274,7 +273,7 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
274273
}
275274

276275
endpoint.setupListenerContainer(instance, this.messageConverter);
277-
initializeContainer(instance);
276+
initializeContainer(instance, endpoint);
278277
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
279278
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
280279

@@ -293,8 +292,9 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
293292
* <p>Subclasses can inherit from this method to apply extra
294293
* configuration if necessary.
295294
* @param instance the container instance to configure.
295+
* @param endpoint the endpoint.
296296
*/
297-
protected void initializeContainer(C instance) {
297+
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
298298
ContainerProperties properties = instance.getContainerProperties();
299299
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
300300
"messageListener", "ackCount", "ackTime");
@@ -310,7 +310,10 @@ protected void initializeContainer(C instance) {
310310
if (this.errorHandler != null) {
311311
instance.setGenericErrorHandler(this.errorHandler);
312312
}
313-
if (this.autoStartup != null) {
313+
if (endpoint.getAutoStartup() != null) {
314+
instance.setAutoStartup(endpoint.getAutoStartup());
315+
}
316+
else if (this.autoStartup != null) {
314317
instance.setAutoStartup(this.autoStartup);
315318
}
316319
if (this.phase != null) {
@@ -323,43 +326,46 @@ protected void initializeContainer(C instance) {
323326

324327
@Override
325328
public C createContainer(final Collection<TopicPartitionInitialOffset> topicPartitions) {
326-
C container = createContainerInstance(new KafkaListenerEndpointAdapter() {
329+
KafkaListenerEndpoint endpoint = new KafkaListenerEndpointAdapter() {
327330

328331
@Override
329332
public Collection<TopicPartitionInitialOffset> getTopicPartitions() {
330333
return topicPartitions;
331334
}
332335

333-
});
334-
initializeContainer(container);
336+
};
337+
C container = createContainerInstance(endpoint);
338+
initializeContainer(container, endpoint);
335339
return container;
336340
}
337341

338342
@Override
339343
public C createContainer(final String... topics) {
340-
C container = createContainerInstance(new KafkaListenerEndpointAdapter() {
344+
KafkaListenerEndpoint endpoint = new KafkaListenerEndpointAdapter() {
341345

342346
@Override
343347
public Collection<String> getTopics() {
344348
return Arrays.asList(topics);
345349
}
346350

347-
});
348-
initializeContainer(container);
351+
};
352+
C container = createContainerInstance(endpoint);
353+
initializeContainer(container, endpoint);
349354
return container;
350355
}
351356

352357
@Override
353358
public C createContainer(final Pattern topicPattern) {
354-
C container = createContainerInstance(new KafkaListenerEndpointAdapter() {
359+
KafkaListenerEndpoint endpoint = new KafkaListenerEndpointAdapter() {
355360

356361
@Override
357362
public Pattern getTopicPattern() {
358363
return topicPattern;
359364
}
360365

361-
});
362-
initializeContainer(container);
366+
};
367+
C container = createContainerInstance(endpoint);
368+
initializeContainer(container, endpoint);
363369
return container;
364370
}
365371

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
101101

102102
private String clientIdPrefix;
103103

104+
private Integer concurrency;
105+
106+
private Boolean autoStartup;
107+
104108
@Override
105109
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
106110
this.beanFactory = beanFactory;
@@ -342,6 +346,34 @@ public void setClientIdPrefix(String clientIdPrefix) {
342346
this.clientIdPrefix = clientIdPrefix;
343347
}
344348

349+
@Override
350+
public Integer getConcurrency() {
351+
return this.concurrency;
352+
}
353+
354+
/**
355+
* Set the concurrency for this endpoint's container.
356+
* @param concurrency the concurrency.
357+
* @since 2.2
358+
*/
359+
public void setConcurrency(int concurrency) {
360+
this.concurrency = concurrency;
361+
}
362+
363+
@Override
364+
public Boolean getAutoStartup() {
365+
return this.autoStartup;
366+
}
367+
368+
/**
369+
* Set the autoStartup for this endpoint's container.
370+
* @param autoStartup the autoStartup.
371+
* @since 2.2
372+
*/
373+
public void setAutoStartup(Boolean autoStartup) {
374+
this.autoStartup = autoStartup;
375+
}
376+
345377
@Override
346378
public void afterPropertiesSet() {
347379
boolean topicsEmpty = getTopics().isEmpty();

spring-kafka/src/main/java/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,12 @@ protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(Kafka
7979
}
8080

8181
@Override
82-
protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {
83-
super.initializeContainer(instance);
84-
if (this.concurrency != null) {
82+
protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
83+
super.initializeContainer(instance, endpoint);
84+
if (endpoint.getConcurrency() != null) {
85+
instance.setConcurrency(endpoint.getConcurrency());
86+
}
87+
else if (this.concurrency != null) {
8588
instance.setConcurrency(this.concurrency);
8689
}
8790
}

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,20 @@ public interface KafkaListenerEndpoint {
8383
*/
8484
String getClientIdPrefix();
8585

86+
/**
87+
* Return the concurrency for this endpoint's container.
88+
* @return the concurrency.
89+
* @since 2.2
90+
*/
91+
Integer getConcurrency();
92+
93+
/**
94+
* Return the autoStartup for this endpoint's container.
95+
* @return the autoStartup.
96+
* @since 2.2
97+
*/
98+
Boolean getAutoStartup();
99+
86100
/**
87101
* Setup the specified message listener container with the model
88102
* defined by this endpoint.

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ public String getClientIdPrefix() {
7272
return null;
7373
}
7474

75+
@Override
76+
public Integer getConcurrency() {
77+
return null;
78+
}
79+
80+
@Override
81+
public Boolean getAutoStartup() {
82+
return null;
83+
}
84+
7585
@Override
7686
public void setupListenerContainer(MessageListenerContainer listenerContainer,
7787
MessageConverter messageConverter) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,13 @@ public void testSimple() throws Exception {
255255
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId"))
256256
.isEqualTo("clientIdViaAnnotation-0");
257257

258+
MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
259+
assertThat(rebalanceConcurrentContainer).isNotNull();
260+
assertThat(rebalanceConcurrentContainer.isAutoStartup()).isFalse();
261+
assertThat(KafkaTestUtils.getPropertyValue(rebalanceConcurrentContainer, "concurrency", Integer.class))
262+
.isEqualTo(3);
263+
rebalanceConcurrentContainer.start();
264+
258265
template.send("annotated11", 0, "foo");
259266
assertThat(this.listener.latch7.await(60, TimeUnit.SECONDS)).isTrue();
260267
assertThat(this.consumerRef.get()).isNotNull();
@@ -266,8 +273,6 @@ public void testSimple() throws Exception {
266273
assertThat(this.listener.latch7a.await(60, TimeUnit.SECONDS)).isTrue();
267274
assertThat(this.listener.latch7String).isNull();
268275

269-
MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
270-
assertThat(rebalanceConcurrentContainer).isNotNull();
271276
MessageListenerContainer rebalanceContainer = (MessageListenerContainer) KafkaTestUtils
272277
.getPropertyValue(rebalanceConcurrentContainer, "containers", List.class).get(0);
273278
assertThat(KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.coordinator.groupId"))
@@ -278,7 +283,6 @@ public void testSimple() throws Exception {
278283
clientId)
279284
.startsWith("consumer-");
280285
assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-'));
281-
282286
}
283287

284288
@Test
@@ -847,6 +851,7 @@ public KafkaListenerContainerFactory<?> batchManualFactory2() {
847851
new ConcurrentKafkaListenerContainerFactory<>();
848852
ContainerProperties props = factory.getContainerProperties();
849853
factory.setConsumerFactory(consumerFactory());
854+
factory.setAutoStartup(true);
850855
props.setConsumerRebalanceListener(consumerRebalanceListener(consumerRef()));
851856
return factory;
852857
}
@@ -1268,7 +1273,8 @@ public void jsonHeaders(Bar foo) { // should be mapped to Foo via Headers
12681273
}
12691274

12701275
@KafkaListener(id = "rebalanceListener", topics = "annotated11", idIsGroup = false,
1271-
containerFactory = "kafkaRebalanceListenerContainerFactory")
1276+
containerFactory = "kafkaRebalanceListenerContainerFactory", autoStartup = "${foobarbaz:false}",
1277+
concurrency = "${fixbux:3}")
12721278
public void listen7(@Payload(required = false) String foo) {
12731279
this.latch7String = foo;
12741280
this.latch7.countDown();

0 commit comments

Comments
 (0)