diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py
index 6bcb016f7..e22a890d6 100644
--- a/src/confluent_kafka/__init__.py
+++ b/src/confluent_kafka/__init__.py
@@ -38,6 +38,9 @@
     Uuid,
     libversion,
     version,
+    murmur2,
+    consistent,
+    fnv1a,
     TIMESTAMP_NOT_AVAILABLE,
     TIMESTAMP_CREATE_TIME,
     TIMESTAMP_LOG_APPEND_TIME,
@@ -56,6 +59,9 @@
     "kafkatest",
     "libversion",
     "version",
+    "murmur2",
+    "consistent",
+    "fnv1a",
     "Message",
     "OFFSET_BEGINNING",
     "OFFSET_END",
diff --git a/src/confluent_kafka/cimpl.pyi b/src/confluent_kafka/cimpl.pyi
index 99a888acc..bbec494d8 100644
--- a/src/confluent_kafka/cimpl.pyi
+++ b/src/confluent_kafka/cimpl.pyi
@@ -427,6 +427,10 @@ class NewPartitions:
 def libversion() -> Tuple[str, int]: ...
 def version() -> Tuple[str, int]: ...
 
+def murmur2(key: bytes, partition_count: int) -> int: ...  # Murmur2 hash (Java-compatible)
+def consistent(key: bytes, partition_count: int) -> int: ...  # CRC32-based consistent hash
+def fnv1a(key: bytes, partition_count: int) -> int: ...  # FNV-1a hash
+
 # ===== CONSTANTS (From stubgen) =====
 
 ACL_OPERATION_ALL: int
diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c
index d0d395507..13b1a266d 100644
--- a/src/confluent_kafka/src/confluent_kafka.c
+++ b/src/confluent_kafka/src/confluent_kafka.c
@@ -2972,6 +2972,81 @@ static PyObject *version (PyObject *self, PyObject *args) {
         return Py_BuildValue("s", CFL_VERSION_STR);
 }
 
+
+/****************************************************************************
+ *
+ *
+ * Partitioner functions
+ *
+ *
+ ****************************************************************************/
+
+/**
+ * @brief Function-pointer type matching the rd_kafka_msg_partitioner_*() signature.
+ */
+typedef int32_t (*partitioner_func_t)(const rd_kafka_topic_t *rkt,
+                                      const void *key,
+                                      size_t keylen,
+                                      int32_t partition_cnt,
+                                      void *rkt_opaque,
+                                      void *msg_opaque);
+
+/**
+ * @brief Helper function for partitioner wrappers.
+ *
+ * Parses (key, partition_count) from \p args, rejects partition_count <= 0
+ * and calls \p partitioner_func with NULL topic and opaque pointers.
+ *
+ * @param args Positional-argument tuple (key: bytes, partition_count: int)
+ * @param partitioner_func librdkafka partitioner function to call
+ *
+ * @returns Partition ID as Python int, or NULL with an exception set
+ */
+static PyObject *partitioner_helper(PyObject *args,
+                                    partitioner_func_t partitioner_func) {
+        const char *key;
+        Py_ssize_t keylen;
+        int partition_count;
+        int32_t partition;
+
+        if (!PyArg_ParseTuple(args, "y#i", &key, &keylen, &partition_count))
+                return NULL;
+
+        if (partition_count <= 0) {
+                PyErr_Format(PyExc_ValueError,
+                             "partition_count must be > 0, got %d",
+                             partition_count);
+                return NULL;
+        }
+
+        partition = partitioner_func(NULL, key, (size_t)keylen,
+                                     partition_count, NULL, NULL);
+
+        return cfl_PyInt_FromInt(partition);
+}
+
+/**
+ * @brief Murmur2 (Java-compatible) partitioner; METH_VARARGS handler, self unused.
+ */
+static PyObject *murmur2(PyObject *self, PyObject *args) {
+        return partitioner_helper(args, rd_kafka_msg_partitioner_murmur2);
+}
+
+/**
+ * @brief Consistent (CRC32) partitioner; METH_VARARGS handler, self unused.
+ */
+static PyObject *consistent(PyObject *self, PyObject *args) {
+        return partitioner_helper(args, rd_kafka_msg_partitioner_consistent);
+}
+
+/**
+ * @brief FNV-1a partitioner; METH_VARARGS handler, self unused.
+ */
+static PyObject *fnv1a(PyObject *self, PyObject *args) {
+        return partitioner_helper(args, rd_kafka_msg_partitioner_fnv1a);
+}
+
+
 static PyMethodDef cimpl_methods[] = {
         {"libversion", libversion, METH_NOARGS,
          " Retrieve librdkafka version string and integer\n"
@@ -2987,6 +3062,49 @@ static PyMethodDef cimpl_methods[] = {
          " :rtype: str\n"
          "\n"
         },
+        {"murmur2", murmur2, METH_VARARGS,
+         " Calculate partition using Murmur2 hash (Java-compatible).\n"
+         "\n"
+         " Deterministic function using Murmur2 hashing algorithm.\n"
+         "\n"
+         " :param bytes key: The message key\n"
+         " :param int partition_count: Number of partitions (must be > 0)\n"
+         " :returns: Partition ID (0 to partition_count-1)\n"
+         " :rtype: int\n"
+         " :raises ValueError: If partition_count <= 0\n"
+         " :raises TypeError: If key is not bytes or partition_count is not int\n"
+         "\n"
+         " Example::\n"
+         "\n"
+         " partition = murmur2(b\"user_12345\", 10)\n"
+         "\n"
+        },
+        {"consistent", consistent, METH_VARARGS,
+         " Calculate partition using CRC32-based consistent hash.\n"
+         "\n"
+         " Deterministic function using CRC32 hashing algorithm.\n"
+         "\n"
+         " :param bytes key: The message key\n"
+         " :param int partition_count: Number of partitions (must be > 0)\n"
+         " :returns: Partition ID (0 to partition_count-1)\n"
+         " :rtype: int\n"
+         " :raises ValueError: If partition_count <= 0\n"
+         " :raises TypeError: If key is not bytes or partition_count is not int\n"
+         "\n"
+        },
+        {"fnv1a", fnv1a, METH_VARARGS,
+         " Calculate partition using FNV-1a hash.\n"
+         "\n"
+         " Deterministic function using FNV-1a hashing algorithm.\n"
+         "\n"
+         " :param bytes key: The message key\n"
+         " :param int partition_count: Number of partitions (must be > 0)\n"
+         " :returns: Partition ID (0 to partition_count-1)\n"
+         " :rtype: int\n"
+         " :raises ValueError: If partition_count <= 0\n"
+         " :raises TypeError: If key is not bytes or partition_count is not int\n"
+         "\n"
+        },
         { NULL }
 };
 
diff --git a/tests/test_partitioners.py b/tests/test_partitioners.py
new file mode
100644
index 000000000..e0f38463c
--- /dev/null
+++ b/tests/test_partitioners.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Unit tests for partitioner functions.
+"""
+
+import pytest
+from confluent_kafka import murmur2, consistent, fnv1a
+
+# Every exposed partitioner shares the same (key, partition_count) contract,
+# so each test below runs once per function.
+PARTITIONERS = [murmur2, consistent, fnv1a]
+
+
+@pytest.mark.parametrize("partitioner", PARTITIONERS)
+class TestPartitioners:
+    """Contract tests applied to each partitioner function."""
+
+    def test_deterministic(self, partitioner):
+        """The same key and partition count always map to the same partition."""
+        key = b"test_key"
+        partition_count = 10
+
+        assert partitioner(key, partition_count) == partitioner(key, partition_count)
+
+    def test_result_in_range(self, partitioner):
+        """The returned partition is always within [0, partition_count)."""
+        for partition_count in (1, 2, 10, 1000):
+            for key in (b"a", b"test_key", b"\x00\xff" * 16):
+                assert 0 <= partitioner(key, partition_count) < partition_count
+
+    def test_input_validation(self, partitioner):
+        """Invalid arguments are rejected with the documented exception types."""
+        # Zero and negative partition counts are both invalid
+        for bad_count in (0, -1):
+            with pytest.raises(ValueError, match="partition_count must be > 0"):
+                partitioner(b"key", bad_count)
+
+        # Invalid key type
+        with pytest.raises(TypeError):
+            partitioner("string_key", 10)
+
+        # Invalid partition_count type
+        with pytest.raises(TypeError):
+            partitioner(b"key", "10")