  * @author Tomaz Fernandes
  * @author Wang Zhiyang
  * @author Lokesh Alamuri
+ * @author Su Ko
  */
 public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
 
@@ -320,26 +321,24 @@ private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperti
 		if (topicPartitions == null) {
 			return null;
 		}
+
 		if (this.concurrency == 1) {
 			return topicPartitions;
 		}
-		else {
-			int numPartitions = topicPartitions.length;
-			if (numPartitions == this.concurrency) {
-				return new TopicPartitionOffset[] { topicPartitions[index] };
-			}
-			else {
-				int perContainer = numPartitions / this.concurrency;
-				TopicPartitionOffset[] subset;
-				if (index == this.concurrency - 1) {
-					subset = Arrays.copyOfRange(topicPartitions, index * perContainer, topicPartitions.length);
-				}
-				else {
-					subset = Arrays.copyOfRange(topicPartitions, index * perContainer, (index + 1) * perContainer);
-				}
-				return subset;
-			}
+
+		int numPartitions = topicPartitions.length;
+
+		if (numPartitions == this.concurrency) {
+			return new TopicPartitionOffset[] { topicPartitions[index] };
 		}
+
+		int perContainer = numPartitions / this.concurrency;
+		int start = index * perContainer;
+		int end = (index == this.concurrency - 1)
+				? numPartitions
+				: start + perContainer;
+
+		return Arrays.copyOfRange(topicPartitions, start, end);
 	}
 
 	/*
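For context on what the refactored logic computes, here is a minimal standalone sketch, not part of the commit: the class and method names are hypothetical, and String[] stands in for TopicPartitionOffset[] so it runs without spring-kafka on the classpath. It shows how explicitly assigned partitions are split across the child containers, with the last container absorbing any remainder.

import java.util.Arrays;

// Hypothetical demo class, not part of spring-kafka: it mirrors the refactored
// subset logic from the diff above.
public class PartitionSubsetDemo {

	// Same early-return structure as the refactored method.
	static String[] partitionSubset(String[] topicPartitions, int concurrency, int index) {
		if (topicPartitions == null) {
			return null;
		}
		if (concurrency == 1) {
			return topicPartitions;
		}
		int numPartitions = topicPartitions.length;
		if (numPartitions == concurrency) {
			return new String[] { topicPartitions[index] };
		}
		int perContainer = numPartitions / concurrency;
		int start = index * perContainer;
		// The last child container takes any remainder partitions.
		int end = (index == concurrency - 1) ? numPartitions : start + perContainer;
		return Arrays.copyOfRange(topicPartitions, start, end);
	}

	public static void main(String[] args) {
		String[] partitions = { "p0", "p1", "p2", "p3", "p4", "p5", "p6" };
		int concurrency = 3;
		for (int i = 0; i < concurrency; i++) {
			System.out.println("container " + i + " -> "
					+ Arrays.toString(partitionSubset(partitions, concurrency, i)));
		}
		// Prints:
		// container 0 -> [p0, p1]
		// container 1 -> [p2, p3]
		// container 2 -> [p4, p5, p6]
	}
}

The before/after behavior is identical; the commit only flattens the nested else branches into early returns and a single copyOfRange call.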