Skip to content

Commit d0f6965

Browse files
committed
Addressing PR review
Signed-off-by: Soby Chacko <[email protected]>
1 parent e9ee474 commit d0f6965

File tree

1 file changed

+18
-64
lines changed

1 file changed

+18
-64
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 18 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@
2020
import java.util.Collection;
2121
import java.util.regex.Pattern;
2222

23-
import org.apache.commons.logging.LogFactory;
2423
import org.jspecify.annotations.Nullable;
2524

2625
import org.springframework.context.ApplicationContext;
2726
import org.springframework.context.ApplicationContextAware;
2827
import org.springframework.context.ApplicationEventPublisher;
2928
import org.springframework.context.ApplicationEventPublisherAware;
30-
import org.springframework.core.log.LogAccessor;
3129
import org.springframework.kafka.core.ShareConsumerFactory;
3230
import org.springframework.kafka.listener.ContainerProperties;
3331
import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;
@@ -56,11 +54,7 @@
5654
public class ShareKafkaListenerContainerFactory<K, V>
5755
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>, ApplicationEventPublisherAware, ApplicationContextAware {
5856

59-
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
60-
61-
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
62-
63-
private ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
57+
private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
6458

6559
private @Nullable Boolean autoStartup;
6660

@@ -83,22 +77,6 @@ public void setApplicationContext(ApplicationContext applicationContext) {
8377
this.applicationContext = applicationContext;
8478
}
8579

86-
/**
87-
* Get the share consumer factory.
88-
* @return the share consumer factory
89-
*/
90-
public ShareConsumerFactory<? super K, ? super V> getShareConsumerFactory() {
91-
return this.shareConsumerFactory;
92-
}
93-
94-
/**
95-
* Set the share consumer factory to use for creating containers.
96-
* @param shareConsumerFactory the share consumer factory
97-
*/
98-
public void setShareConsumerFactory(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory) {
99-
this.shareConsumerFactory = shareConsumerFactory;
100-
}
101-
10280
/**
10381
* Set whether containers created by this factory should auto-start.
10482
* @param autoStartup true to auto-start
@@ -120,21 +98,14 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
12098
this.applicationEventPublisher = applicationEventPublisher;
12199
}
122100

123-
/**
124-
* Get the container properties.
125-
* @return the container properties
126-
*/
127-
public ContainerProperties getContainerProperties() {
128-
return this.containerProperties;
129-
}
130-
131101
@Override
102+
@SuppressWarnings({"unchecked", "rawtypes"})
132103
public ShareKafkaMessageListenerContainer<K, V> createListenerContainer(KafkaListenerEndpoint endpoint) {
133104
ShareKafkaMessageListenerContainer<K, V> instance = createContainerInstance(endpoint);
134105
JavaUtils.INSTANCE
135106
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
136-
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
137-
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
107+
if (endpoint instanceof AbstractKafkaListenerEndpoint abstractKafkaListenerEndpoint) {
108+
configureEndpoint(abstractKafkaListenerEndpoint);
138109
}
139110
// TODO: No message converter for queue at the moment
140111
endpoint.setupListenerContainer(instance, null);
@@ -153,36 +124,19 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> endpoint) {
153124
*/
154125
protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
155126
ContainerProperties properties = instance.getContainerProperties();
156-
if (this.containerProperties.getAckCount() > 0) {
157-
properties.setAckCount(this.containerProperties.getAckCount());
158-
}
159-
if (this.containerProperties.getAckTime() > 0) {
160-
properties.setAckTime(this.containerProperties.getAckTime());
161-
}
162-
if (endpoint.getAutoStartup() != null) {
163-
instance.setAutoStartup(endpoint.getAutoStartup());
164-
}
165-
else if (this.autoStartup != null) {
166-
instance.setAutoStartup(this.autoStartup);
167-
}
168-
if (this.phase != null) {
169-
instance.setPhase(this.phase);
170-
}
171-
if (this.applicationContext != null) {
172-
instance.setApplicationContext(this.applicationContext);
173-
}
174-
if (this.applicationEventPublisher != null) {
175-
instance.setApplicationEventPublisher(this.applicationEventPublisher);
176-
}
177-
if (endpoint.getGroupId() != null) {
178-
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
179-
}
180-
if (endpoint.getClientIdPrefix() != null) {
181-
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
182-
}
183-
if (endpoint.getConsumerProperties() != null) {
184-
instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties());
185-
}
127+
JavaUtils.INSTANCE
128+
.acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup)
129+
.acceptIfNotNull(this.autoStartup, autoStartup -> {
130+
if (endpoint.getAutoStartup() == null) {
131+
instance.setAutoStartup(autoStartup);
132+
}
133+
})
134+
.acceptIfNotNull(this.phase, instance::setPhase)
135+
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
136+
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
137+
.acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId)
138+
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId)
139+
.acceptIfNotNull(endpoint.getConsumerProperties(), properties::setKafkaConsumerProperties);
186140
}
187141

188142
@Override
@@ -214,7 +168,7 @@ public ShareKafkaMessageListenerContainer<K, V> createContainer(Pattern topicPat
214168
protected ShareKafkaMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
215169
Collection<String> topics = endpoint.getTopics();
216170
Assert.state(topics != null, "'topics' must not be null");
217-
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
171+
return new ShareKafkaMessageListenerContainer<>(this.shareConsumerFactory,
218172
new ContainerProperties(topics.toArray(new String[0])));
219173
}
220174

0 commit comments

Comments
 (0)