Skip to content

Commit f670e9b

Browse files
urbandanchia7712
authored and committed
KAFKA-17632: Fix RoundRobinPartitioner for even partition counts (apache#17620)
RoundRobinPartitioner does not handle the fact that on new batch creation, the partition method is called twice. Reviewers: Viktor Somogyi-Vass <[email protected]>, Mickael Maison <[email protected]>
1 parent 681c710 commit f670e9b

File tree

4 files changed

+64
-2
lines changed

4 files changed

+64
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,8 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
10741074

10751075
if (result.abortForNewBatch) {
10761076
int prevPartition = partition;
1077+
// IMPORTANT NOTE: the following onNewBatch and partition calls should not be interrupted to allow
1078+
// the custom partitioner to correctly track its state
10771079
onNewBatch(record.topic(), cluster, prevPartition);
10781080
partition = partition(record, serializedKey, serializedValue, cluster);
10791081
if (log.isTraceEnabled()) {

clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ public interface Partitioner extends Configurable, Closeable {
4949
* <p>
5050
* Notifies the partitioner that a new batch is about to be created. When using the sticky partitioner,
5151
* this method can change the chosen sticky partition for the new batch.
52+
* <p>
53+
* After onNewBatch, the {@link #partition(String, Object, byte[], Object, byte[], Cluster)} method is called again
54+
* which allows the implementation to "redirect" the message on new batch creation.
5255
* @param topic The topic name
5356
* @param cluster The current cluster metadata
5457
* @param prevPartition The partition previously selected for the record that triggered a new batch

clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.Cluster;
2020
import org.apache.kafka.common.PartitionInfo;
21+
import org.apache.kafka.common.TopicPartition;
2122
import org.apache.kafka.common.utils.Utils;
2223

2324
import java.util.List;
@@ -36,6 +37,7 @@
3637
*/
3738
public class RoundRobinPartitioner implements Partitioner {
3839
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
40+
private final ThreadLocal<TopicPartition> previousPartition = new ThreadLocal<>();
3941

4042
public void configure(Map<String, ?> configs) {}
4143

@@ -51,6 +53,14 @@ public void configure(Map<String, ?> configs) {}
5153
*/
5254
@Override
5355
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
56+
TopicPartition prevPartition = previousPartition.get();
57+
if (prevPartition != null) {
58+
previousPartition.remove();
59+
if (topic.equals(prevPartition.topic())) {
60+
return prevPartition.partition();
61+
}
62+
}
63+
5464
int nextValue = nextValue(topic);
5565
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
5666
if (!availablePartitions.isEmpty()) {
@@ -68,6 +78,11 @@ private int nextValue(String topic) {
6878
return counter.getAndIncrement();
6979
}
7080

71-
public void close() {}
81+
@SuppressWarnings("deprecation")
82+
@Override
83+
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
84+
previousPartition.set(new TopicPartition(topic, prevPartition));
85+
}
7286

87+
public void close() {}
7388
}

clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public void testRoundRobinWithKeyBytes() {
9797
assertEquals(10, partitionCount.get(2).intValue());
9898
}
9999

100+
@SuppressWarnings("deprecation")
100101
@Test
101102
public void testRoundRobinWithNullKeyBytes() {
102103
final String topicA = "topicA";
@@ -113,6 +114,10 @@ public void testRoundRobinWithNullKeyBytes() {
113114
Partitioner partitioner = new RoundRobinPartitioner();
114115
for (int i = 0; i < 30; ++i) {
115116
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
117+
// Simulate single-message batches
118+
partitioner.onNewBatch(topicA, testCluster, partition);
119+
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
120+
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
116121
Integer count = partitionCount.get(partition);
117122
if (null == count)
118123
count = 0;
@@ -126,5 +131,42 @@ public void testRoundRobinWithNullKeyBytes() {
126131
assertEquals(10, partitionCount.get(0).intValue());
127132
assertEquals(10, partitionCount.get(1).intValue());
128133
assertEquals(10, partitionCount.get(2).intValue());
129-
}
134+
}
135+
136+
@SuppressWarnings("deprecation")
137+
@Test
138+
public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() {
139+
final String topicA = "topicA";
140+
final String topicB = "topicB";
141+
142+
List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
143+
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
144+
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new PartitionInfo(topicA, 3, NODES[0], NODES, NODES));
145+
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
146+
Collections.emptySet(), Collections.emptySet());
147+
148+
final Map<Integer, Integer> partitionCount = new HashMap<>();
149+
150+
Partitioner partitioner = new RoundRobinPartitioner();
151+
for (int i = 0; i < 40; ++i) {
152+
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
153+
// Simulate single-message batches
154+
partitioner.onNewBatch(topicA, testCluster, partition);
155+
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
156+
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
157+
Integer count = partitionCount.get(partition);
158+
if (null == count)
159+
count = 0;
160+
partitionCount.put(partition, count + 1);
161+
162+
if (i % 5 == 0) {
163+
partitioner.partition(topicB, null, null, null, null, testCluster);
164+
}
165+
}
166+
167+
assertEquals(10, partitionCount.get(0).intValue());
168+
assertEquals(10, partitionCount.get(1).intValue());
169+
assertEquals(10, partitionCount.get(2).intValue());
170+
assertEquals(10, partitionCount.get(3).intValue());
171+
}
130172
}

0 commit comments

Comments
 (0)