Skip to content

Commit dae5cee

Browse files
committed
Refactor simplipy partitionSubset logic
Signed-off-by: Su Ko <[email protected]>
1 parent b1d8836 commit dae5cee

File tree

1 file changed

+15
-17
lines changed

1 file changed

+15
-17
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -316,30 +316,28 @@ private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperti
316316
}
317317

318318
private @Nullable TopicPartitionOffset @Nullable [] partitionSubset(ContainerProperties containerProperties, int index) {
319-
@Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
319+
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
320320
if (topicPartitions == null) {
321321
return null;
322322
}
323+
323324
if (this.concurrency == 1) {
324325
return topicPartitions;
325326
}
326-
else {
327-
int numPartitions = topicPartitions.length;
328-
if (numPartitions == this.concurrency) {
329-
return new TopicPartitionOffset[] { topicPartitions[index] };
330-
}
331-
else {
332-
int perContainer = numPartitions / this.concurrency;
333-
TopicPartitionOffset[] subset;
334-
if (index == this.concurrency - 1) {
335-
subset = Arrays.copyOfRange(topicPartitions, index * perContainer, topicPartitions.length);
336-
}
337-
else {
338-
subset = Arrays.copyOfRange(topicPartitions, index * perContainer, (index + 1) * perContainer);
339-
}
340-
return subset;
341-
}
327+
328+
int numPartitions = topicPartitions.length;
329+
330+
if (numPartitions == this.concurrency) {
331+
return new TopicPartitionOffset[] { topicPartitions[index] };
342332
}
333+
334+
int perContainer = numPartitions / this.concurrency;
335+
int start = index * perContainer;
336+
int end = (index == this.concurrency - 1)
337+
? numPartitions
338+
: start + perContainer;
339+
340+
return Arrays.copyOfRange(topicPartitions, start, end);
343341
}
344342

345343
/*

0 commit comments

Comments
 (0)