Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
Uuid,
libversion,
version,
murmur2,
consistent,
fnv1a,
TIMESTAMP_NOT_AVAILABLE,
TIMESTAMP_CREATE_TIME,
TIMESTAMP_LOG_APPEND_TIME,
Expand All @@ -56,6 +59,9 @@
"kafkatest",
"libversion",
"version",
"murmur2",
"consistent",
"fnv1a",
"Message",
"OFFSET_BEGINNING",
"OFFSET_END",
Expand Down
4 changes: 4 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ class NewPartitions:
def libversion() -> Tuple[str, int]: ...
def version() -> Tuple[str, int]: ...

def murmur2(key: bytes, partition_count: int) -> int: ...
def consistent(key: bytes, partition_count: int) -> int: ...
def fnv1a(key: bytes, partition_count: int) -> int: ...

# ===== CONSTANTS (From stubgen) =====

ACL_OPERATION_ALL: int
Expand Down
119 changes: 119 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2972,6 +2972,81 @@ static PyObject *version (PyObject *self, PyObject *args) {
return Py_BuildValue("s", CFL_VERSION_STR);
}


/****************************************************************************
*
*
* Partitioner functions
*
*
****************************************************************************/

/**
* @brief Type definition for librdkafka partitioner functions.
*/
typedef int32_t (*partitioner_func_t)(const rd_kafka_topic_t *rkt,
const void *key,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque);

/**
* @brief Helper function for partitioner wrappers.
*
* Handles common argument parsing, validation, and calling of
* librdkafka partitioner functions.
*
* @param args Python arguments (key: bytes, partition_count: int)
* @param partitioner_func librdkafka partitioner function to call
*
* @returns PyObject* - Partition ID as Python int, or NULL on error
*/
static PyObject *partitioner_helper(PyObject *args,
partitioner_func_t partitioner_func) {
const char *key;
Py_ssize_t keylen;
int partition_count;
int32_t partition;

if (!PyArg_ParseTuple(args, "y#i", &key, &keylen, &partition_count))
return NULL;

if (partition_count <= 0) {
PyErr_Format(PyExc_ValueError,
"partition_count must be > 0, got %d",
partition_count);
return NULL;
}

partition = partitioner_func(NULL, key, (size_t)keylen,
partition_count, NULL, NULL);

return cfl_PyInt_FromInt(partition);
}

/**
* @brief Calculate partition using Murmur2 hash (Java-compatible). Deterministic.
*/
static PyObject *murmur2(PyObject *self, PyObject *args) {
return partitioner_helper(args, rd_kafka_msg_partitioner_murmur2);
}

/**
* @brief Calculate partition using consistent hash (CRC32). Deterministic.
*/
static PyObject *consistent(PyObject *self, PyObject *args) {
return partitioner_helper(args, rd_kafka_msg_partitioner_consistent);
}

/**
* @brief Calculate partition using FNV-1a hash. Deterministic.
*/
static PyObject *fnv1a(PyObject *self, PyObject *args) {
return partitioner_helper(args, rd_kafka_msg_partitioner_fnv1a);
}


static PyMethodDef cimpl_methods[] = {
{"libversion", libversion, METH_NOARGS,
" Retrieve librdkafka version string and integer\n"
Expand All @@ -2987,6 +3062,50 @@ static PyMethodDef cimpl_methods[] = {
" :rtype: str\n"
"\n"
},
{"murmur2", murmur2, METH_VARARGS,
" Calculate partition using Murmur2 hash (Java-compatible).\n"
"\n"
" Deterministic function using Murmur2 hashing algorithm.\n"
" assigned to.\n"
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line appears to be a documentation fragment that doesn't make sense in context. It should be removed or completed with proper context.

Suggested change
" assigned to.\n"

Copilot uses AI. Check for mistakes.
"\n"
" :param bytes key: The message key\n"
" :param int partition_count: Number of partitions (must be > 0)\n"
" :returns: Partition ID (0 to partition_count-1)\n"
" :rtype: int\n"
" :raises ValueError: If partition_count <= 0\n"
" :raises TypeError: If key is not bytes or partition_count is not int\n"
"\n"
" Example::\n"
"\n"
" partition = murmur2(b\"user_12345\", 10)\n"
"\n"
},
{"consistent", consistent, METH_VARARGS,
" Calculate partition using CRC32-based consistent hash.\n"
"\n"
" Deterministic function using CRC32 hashing algorithm.\n"
"\n"
" :param bytes key: The message key\n"
" :param int partition_count: Number of partitions (must be > 0)\n"
" :returns: Partition ID (0 to partition_count-1)\n"
" :rtype: int\n"
" :raises ValueError: If partition_count <= 0\n"
" :raises TypeError: If key is not bytes or partition_count is not int\n"
"\n"
},
{"fnv1a", fnv1a, METH_VARARGS,
" Calculate partition using FNV-1a hash.\n"
"\n"
" Deterministic function using FNV-1a hashing algorithm.\n"
"\n"
" :param bytes key: The message key\n"
" :param int partition_count: Number of partitions (must be > 0)\n"
" :returns: Partition ID (0 to partition_count-1)\n"
" :rtype: int\n"
" :raises ValueError: If partition_count <= 0\n"
" :raises TypeError: If key is not bytes or partition_count is not int\n"
"\n"
},
{ NULL }
};

Expand Down
35 changes: 35 additions & 0 deletions tests/test_partitioners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Unit tests for partitioner functions.
"""

import pytest
from confluent_kafka import murmur2, consistent, fnv1a


class TestPartitioners:

def test_deterministic(self):
"""Test that deterministic partitioners produce the same output for the same input."""
key = b"test_key"
partition_count = 10

# Same input should always produce same output
assert murmur2(key, partition_count) == murmur2(key, partition_count)
assert consistent(key, partition_count) == consistent(key, partition_count)
assert fnv1a(key, partition_count) == fnv1a(key, partition_count)

def test_input_validation(self):
"""Test input validation logic."""
# Invalid partition_count
with pytest.raises(ValueError, match="partition_count must be > 0"):
murmur2(b"key", 0)

# Invalid key type
with pytest.raises(TypeError):
murmur2("string_key", 10)

# Invalid partition_count type
with pytest.raises(TypeError):
murmur2(b"key", "10")