Skip to content

Commit 81a6473

Browse files
authored
support key hash partitioner (#54)
1 parent ef284c1 commit 81a6473

File tree

2 files changed

+56
-6
lines changed

2 files changed

+56
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.hypertrace.core.kafkastreams.framework.partitioner;
2+
3+
import org.apache.kafka.common.utils.Utils;
4+
import org.apache.kafka.streams.processor.StreamPartitioner;
5+
6+
public class KeyHashPartitioner<K, V> implements StreamPartitioner<K, V> {
7+
@Override
8+
public Integer partition(String topic, K key, V value, int numPartitions) {
9+
int hashcode = 0;
10+
if (key != null) {
11+
hashcode = key.hashCode();
12+
}
13+
return Utils.toPositive(hashcode) % numPartitions;
14+
}
15+
}

kafka-streams-partitioners/weighted-group-partitioner/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/WeightedGroupPartitionerTest.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@
1313
public class WeightedGroupPartitionerTest {
1414
private final PartitionerConfigServiceClient configServiceClient = getTestServiceClient();
1515
private final BiFunction<String, String, String> groupKeyExtractor = (key, value) -> key;
16-
private final StreamPartitioner<String, String> delegatePartitioner =
16+
private final StreamPartitioner<String, String> roundRobinPartitioner =
1717
new RoundRobinPartitioner<>();
18+
private final StreamPartitioner<String, String> keyHashPartitioner = new KeyHashPartitioner<>();
1819

1920
@Test
2021
public void testPartitionerWithNonOverlappingGroupPartitions() {
2122
WeightedGroupPartitioner<String, String> partitioner =
2223
new WeightedGroupPartitioner<>(
23-
"spans", configServiceClient, groupKeyExtractor, delegatePartitioner);
24+
"spans", configServiceClient, groupKeyExtractor, roundRobinPartitioner);
2425

25-
// Test case 1: tenant-1 belong to group-1 (partitions: [0,])
26+
// Test case 1: tenant-1 belong to group-1 (partitions: [0,1])
2627
int partition = partitioner.partition("test-topic", "tenant-1", "span-1", 8);
2728
assertTrue(partition >= 0 && partition <= 1);
2829

@@ -43,7 +44,7 @@ public void testPartitionerWithNonOverlappingGroupPartitions() {
4344
public void testPartitionerWithOverlappingGroupPartitions() {
4445
WeightedGroupPartitioner<String, String> partitioner =
4546
new WeightedGroupPartitioner<>(
46-
"spans", configServiceClient, groupKeyExtractor, delegatePartitioner);
47+
"spans", configServiceClient, groupKeyExtractor, roundRobinPartitioner);
4748

4849
// Test case 1: tenant-1 belong to group-1 (partitions: [0])
4950
int partition = partitioner.partition("test-topic", "tenant-1", "span-1", 3);
@@ -66,7 +67,7 @@ public void testPartitionerWithOverlappingGroupPartitions() {
6667
public void testPartitionerWithSinglePartition() {
6768
WeightedGroupPartitioner<String, String> partitioner =
6869
new WeightedGroupPartitioner<>(
69-
"spans", configServiceClient, groupKeyExtractor, delegatePartitioner);
70+
"spans", configServiceClient, groupKeyExtractor, roundRobinPartitioner);
7071

7172
// Test case 1: tenant-1 belong to group-1 (partitions: [0])
7273
int partition = partitioner.partition("test-topic", "tenant-1", "span-1", 1);
@@ -89,7 +90,7 @@ public void testPartitionerWithSinglePartition() {
8990
public void testPartitionerWhenGroupKeyIsNull() {
9091
WeightedGroupPartitioner<String, String> partitioner =
9192
new WeightedGroupPartitioner<>(
92-
"spans", configServiceClient, (key, value) -> null, delegatePartitioner);
93+
"spans", configServiceClient, (key, value) -> null, roundRobinPartitioner);
9394

9495
// should always use default group when group key is null [4,5,6,7]
9596
int partition = partitioner.partition("test-topic", null, "value-1", 8);
@@ -140,6 +141,40 @@ public void testPartitionerWhenDelegateReturnsNull() {
140141
assertTrue(partition >= 4 && partition <= 7);
141142
}
142143

144+
@Test
145+
public void testPartitionerWithKeyHashDelegatePartitioner() {
146+
WeightedGroupPartitioner<String, String> partitioner =
147+
new WeightedGroupPartitioner<>(
148+
"spans", configServiceClient, groupKeyExtractor, keyHashPartitioner);
149+
150+
// Test case 1: tenant-1 belong to group-1 (partitions: [0,1])
151+
int partition = partitioner.partition("test-topic", "tenant-1", "span-1", 8);
152+
assertTrue(partition >= 0 && partition <= 1);
153+
154+
// Test case 2: tenant-2 belong to group-2 (partitions: [2,3])
155+
partition = partitioner.partition("test-topic", "tenant-2", "span-2", 8);
156+
assertTrue(partition >= 2 && partition <= 3);
157+
158+
// Test case 3: tenant-3 belong to group-2 (partitions: [2,3])
159+
partition = partitioner.partition("test-topic", "tenant-3", "span-3", 8);
160+
assertTrue(partition >= 2 && partition <= 3);
161+
162+
// Test case 4: groupKey=unknown should use default group [4,5,6,7]
163+
partition = partitioner.partition("test-topic", "unknown", "span-4", 8);
164+
assertTrue(partition >= 4 && partition <= 7);
165+
166+
// Test case 5: groupKey=unknown should use default group [4,5,6,7], key is null - should always
167+
// go to first partition in the group
168+
partition = partitioner.partition("test-topic", null, "span-5", 8);
169+
assertEquals(4, partition);
170+
partition = partitioner.partition("test-topic", null, "span-6", 8);
171+
assertEquals(4, partition);
172+
partition = partitioner.partition("test-topic", null, "span-7", 8);
173+
assertEquals(4, partition);
174+
partition = partitioner.partition("test-topic", null, "span-8", 8);
175+
assertEquals(4, partition);
176+
}
177+
143178
private PartitionerConfigServiceClient getTestServiceClient() {
144179
return (profileName) ->
145180
new WeightedGroupProfile(

0 commit comments

Comments
 (0)