Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
Uuid,
libversion,
version,
murmur2,
consistent,
fnv1a,
TIMESTAMP_NOT_AVAILABLE,
TIMESTAMP_CREATE_TIME,
TIMESTAMP_LOG_APPEND_TIME,
Expand All @@ -56,6 +59,9 @@
"kafkatest",
"libversion",
"version",
"murmur2",
"consistent",
"fnv1a",
"Message",
"OFFSET_BEGINNING",
"OFFSET_END",
Expand Down
4 changes: 4 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ class NewPartitions:
def libversion() -> Tuple[str, int]: ...
def version() -> Tuple[str, int]: ...

def murmur2(key: bytes, partition_count: int) -> int: ...
def consistent(key: bytes, partition_count: int) -> int: ...
def fnv1a(key: bytes, partition_count: int) -> int: ...

# ===== CONSTANTS (From stubgen) =====

ACL_OPERATION_ALL: int
Expand Down
118 changes: 118 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2972,6 +2972,81 @@ static PyObject *version (PyObject *self, PyObject *args) {
return Py_BuildValue("s", CFL_VERSION_STR);
}


/****************************************************************************
*
*
* Partitioner functions
*
*
****************************************************************************/

/**
* @brief Type definition for librdkafka partitioner functions.
*/
typedef int32_t (*partitioner_func_t)(const rd_kafka_topic_t *rkt,
const void *key,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque);

/**
* @brief Helper function for partitioner wrappers.
*
* Handles common argument parsing, validation, and calling of
* librdkafka partitioner functions.
*
* @param args Python arguments (key: bytes, partition_count: int)
* @param partitioner_func librdkafka partitioner function to call
*
* @returns PyObject* - Partition ID as Python int, or NULL on error
*/
static PyObject *partitioner_helper(PyObject *args,
partitioner_func_t partitioner_func) {
const char *key;
Py_ssize_t keylen;
int partition_count;
int32_t partition;

if (!PyArg_ParseTuple(args, "y#i", &key, &keylen, &partition_count))
return NULL;

if (partition_count <= 0) {
PyErr_Format(PyExc_ValueError,
"partition_count must be > 0, got %d",
partition_count);
return NULL;
}

partition = partitioner_func(NULL, key, (size_t)keylen,
partition_count, NULL, NULL);

return cfl_PyInt_FromInt(partition);
}

/**
* @brief Calculate partition using Murmur2 hash (Java-compatible). Deterministic.
*/
static PyObject *murmur2(PyObject *args) {
return partitioner_helper(args, rd_kafka_msg_partitioner_murmur2);
}

/**
* @brief Calculate partition using consistent hash (CRC32). Deterministic.
*/
static PyObject *consistent(PyObject *args) {
return partitioner_helper(args, rd_kafka_msg_partitioner_consistent);
}

/**
* @brief Calculate partition using FNV-1a hash. Deterministic.
*/
static PyObject *fnv1a(PyObject *args) {
return partitioner_helper(args, rd_kafka_msg_partitioner_fnv1a);
}


static PyMethodDef cimpl_methods[] = {
{"libversion", libversion, METH_NOARGS,
" Retrieve librdkafka version string and integer\n"
Expand All @@ -2987,6 +3062,49 @@ static PyMethodDef cimpl_methods[] = {
" :rtype: str\n"
"\n"
},
{"murmur2", murmur2, METH_VARARGS,
" Calculate partition using Murmur2 hash (Java-compatible).\n"
"\n"
" Deterministic function using Murmur2 hashing algorithm.\n"
"\n"
" :param bytes key: The message key\n"
" :param int partition_count: Number of partitions (must be > 0)\n"
" :returns: Partition ID (0 to partition_count-1)\n"
" :rtype: int\n"
" :raises ValueError: If partition_count <= 0\n"
" :raises TypeError: If key is not bytes or partition_count is not int\n"
"\n"
" Example::\n"
"\n"
" partition = murmur2(b\"user_12345\", 10)\n"
"\n"
},
{"consistent", consistent, METH_VARARGS,
" Calculate partition using CRC32-based consistent hash.\n"
"\n"
" Deterministic function using CRC32 hashing algorithm.\n"
"\n"
" :param bytes key: The message key\n"
" :param int partition_count: Number of partitions (must be > 0)\n"
" :returns: Partition ID (0 to partition_count-1)\n"
" :rtype: int\n"
" :raises ValueError: If partition_count <= 0\n"
" :raises TypeError: If key is not bytes or partition_count is not int\n"
"\n"
},
{"fnv1a", fnv1a, METH_VARARGS,
" Calculate partition using FNV-1a hash.\n"
"\n"
" Deterministic function using FNV-1a hashing algorithm.\n"
"\n"
" :param bytes key: The message key\n"
" :param int partition_count: Number of partitions (must be > 0)\n"
" :returns: Partition ID (0 to partition_count-1)\n"
" :rtype: int\n"
" :raises ValueError: If partition_count <= 0\n"
" :raises TypeError: If key is not bytes or partition_count is not int\n"
"\n"
},
{ NULL }
};

Expand Down
35 changes: 35 additions & 0 deletions tests/test_partitioners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Unit tests for partitioner functions.
"""

import pytest
from confluent_kafka import murmur2, consistent, fnv1a


class TestPartitioners:

def test_deterministic(self):
"""Test that deterministic partitioners produce the same output for the same input."""
key = b"test_key"
partition_count = 10

# Same input should always produce same output
assert murmur2(key, partition_count) == murmur2(key, partition_count)
assert consistent(key, partition_count) == consistent(key, partition_count)
assert fnv1a(key, partition_count) == fnv1a(key, partition_count)

def test_input_validation(self):
"""Test input validation logic."""
# Invalid partition_count
with pytest.raises(ValueError, match="partition_count must be > 0"):
murmur2(b"key", 0)

# Invalid key type
with pytest.raises(TypeError):
murmur2("string_key", 10)

# Invalid partition_count type
with pytest.raises(TypeError):
murmur2(b"key", "10")