Skip to content

Commit e9ee474

Browse files
committed
Addressing PR review
Signed-off-by: Soby Chacko <[email protected]>
1 parent 20978de commit e9ee474

File tree

3 files changed

+30
-26
lines changed

3 files changed

+30
-26
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,7 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageLis
175175

176176
Assert.state(this.messageHandlerMethodFactory != null,
177177
"Could not create message listener - MessageHandlerMethodFactory not set");
178-
179-
final MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
178+
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
180179
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
181180
JavaUtils.INSTANCE
182181
.acceptIfNotNull(getReplyTopic(), replyTopic -> {

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,6 @@ public void setApplicationContext(ApplicationContext applicationContext) {
8383
this.applicationContext = applicationContext;
8484
}
8585

86-
/**
87-
* Set the share consumer factory to use for creating containers.
88-
* @param shareConsumerFactory the share consumer factory
89-
*/
90-
public void setShareConsumerFactory(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory) {
91-
this.shareConsumerFactory = shareConsumerFactory;
92-
}
93-
9486
/**
9587
* Get the share consumer factory.
9688
* @return the share consumer factory
@@ -99,6 +91,14 @@ public void setShareConsumerFactory(ShareConsumerFactory<? super K, ? super V> s
9991
return this.shareConsumerFactory;
10092
}
10193

94+
/**
95+
* Set the share consumer factory to use for creating containers.
96+
* @param shareConsumerFactory the share consumer factory
97+
*/
98+
public void setShareConsumerFactory(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory) {
99+
this.shareConsumerFactory = shareConsumerFactory;
100+
}
101+
102102
/**
103103
* Set whether containers created by this factory should auto-start.
104104
* @param autoStartup true to auto-start
@@ -136,7 +136,8 @@ public ShareKafkaMessageListenerContainer<K, V> createListenerContainer(KafkaLis
136136
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
137137
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
138138
}
139-
endpoint.setupListenerContainer(instance, null); // No message converter for MVP
139+
// TODO: No message converter for queue at the moment
140+
endpoint.setupListenerContainer(instance, null);
140141
initializeContainer(instance, endpoint);
141142
return instance;
142143
}
@@ -192,6 +193,7 @@ public ShareKafkaMessageListenerContainer<K, V> createContainer(TopicPartitionOf
192193
@Override
193194
public ShareKafkaMessageListenerContainer<K, V> createContainer(String... topics) {
194195
return createContainerInstance(new KafkaListenerEndpointAdapter() {
196+
195197
@Override
196198
public Collection<String> getTopics() {
197199
return Arrays.asList(topics);
@@ -213,6 +215,7 @@ protected ShareKafkaMessageListenerContainer<K, V> createContainerInstance(Kafka
213215
Collection<String> topics = endpoint.getTopics();
214216
Assert.state(topics != null, "'topics' must not be null");
215217
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
216-
new ContainerProperties(topics.toArray(new String[0])));
218+
new ContainerProperties(topics.toArray(new String[0])));
217219
}
220+
218221
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,6 @@ public ShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? supe
7878
Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
7979
}
8080

81-
/**
82-
* Set the {@code client.id} to use for the consumer.
83-
* @param clientId the client id to set
84-
*/
85-
public void setClientId(String clientId) {
86-
this.clientId = clientId;
87-
}
88-
8981
/**
9082
* Get the {@code client.id} for the consumer.
9183
* @return the client id, or null if not set
@@ -95,6 +87,14 @@ public String getClientId() {
9587
return this.clientId;
9688
}
9789

90+
/**
91+
* Set the {@code client.id} to use for the consumer.
92+
* @param clientId the client id to set
93+
*/
94+
public void setClientId(String clientId) {
95+
this.clientId = clientId;
96+
}
97+
9898
@Override
9999
public boolean isInExpectedState() {
100100
return isRunning();
@@ -167,8 +167,8 @@ private class ShareListenerConsumer implements Runnable {
167167

168168
ShareListenerConsumer(GenericMessageListener<?> listener) {
169169
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
170-
ShareKafkaMessageListenerContainer.this.getGroupId(),
171-
ShareKafkaMessageListenerContainer.this.getClientId());
170+
ShareKafkaMessageListenerContainer.this.getGroupId(),
171+
ShareKafkaMessageListenerContainer.this.getClientId());
172172

173173
this.genericListener = listener;
174174
this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
@@ -196,7 +196,7 @@ public void run() {
196196
}
197197
else {
198198
GenericMessageListener<ConsumerRecord<K, V>> listener =
199-
(GenericMessageListener<ConsumerRecord<K, V>>) this.genericListener;
199+
(GenericMessageListener<ConsumerRecord<K, V>>) this.genericListener;
200200
listener.onMessage(record);
201201
}
202202
// Temporarily auto-acknowledge and commit.
@@ -239,9 +239,11 @@ private void wrapUp() {
239239
@Override
240240
public String toString() {
241241
return "ShareKafkaMessageListenerContainer.ShareListenerConsumer ["
242-
+ "consumerGroupId=" + this.consumerGroupId
243-
+ ", clientId=" + this.clientId
244-
+ "]";
242+
+ "consumerGroupId=" + this.consumerGroupId
243+
+ ", clientId=" + this.clientId
244+
+ "]";
245245
}
246+
246247
}
248+
247249
}

0 commit comments

Comments
 (0)