Skip to content

Commit d54374d

Browse files
committed
expose
1 parent 06c0744 commit d54374d

File tree

4 files changed

+164
-0
lines changed

4 files changed

+164
-0
lines changed

src/confluent_kafka/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
Uuid,
3939
libversion,
4040
version,
41+
murmur2,
42+
consistent,
43+
fnv1a,
4144
TIMESTAMP_NOT_AVAILABLE,
4245
TIMESTAMP_CREATE_TIME,
4346
TIMESTAMP_LOG_APPEND_TIME,
@@ -56,6 +59,9 @@
5659
"kafkatest",
5760
"libversion",
5861
"version",
62+
"murmur2",
63+
"consistent",
64+
"fnv1a",
5965
"Message",
6066
"OFFSET_BEGINNING",
6167
"OFFSET_END",

src/confluent_kafka/cimpl.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,10 @@ class NewPartitions:
427427
def libversion() -> Tuple[str, int]: ...
428428
def version() -> Tuple[str, int]: ...
429429

430+
def murmur2(key: bytes, partition_count: int) -> int: ...
431+
def consistent(key: bytes, partition_count: int) -> int: ...
432+
def fnv1a(key: bytes, partition_count: int) -> int: ...
433+
430434
# ===== CONSTANTS (From stubgen) =====
431435

432436
ACL_OPERATION_ALL: int

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2972,6 +2972,81 @@ static PyObject *version (PyObject *self, PyObject *args) {
29722972
return Py_BuildValue("s", CFL_VERSION_STR);
29732973
}
29742974

2975+
2976+
/****************************************************************************
2977+
*
2978+
*
2979+
* Partitioner functions
2980+
*
2981+
*
2982+
****************************************************************************/
2983+
2984+
/**
2985+
* @brief Type definition for librdkafka partitioner functions.
2986+
*/
2987+
typedef int32_t (*partitioner_func_t)(const rd_kafka_topic_t *rkt,
2988+
const void *key,
2989+
size_t keylen,
2990+
int32_t partition_cnt,
2991+
void *rkt_opaque,
2992+
void *msg_opaque);
2993+
2994+
/**
2995+
* @brief Helper function for partitioner wrappers.
2996+
*
2997+
* Handles common argument parsing, validation, and calling of
2998+
* librdkafka partitioner functions.
2999+
*
3000+
* @param args Python arguments (key: bytes, partition_count: int)
3001+
* @param partitioner_func librdkafka partitioner function to call
3002+
*
3003+
* @returns PyObject* - Partition ID as Python int, or NULL on error
3004+
*/
3005+
static PyObject *partitioner_helper(PyObject *args,
3006+
partitioner_func_t partitioner_func) {
3007+
const char *key;
3008+
Py_ssize_t keylen;
3009+
int partition_count;
3010+
int32_t partition;
3011+
3012+
if (!PyArg_ParseTuple(args, "y#i", &key, &keylen, &partition_count))
3013+
return NULL;
3014+
3015+
if (partition_count <= 0) {
3016+
PyErr_Format(PyExc_ValueError,
3017+
"partition_count must be > 0, got %d",
3018+
partition_count);
3019+
return NULL;
3020+
}
3021+
3022+
partition = partitioner_func(NULL, key, (size_t)keylen,
3023+
partition_count, NULL, NULL);
3024+
3025+
return cfl_PyInt_FromInt(partition);
3026+
}
3027+
3028+
/**
3029+
* @brief Calculate partition using Murmur2 hash (Java-compatible). Deterministic.
3030+
*/
3031+
static PyObject *murmur2(PyObject *self, PyObject *args) {
3032+
return partitioner_helper(args, rd_kafka_msg_partitioner_murmur2);
3033+
}
3034+
3035+
/**
3036+
* @brief Calculate partition using consistent hash (CRC32). Deterministic.
3037+
*/
3038+
static PyObject *consistent(PyObject *self, PyObject *args) {
3039+
return partitioner_helper(args, rd_kafka_msg_partitioner_consistent);
3040+
}
3041+
3042+
/**
3043+
* @brief Calculate partition using FNV-1a hash. Deterministic.
3044+
*/
3045+
static PyObject *fnv1a(PyObject *self, PyObject *args) {
3046+
return partitioner_helper(args, rd_kafka_msg_partitioner_fnv1a);
3047+
}
3048+
3049+
29753050
static PyMethodDef cimpl_methods[] = {
29763051
{"libversion", libversion, METH_NOARGS,
29773052
" Retrieve librdkafka version string and integer\n"
@@ -2987,6 +3062,50 @@ static PyMethodDef cimpl_methods[] = {
29873062
" :rtype: str\n"
29883063
"\n"
29893064
},
3065+
{"murmur2", murmur2, METH_VARARGS,
3066+
" Calculate partition using Murmur2 hash (Java-compatible).\n"
3067+
"\n"
3068+
" Deterministic function using Murmur2 hashing algorithm.\n"
3069+
" assigned to.\n"
3070+
"\n"
3071+
" :param bytes key: The message key\n"
3072+
" :param int partition_count: Number of partitions (must be > 0)\n"
3073+
" :returns: Partition ID (0 to partition_count-1)\n"
3074+
" :rtype: int\n"
3075+
" :raises ValueError: If partition_count <= 0\n"
3076+
" :raises TypeError: If key is not bytes or partition_count is not int\n"
3077+
"\n"
3078+
" Example::\n"
3079+
"\n"
3080+
" partition = murmur2(b\"user_12345\", 10)\n"
3081+
"\n"
3082+
},
3083+
{"consistent", consistent, METH_VARARGS,
3084+
" Calculate partition using CRC32-based consistent hash.\n"
3085+
"\n"
3086+
" Deterministic function using CRC32 hashing algorithm.\n"
3087+
"\n"
3088+
" :param bytes key: The message key\n"
3089+
" :param int partition_count: Number of partitions (must be > 0)\n"
3090+
" :returns: Partition ID (0 to partition_count-1)\n"
3091+
" :rtype: int\n"
3092+
" :raises ValueError: If partition_count <= 0\n"
3093+
" :raises TypeError: If key is not bytes or partition_count is not int\n"
3094+
"\n"
3095+
},
3096+
{"fnv1a", fnv1a, METH_VARARGS,
3097+
" Calculate partition using FNV-1a hash.\n"
3098+
"\n"
3099+
" Deterministic function using FNV-1a hashing algorithm.\n"
3100+
"\n"
3101+
" :param bytes key: The message key\n"
3102+
" :param int partition_count: Number of partitions (must be > 0)\n"
3103+
" :returns: Partition ID (0 to partition_count-1)\n"
3104+
" :rtype: int\n"
3105+
" :raises ValueError: If partition_count <= 0\n"
3106+
" :raises TypeError: If key is not bytes or partition_count is not int\n"
3107+
"\n"
3108+
},
29903109
{ NULL }
29913110
};
29923111

tests/test_partitioners.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
"""
4+
Unit tests for partitioner functions.
5+
"""
6+
7+
import pytest
8+
from confluent_kafka import murmur2, consistent, fnv1a
9+
10+
11+
class TestPartitioners:
12+
13+
def test_deterministic(self):
14+
"""Test that deterministic partitioners produce the same output for the same input."""
15+
key = b"test_key"
16+
partition_count = 10
17+
18+
# Same input should always produce same output
19+
assert murmur2(key, partition_count) == murmur2(key, partition_count)
20+
assert consistent(key, partition_count) == consistent(key, partition_count)
21+
assert fnv1a(key, partition_count) == fnv1a(key, partition_count)
22+
23+
def test_input_validation(self):
24+
"""Test input validation logic."""
25+
# Invalid partition_count
26+
with pytest.raises(ValueError, match="partition_count must be > 0"):
27+
murmur2(b"key", 0)
28+
29+
# Invalid key type
30+
with pytest.raises(TypeError):
31+
murmur2("string_key", 10)
32+
33+
# Invalid partition_count type
34+
with pytest.raises(TypeError):
35+
murmur2(b"key", "10")

0 commit comments

Comments
 (0)