Skip to content

Commit 20978de

Browse files
committed
Addressing PR review
Signed-off-by: Soby Chacko <[email protected]>
1 parent 8ae7ff9 commit 20978de

File tree

3 files changed

+12
-31
lines changed

3 files changed

+12
-31
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,7 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageLis
176176
Assert.state(this.messageHandlerMethodFactory != null,
177177
"Could not create message listener - MessageHandlerMethodFactory not set");
178178

179-
final MessagingMessageListenerAdapter<K, V> messageListener;
180-
messageListener = createMessageListenerInstance(messageConverter);
179+
final MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
181180
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
182181
JavaUtils.INSTANCE
183182
.acceptIfNotNull(getReplyTopic(), replyTopic -> {
@@ -203,6 +202,11 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte
203202
return new HandlerAdapter(invocableHandlerMethod);
204203
}
205204

205+
/**
206+
* Create an empty {@link MessagingMessageListenerAdapter} instance.
207+
* @param messageConverter the converter (may be null).
208+
* @return the {@link MessagingMessageListenerAdapter} instance.
209+
*/
206210
protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
207211
@Nullable MessageConverter messageConverter) {
208212

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,7 @@ else if (this.autoStartup != null) {
186186

187187
@Override
188188
public ShareKafkaMessageListenerContainer<K, V> createContainer(TopicPartitionOffset... topicPartitions) {
189-
return createContainerInstance(new KafkaListenerEndpointAdapter() {
190-
@Override
191-
public TopicPartitionOffset[] getTopicPartitionsToAssign() {
192-
return Arrays.copyOf(topicPartitions, topicPartitions.length);
193-
}
194-
});
189+
throw new UnsupportedOperationException("ShareConsumer does not support explicit partition assignment");
195190
}
196191

197192
@Override
@@ -206,12 +201,7 @@ public Collection<String> getTopics() {
206201

207202
@Override
208203
public ShareKafkaMessageListenerContainer<K, V> createContainer(Pattern topicPattern) {
209-
return createContainerInstance(new KafkaListenerEndpointAdapter() {
210-
@Override
211-
public Pattern getTopicPattern() {
212-
return topicPattern;
213-
}
214-
});
204+
throw new UnsupportedOperationException("ShareConsumer does not support topic patterns");
215205
}
216206

217207
/**
@@ -220,21 +210,9 @@ public Pattern getTopicPattern() {
220210
* @return the container instance
221211
*/
222212
protected ShareKafkaMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
223-
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
224-
if (topicPartitions != null && topicPartitions.length > 0) {
225-
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), new ContainerProperties(topicPartitions));
226-
}
227-
else {
228-
Collection<String> topics = endpoint.getTopics();
229-
Assert.state(topics != null, "'topics' must not be null");
230-
if (!topics.isEmpty()) {
231-
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
232-
new ContainerProperties(topics.toArray(new String[0])));
233-
}
234-
else {
235-
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
236-
new ContainerProperties(endpoint.getTopicPattern()));
237-
}
238-
}
213+
Collection<String> topics = endpoint.getTopics();
214+
Assert.state(topics != null, "'topics' must not be null");
215+
return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(),
216+
new ContainerProperties(topics.toArray(new String[0])));
239217
}
240218
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ private class ShareListenerConsumer implements Runnable {
172172

173173
this.genericListener = listener;
174174
this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
175-
// Subscribe to topics, just like in the test
176175
ContainerProperties containerProperties = getContainerProperties();
177176
this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
178177
}

0 commit comments

Comments
 (0)