From 986f9def165dcaa3a43177a160b6b417a3feec8b Mon Sep 17 00:00:00 2001 From: Pavel Stadnikov Date: Fri, 26 Jul 2019 17:09:11 +0300 Subject: [PATCH 1/6] add partitioner with murmur2 from kafka --- aiokafka/partitioner/__init__.py | 13 ++++++ aiokafka/partitioner/default.py | 32 ++++++++++++++ aiokafka/partitioner/hashed.py | 48 +++++++++++++++++++++ aiokafka/partitioner/murmur2.py | 65 ++++++++++++++++++++++++++++ tests/test_partitioner.py | 72 ++++++++++++++++++++++++++++++++ 5 files changed, 230 insertions(+) create mode 100644 aiokafka/partitioner/__init__.py create mode 100644 aiokafka/partitioner/default.py create mode 100644 aiokafka/partitioner/hashed.py create mode 100644 aiokafka/partitioner/murmur2.py create mode 100644 tests/test_partitioner.py diff --git a/aiokafka/partitioner/__init__.py b/aiokafka/partitioner/__init__.py new file mode 100644 index 00000000..226e38e2 --- /dev/null +++ b/aiokafka/partitioner/__init__.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import + +from kafka.partitioner.roundrobin import RoundRobinPartitioner + +from .default import DefaultPartitioner +from .hashed import ( + HashedPartitioner, Murmur2Partitioner, LegacyPartitioner, murmur2, +) + +__all__ = [ + 'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner', + 'Murmur2Partitioner', 'LegacyPartitioner', 'murmur2', +] diff --git a/aiokafka/partitioner/default.py b/aiokafka/partitioner/default.py new file mode 100644 index 00000000..087166c0 --- /dev/null +++ b/aiokafka/partitioner/default.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import + +import random + +from .hashed import murmur2 + + +class DefaultPartitioner(object): + """Default partitioner. + + Hashes key to partition using murmur2 hashing (from java client) + If key is None, selects partition randomly from available, + or from all partitions if none are currently available + """ + @classmethod + def __call__(cls, key, all_partitions, available): + """ + Get the partition corresponding to key + :param key: partitioning key + :param all_partitions: list of all partitions sorted by partition ID + :param available: list of available partitions in no particular order + :return: one of the values from all_partitions or available + """ + if key is None: + if available: + return random.choice(available) + return random.choice(all_partitions) + + idx = murmur2(key) + idx &= 0x7fffffff + idx %= len(all_partitions) + return all_partitions[idx] diff --git a/aiokafka/partitioner/hashed.py b/aiokafka/partitioner/hashed.py new file mode 100644 index 00000000..0b488ce5 --- /dev/null +++ b/aiokafka/partitioner/hashed.py @@ -0,0 +1,48 @@ +from __future__ import absolute_import + +from kafka.partitioner.base import Partitioner + +from .murmur2 import murmur2 + + +class Murmur2Partitioner(Partitioner): + """ + Implements a partitioner which selects the target partition based on + the hash of the key. Attempts to apply the same hashing + function as mainline java client. + """ + def __call__(self, key, partitions=None, available=None): + if available: + return self.partition(key, available) + return self.partition(key, partitions) + + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions + + # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69 + idx = (murmur2(key) & 0x7fffffff) % len(partitions) + + return partitions[idx] + + +class LegacyPartitioner(object): + """DEPRECATED -- See Issue 374 + + Implements a partitioner which selects the target partition based on + the hash of the key + """ + def __init__(self, partitions): + self.partitions = partitions + + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions + size = len(partitions) + idx = hash(key) % size + + return partitions[idx] + + +# Default will change to Murmur2 in 0.10 release +HashedPartitioner = LegacyPartitioner diff --git a/aiokafka/partitioner/murmur2.py b/aiokafka/partitioner/murmur2.py new file mode 100644 index 00000000..099d878c --- /dev/null +++ b/aiokafka/partitioner/murmur2.py @@ -0,0 +1,65 @@ +# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 +def murmur2(data): + """Pure-python Murmur2 implementation. + + Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 + + Args: + data (bytes): opaque bytes + + Returns: MurmurHash2 of data + """ + length = len(data) + seed = 0x9747b28c + # 'm' and 'r' are mixing constants generated offline. + # They're not really 'magic', they just happen to work well. + m = 0x5bd1e995 + r = 24 + + # Initialize the hash to a random value + h = seed ^ length + length4 = length // 4 + + for i in range(length4): + i4 = i * 4 + k = ( + (data[i4 + 0] & 0xff) + + ((data[i4 + 1] & 0xff) << 8) + + ((data[i4 + 2] & 0xff) << 16) + + ((data[i4 + 3] & 0xff) << 24) + ) + k &= 0xffffffff + k *= m + k &= 0xffffffff + k ^= (k % 0x100000000) >> r # k ^= k >>> r + k &= 0xffffffff + k *= m + k &= 0xffffffff + + h *= m + h &= 0xffffffff + h ^= k + h &= 0xffffffff + + # Handle the last few bytes of the input array + extra_bytes = length % 4 + if extra_bytes >= 3: + h ^= (data[(length & ~3) + 2] & 0xff) << 16 + h &= 0xffffffff + if extra_bytes >= 2: + h ^= (data[(length & ~3) + 1] & 0xff) << 8 + h &= 0xffffffff + if extra_bytes >= 1: + h ^= (data[length & ~3] & 0xff) + h &= 0xffffffff + h *= m + h &= 0xffffffff + + h ^= (h % 0x100000000) >> 13 # h >>> 13; + h &= 0xffffffff + h *= m + h &= 0xffffffff + h ^= (h % 0x100000000) >> 15 # h >>> 15; + h &= 0xffffffff + + return h diff --git a/tests/test_partitioner.py b/tests/test_partitioner.py new file mode 100644 index 00000000..7c61ec4a --- /dev/null +++ b/tests/test_partitioner.py @@ -0,0 +1,72 @@ +from __future__ import absolute_import + +import pytest + +from aiokafka.partitioner import ( + DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner, murmur2, +) + + +def test_default_partitioner(): + partitioner = DefaultPartitioner() + all_partitions = available = list(range(100)) + # partitioner should return the same partition for the same key + p1 = partitioner(b'foo', all_partitions, available) + p2 = partitioner(b'foo', all_partitions, available) + assert p1 == p2 + assert p1 in all_partitions + + # when key is None, choose one of available partitions + assert partitioner(None, all_partitions, [123]) == 123 + + # with fallback to all_partitions + assert partitioner(None, all_partitions, []) in all_partitions + + +def test_roundrobin_partitioner(): + partitioner = RoundRobinPartitioner() + all_partitions = available = list(range(100)) + # partitioner should cycle between partitions + i = 0 + max_partition = all_partitions[len(all_partitions) - 1] + while i <= max_partition: + assert i == partitioner(None, all_partitions, available) + i += 1 + + i = 0 + while i <= int(max_partition / 2): + assert i == partitioner(None, all_partitions, available) + i += 1 + + # test dynamic partition re-assignment + available = available[:-25] + + while i <= max(available): + assert i == partitioner(None, all_partitions, available) + i += 1 + + all_partitions = list(range(200)) + available = all_partitions + + max_partition = all_partitions[len(all_partitions) - 1] + while i <= max_partition: + assert i == partitioner(None, all_partitions, available) + i += 1 + + +@pytest.mark.parametrize("bytes_payload,partition_number", [ + (b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566), + (b'\x00 ', 742) +]) +def test_murmur2_java_compatibility(bytes_payload, partition_number): + p = Murmur2Partitioner(range(1000)) + # compare with output from Kafka's + # org.apache.kafka.clients.producer.Partitioner + assert p.partition(bytes_payload) == partition_number + + +def test_murmur2_not_ascii(): + # Verify no regression of murmur2() bug encoding py2 + # bytes that don't ascii encode + murmur2(b'\xa4') + murmur2(b'\x81' * 1000) From 2a2d467d86075a5230713f0eed30813658799293 Mon Sep 17 00:00:00 2001 From: Pavel Stadnikov Date: Fri, 26 Jul 2019 18:51:29 +0300 Subject: [PATCH 2/6] rewrite murmur2 to cython, use it in producer --- .gitignore | 1 + aiokafka/partitioner/{murmur2.py => murmur2.pyx} | 3 +++ aiokafka/producer/producer.py | 2 +- setup.py | 7 +++++++ 4 files changed, 12 insertions(+), 1 deletion(-) rename aiokafka/partitioner/{murmur2.py => murmur2.pyx} (95%) diff --git a/.gitignore b/.gitignore index 79e30457..80b35f73 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ tests/ssl_cert tests/keytab # Cython extensions +aiokafka/partitioner/murmur2.c aiokafka/record/_crecords/default_records.c aiokafka/record/_crecords/legacy_records.c aiokafka/record/_crecords/memory_records.c diff --git a/aiokafka/partitioner/murmur2.py b/aiokafka/partitioner/murmur2.pyx similarity index 95% rename from aiokafka/partitioner/murmur2.py rename to aiokafka/partitioner/murmur2.pyx index 099d878c..ee2f613a 100644 --- a/aiokafka/partitioner/murmur2.py +++ b/aiokafka/partitioner/murmur2.pyx @@ -9,6 +9,9 @@ def murmur2(data): Returns: MurmurHash2 of data """ + cdef int length, seed, r, length4, i, i4, extra_bytes + cdef long m, h, k + length = len(data) seed = 0x9747b28c # 'm' and 'r' are mixing constants generated offline. diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index d96e0a1b..0e5db207 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -4,7 +4,6 @@ import traceback import warnings -from kafka.partitioner.default import DefaultPartitioner from kafka.codec import has_gzip, has_snappy, has_lz4 from aiokafka.client import AIOKafkaClient @@ -17,6 +16,7 @@ ) from .message_accumulator import MessageAccumulator +from ..partitioner import DefaultPartitioner from .sender import Sender from .transaction_manager import TransactionManager diff --git a/setup.py b/setup.py index 4add681e..a8ab7d1f 100644 --- a/setup.py +++ b/setup.py @@ -61,6 +61,13 @@ extra_compile_args=CFLAGS, extra_link_args=LDFLAGS ), + Extension( + 'aiokafka.partitioner.murmur2', + ['aiokafka/partitioner/murmur2' + ext], + libraries=LIBRARIES, + extra_compile_args=CFLAGS, + extra_link_args=LDFLAGS + ), ] From 62211017e4364cdd2076bac377fedcae7b676ea1 Mon Sep 17 00:00:00 2001 From: Paul Stadnikov Date: Sun, 28 Jul 2019 21:07:17 +0300 Subject: [PATCH 3/6] fix typos --- benchmark/README | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmark/README b/benchmark/README index b2a13fc0..257a90b5 100644 --- a/benchmark/README +++ b/benchmark/README @@ -1,7 +1,7 @@ The batch compose and read benchmarks in this section are written using -``perf`` library, created by Viktor Stinner. For more information on how to get +``perf`` library, created by Victor Stinner. For more information on how to get reliable results of test runs please consult -http://perf.readthedocs.io/en/latest/run_benchmark.html. +https://pyperf.readthedocs.io/en/latest/run_benchmark.html. The `simple_` benchmarks can be just run, consult command line argument on how to run those. From d0b018b4eb7d0a29c9784b631f06289cabd1e970 Mon Sep 17 00:00:00 2001 From: Paul Stadnikov Date: Sun, 4 Aug 2019 16:34:45 +0300 Subject: [PATCH 4/6] add benchmark --- .gitignore | 3 +++ aiokafka/partitioner/murmur2.pyx | 2 +- benchmark/murmur2.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 benchmark/murmur2.py diff --git a/.gitignore b/.gitignore index 80b35f73..5aa3d971 100644 --- a/.gitignore +++ b/.gitignore @@ -61,6 +61,9 @@ target/ # PyCharm .idea +# VSCode +.vscode + kafka_2* tests/ssl_cert diff --git a/aiokafka/partitioner/murmur2.pyx b/aiokafka/partitioner/murmur2.pyx index ee2f613a..643dbb51 100644 --- a/aiokafka/partitioner/murmur2.pyx +++ b/aiokafka/partitioner/murmur2.pyx @@ -1,6 +1,6 @@ # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 def murmur2(data): - """Pure-python Murmur2 implementation. + """Cython Murmur2 implementation. Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 diff --git a/benchmark/murmur2.py b/benchmark/murmur2.py new file mode 100644 index 00000000..b231f80e --- /dev/null +++ b/benchmark/murmur2.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 +import pyperf + +from kafka.partitioner.hashed import murmur2 as murmur2_kafka +from aiokafka.partitioner import murmur2 + + +def run_murmur2(loops: int): + data = list(range(10)) + t0 = pyperf.perf_counter() + for _ in range(loops): + murmur2(data) + res = pyperf.perf_counter() - t0 + + return res + +def run_murmur2_kafka(loops: int): + data = list(range(10)) + t0 = pyperf.perf_counter() + for _ in range(loops): + murmur2_kafka(data) + res = pyperf.perf_counter() - t0 + + return res + + +runner = pyperf.Runner() +runner.bench_time_func('murmur2 cython realization', run_murmur2) +runner.bench_time_func('murmur2 python realization', run_murmur2_kafka) From 73e1e36f420484d13d45671765ebb723fd397518 Mon Sep 17 00:00:00 2001 From: Pavel Stadnikov Date: Fri, 9 Aug 2019 12:19:37 +0300 Subject: [PATCH 5/6] add some improvements --- aiokafka/partitioner/murmur2.html | 531 ++++++++++++++++++++++++++++++ aiokafka/partitioner/murmur2.pyx | 33 +- benchmark/murmur2.py | 4 +- 3 files changed, 547 insertions(+), 21 deletions(-) create mode 100644 aiokafka/partitioner/murmur2.html diff --git a/aiokafka/partitioner/murmur2.html b/aiokafka/partitioner/murmur2.html new file mode 100644 index 00000000..419f1e49 --- /dev/null +++ b/aiokafka/partitioner/murmur2.html @@ -0,0 +1,531 @@ + + + + + + Cython: murmur2.pyx + + + +

Generated by Cython 0.29.11

+

+ Yellow lines hint at Python interaction.
+ Click on a line that starts with a "+" to see the C code that Cython generated for it. +

+

Raw output: murmur2.c

+
 01: from cpython cimport (
+
 02:     PyBytes_AS_STRING
+
 03: )
+
 04: from libc.stdint cimport uint32_t
+
 05: 
+
 06: 
+
 07: # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
+
+08: def murmur2(bytes in_bytes):
+
/* Python wrapper */
+static PyObject *__pyx_pw_8aiokafka_11partitioner_7murmur2_1murmur2(PyObject *__pyx_self, PyObject *__pyx_v_in_bytes); /*proto*/
+static char __pyx_doc_8aiokafka_11partitioner_7murmur2_murmur2[] = "Cython Murmur2 implementation.\n\n    Based on java client, see org.apache.kafka.common.utils.Utils.murmur2\n\n    Args:\n        data (bytes): opaque bytes\n\n    Returns: MurmurHash2 of data\n    ";
+static PyMethodDef __pyx_mdef_8aiokafka_11partitioner_7murmur2_1murmur2 = {"murmur2", (PyCFunction)__pyx_pw_8aiokafka_11partitioner_7murmur2_1murmur2, METH_O, __pyx_doc_8aiokafka_11partitioner_7murmur2_murmur2};
+static PyObject *__pyx_pw_8aiokafka_11partitioner_7murmur2_1murmur2(PyObject *__pyx_self, PyObject *__pyx_v_in_bytes) {
+  PyObject *__pyx_r = 0;
+  __Pyx_RefNannyDeclarations
+  __Pyx_RefNannySetupContext("murmur2 (wrapper)", 0);
+  if (unlikely(!__Pyx_ArgTypeTest(((PyObject *)__pyx_v_in_bytes), (&PyBytes_Type), 1, "in_bytes", 1))) __PYX_ERR(0, 8, __pyx_L1_error)
+  __pyx_r = __pyx_pf_8aiokafka_11partitioner_7murmur2_murmur2(__pyx_self, ((PyObject*)__pyx_v_in_bytes));
+
+  /* function exit code */
+  goto __pyx_L0;
+  __pyx_L1_error:;
+  __pyx_r = NULL;
+  __pyx_L0:;
+  __Pyx_RefNannyFinishContext();
+  return __pyx_r;
+}
+
+static PyObject *__pyx_pf_8aiokafka_11partitioner_7murmur2_murmur2(CYTHON_UNUSED PyObject *__pyx_self, PyObject *__pyx_v_in_bytes) {
+  uint32_t __pyx_v_length;
+  uint32_t __pyx_v_seed;
+  uint32_t __pyx_v_r;
+  uint32_t __pyx_v_length4;
+  uint32_t __pyx_v_i;
+  uint32_t __pyx_v_i4;
+  uint32_t __pyx_v_extra_bytes;
+  uint32_t __pyx_v_m;
+  uint32_t __pyx_v_h;
+  uint32_t __pyx_v_k;
+  char *__pyx_v_data;
+  PyObject *__pyx_r = NULL;
+  __Pyx_RefNannyDeclarations
+  __Pyx_RefNannySetupContext("murmur2", 0);
+/* … */
+  /* function exit code */
+  __pyx_L1_error:;
+  __Pyx_XDECREF(__pyx_t_6);
+  __Pyx_AddTraceback("aiokafka.partitioner.murmur2.murmur2", __pyx_clineno, __pyx_lineno, __pyx_filename);
+  __pyx_r = NULL;
+  __pyx_L0:;
+  __Pyx_XGIVEREF(__pyx_r);
+  __Pyx_RefNannyFinishContext();
+  return __pyx_r;
+}
+/* … */
+  __pyx_tuple_ = PyTuple_Pack(12, __pyx_n_s_in_bytes, __pyx_n_s_length, __pyx_n_s_seed, __pyx_n_s_r, __pyx_n_s_length4, __pyx_n_s_i, __pyx_n_s_i4, __pyx_n_s_extra_bytes, __pyx_n_s_m, __pyx_n_s_h, __pyx_n_s_k, __pyx_n_s_data); if (unlikely(!__pyx_tuple_)) __PYX_ERR(0, 8, __pyx_L1_error)
+  __Pyx_GOTREF(__pyx_tuple_);
+  __Pyx_GIVEREF(__pyx_tuple_);
+/* … */
+  __pyx_t_1 = PyCFunction_NewEx(&__pyx_mdef_8aiokafka_11partitioner_7murmur2_1murmur2, NULL, __pyx_n_s_aiokafka_partitioner_murmur2); if (unlikely(!__pyx_t_1)) __PYX_ERR(0, 8, __pyx_L1_error)
+  __Pyx_GOTREF(__pyx_t_1);
+  if (PyDict_SetItem(__pyx_d, __pyx_n_s_murmur2, __pyx_t_1) < 0) __PYX_ERR(0, 8, __pyx_L1_error)
+  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
+
 09:     """Cython Murmur2 implementation.
+
 10: 
+
 11:     Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
+
 12: 
+
 13:     Args:
+
 14:         data (bytes): opaque bytes
+
 15: 
+
 16:     Returns: MurmurHash2 of data
+
 17:     """
+
 18:     cdef uint32_t length, seed, r, length4, i, i4, extra_bytes
+
 19:     cdef uint32_t m, h, k
+
 20:     cdef char* data
+
 21: 
+
+22:     data = PyBytes_AS_STRING(in_bytes)
+
  __pyx_v_data = PyBytes_AS_STRING(__pyx_v_in_bytes);
+
+23:     length = len(data)
+
  __pyx_t_1 = strlen(__pyx_v_data); 
+  __pyx_v_length = __pyx_t_1;
+
+24:     seed = 0x9747b28c
+
  __pyx_v_seed = 0x9747b28c;
+
 25:     # 'm' and 'r' are mixing constants generated offline.
+
 26:     # They're not really 'magic', they just happen to work well.
+
+27:     m = 0x5bd1e995
+
  __pyx_v_m = 0x5bd1e995;
+
+28:     r = 24
+
  __pyx_v_r = 24;
+
 29: 
+
 30:     # Initialize the hash to a random value
+
+31:     h = seed ^ length
+
  __pyx_v_h = (__pyx_v_seed ^ __pyx_v_length);
+
+32:     length4 = length // 4
+
  __pyx_v_length4 = __Pyx_div_long(__pyx_v_length, 4);
+
 33: 
+
+34:     for i in range(length4):
+
  __pyx_t_2 = __pyx_v_length4;
+  __pyx_t_3 = __pyx_t_2;
+  for (__pyx_t_4 = 0; __pyx_t_4 < __pyx_t_3; __pyx_t_4+=1) {
+    __pyx_v_i = __pyx_t_4;
+
+35:         i4 = i * 4
+
    __pyx_v_i4 = (__pyx_v_i * 4);
+
 36:         k = (
+
 37:                 (data[i4 + 0] & 0xff) +
+
 38:                 ((data[i4 + 1] & 0xff) << 8) +
+
+39:                 ((data[i4 + 2] & 0xff) << 16) +
+
    __pyx_v_k = (((((__pyx_v_data[(__pyx_v_i4 + 0)]) & 0xff) + (((__pyx_v_data[(__pyx_v_i4 + 1)]) & 0xff) << 8)) + (((__pyx_v_data[(__pyx_v_i4 + 2)]) & 0xff) << 16)) + (((__pyx_v_data[(__pyx_v_i4 + 3)]) & 0xff) << 24));
+
 40:                 ((data[i4 + 3] & 0xff) << 24)
+
 41:         )
+
+42:         k *= m
+
    __pyx_v_k = (__pyx_v_k * __pyx_v_m);
+
+43:         k ^= k >> r  # k ^= k >>> r
+
    __pyx_v_k = (__pyx_v_k ^ (__pyx_v_k >> __pyx_v_r));
+
+44:         k *= m
+
    __pyx_v_k = (__pyx_v_k * __pyx_v_m);
+
 45: 
+
+46:         h *= m
+
    __pyx_v_h = (__pyx_v_h * __pyx_v_m);
+
+47:         h ^= k
+
    __pyx_v_h = (__pyx_v_h ^ __pyx_v_k);
+  }
+
 48: 
+
 49:     # Handle the last few bytes of the input array
+
+50:     extra_bytes = length % 4
+
  __pyx_v_extra_bytes = __Pyx_mod_long(__pyx_v_length, 4);
+
+51:     if extra_bytes >= 3:
+
  __pyx_t_5 = ((__pyx_v_extra_bytes >= 3) != 0);
+  if (__pyx_t_5) {
+/* … */
+  }
+
+52:         h ^= (data[(length & ~3) + 2] & 0xff) << 16
+
    __pyx_v_h = (__pyx_v_h ^ (((__pyx_v_data[((__pyx_v_length & (~3)) + 2)]) & 0xff) << 16));
+
+53:     if extra_bytes >= 2:
+
  __pyx_t_5 = ((__pyx_v_extra_bytes >= 2) != 0);
+  if (__pyx_t_5) {
+/* … */
+  }
+
+54:         h ^= (data[(length & ~3) + 1] & 0xff) << 8
+
    __pyx_v_h = (__pyx_v_h ^ (((__pyx_v_data[((__pyx_v_length & (~3)) + 1)]) & 0xff) << 8));
+
+55:     if extra_bytes >= 1:
+
  __pyx_t_5 = ((__pyx_v_extra_bytes >= 1) != 0);
+  if (__pyx_t_5) {
+/* … */
+  }
+
+56:         h ^= (data[length & ~3] & 0xff)
+
    __pyx_v_h = (__pyx_v_h ^ ((__pyx_v_data[(__pyx_v_length & (~3))]) & 0xff));
+
+57:         h *= m
+
    __pyx_v_h = (__pyx_v_h * __pyx_v_m);
+
 58: 
+
+59:     h ^= h >> 13  # h >>> 13;
+
  __pyx_v_h = (__pyx_v_h ^ (__pyx_v_h >> 13));
+
+60:     h *= m
+
  __pyx_v_h = (__pyx_v_h * __pyx_v_m);
+
+61:     h ^= h >> 15  # h >>> 15;
+
  __pyx_v_h = (__pyx_v_h ^ (__pyx_v_h >> 15));
+
 62: 
+
+63:     return h
+
  __Pyx_XDECREF(__pyx_r);
+  __pyx_t_6 = __Pyx_PyInt_From_uint32_t(__pyx_v_h); if (unlikely(!__pyx_t_6)) __PYX_ERR(0, 63, __pyx_L1_error)
+  __Pyx_GOTREF(__pyx_t_6);
+  __pyx_r = __pyx_t_6;
+  __pyx_t_6 = 0;
+  goto __pyx_L0;
+
diff --git a/aiokafka/partitioner/murmur2.pyx b/aiokafka/partitioner/murmur2.pyx index 643dbb51..0615e73a 100644 --- a/aiokafka/partitioner/murmur2.pyx +++ b/aiokafka/partitioner/murmur2.pyx @@ -1,5 +1,11 @@ +from cpython cimport ( + PyBytes_AS_STRING +) +from libc.stdint cimport uint32_t + + # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 -def murmur2(data): +def murmur2(bytes in_bytes): """Cython Murmur2 implementation. Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 @@ -9,9 +15,11 @@ def murmur2(data): Returns: MurmurHash2 of data """ - cdef int length, seed, r, length4, i, i4, extra_bytes - cdef long m, h, k + cdef uint32_t length, seed, r, length4, i, i4, extra_bytes + cdef uint32_t m, h, k + cdef char* data + data = PyBytes_AS_STRING(in_bytes) length = len(data) seed = 0x9747b28c # 'm' and 'r' are mixing constants generated offline. @@ -31,38 +39,25 @@ def murmur2(data): ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24) ) - k &= 0xffffffff k *= m - k &= 0xffffffff - k ^= (k % 0x100000000) >> r # k ^= k >>> r - k &= 0xffffffff + k ^= k >> r # k ^= k >>> r k *= m - k &= 0xffffffff h *= m - h &= 0xffffffff h ^= k - h &= 0xffffffff # Handle the last few bytes of the input array extra_bytes = length % 4 if extra_bytes >= 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16 - h &= 0xffffffff if extra_bytes >= 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8 - h &= 0xffffffff if extra_bytes >= 1: h ^= (data[length & ~3] & 0xff) - h &= 0xffffffff h *= m - h &= 0xffffffff - h ^= (h % 0x100000000) >> 13 # h >>> 13; - h &= 0xffffffff + h ^= h >> 13 # h >>> 13; h *= m - h &= 0xffffffff - h ^= (h % 0x100000000) >> 15 # h >>> 15; - h &= 0xffffffff + h ^= h >> 15 # h >>> 15; return h diff --git a/benchmark/murmur2.py b/benchmark/murmur2.py index b231f80e..dd60b10a 100644 --- a/benchmark/murmur2.py +++ b/benchmark/murmur2.py @@ -6,7 +6,7 @@ def run_murmur2(loops: int): - data = list(range(10)) + data = bytes(range(10)) t0 = pyperf.perf_counter() for _ in range(loops): murmur2(data) @@ -15,7 +15,7 @@ def run_murmur2(loops: int): return res def run_murmur2_kafka(loops: int): - data = list(range(10)) + data = bytes(range(10)) t0 = pyperf.perf_counter() for _ in range(loops): murmur2_kafka(data) From 245f743331be028183381648f46b3a9e1d0b5931 Mon Sep 17 00:00:00 2001 From: Paul Stadnikov Date: Sun, 15 Sep 2019 18:59:41 +0300 Subject: [PATCH 6/6] remove murmur.html --- aiokafka/partitioner/murmur2.html | 531 ------------------------------ 1 file changed, 531 deletions(-) delete mode 100644 aiokafka/partitioner/murmur2.html diff --git a/aiokafka/partitioner/murmur2.html b/aiokafka/partitioner/murmur2.html deleted file mode 100644 index 419f1e49..00000000 --- a/aiokafka/partitioner/murmur2.html +++ /dev/null @@ -1,531 +0,0 @@ - - - - - - Cython: murmur2.pyx - - - -

Generated by Cython 0.29.11

-

- Yellow lines hint at Python interaction.
- Click on a line that starts with a "+" to see the C code that Cython generated for it. -

-

Raw output: murmur2.c

-
 01: from cpython cimport (
-
 02:     PyBytes_AS_STRING
-
 03: )
-
 04: from libc.stdint cimport uint32_t
-
 05: 
-
 06: 
-
 07: # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
-
+08: def murmur2(bytes in_bytes):
-
/* Python wrapper */
-static PyObject *__pyx_pw_8aiokafka_11partitioner_7murmur2_1murmur2(PyObject *__pyx_self, PyObject *__pyx_v_in_bytes); /*proto*/
-static char __pyx_doc_8aiokafka_11partitioner_7murmur2_murmur2[] = "Cython Murmur2 implementation.\n\n    Based on java client, see org.apache.kafka.common.utils.Utils.murmur2\n\n    Args:\n        data (bytes): opaque bytes\n\n    Returns: MurmurHash2 of data\n    ";
-static PyMethodDef __pyx_mdef_8aiokafka_11partitioner_7murmur2_1murmur2 = {"murmur2", (PyCFunction)__pyx_pw_8aiokafka_11partitioner_7murmur2_1murmur2, METH_O, __pyx_doc_8aiokafka_11partitioner_7murmur2_murmur2};
-static PyObject *__pyx_pw_8aiokafka_11partitioner_7murmur2_1murmur2(PyObject *__pyx_self, PyObject *__pyx_v_in_bytes) {
-  PyObject *__pyx_r = 0;
-  __Pyx_RefNannyDeclarations
-  __Pyx_RefNannySetupContext("murmur2 (wrapper)", 0);
-  if (unlikely(!__Pyx_ArgTypeTest(((PyObject *)__pyx_v_in_bytes), (&PyBytes_Type), 1, "in_bytes", 1))) __PYX_ERR(0, 8, __pyx_L1_error)
-  __pyx_r = __pyx_pf_8aiokafka_11partitioner_7murmur2_murmur2(__pyx_self, ((PyObject*)__pyx_v_in_bytes));
-
-  /* function exit code */
-  goto __pyx_L0;
-  __pyx_L1_error:;
-  __pyx_r = NULL;
-  __pyx_L0:;
-  __Pyx_RefNannyFinishContext();
-  return __pyx_r;
-}
-
-static PyObject *__pyx_pf_8aiokafka_11partitioner_7murmur2_murmur2(CYTHON_UNUSED PyObject *__pyx_self, PyObject *__pyx_v_in_bytes) {
-  uint32_t __pyx_v_length;
-  uint32_t __pyx_v_seed;
-  uint32_t __pyx_v_r;
-  uint32_t __pyx_v_length4;
-  uint32_t __pyx_v_i;
-  uint32_t __pyx_v_i4;
-  uint32_t __pyx_v_extra_bytes;
-  uint32_t __pyx_v_m;
-  uint32_t __pyx_v_h;
-  uint32_t __pyx_v_k;
-  char *__pyx_v_data;
-  PyObject *__pyx_r = NULL;
-  __Pyx_RefNannyDeclarations
-  __Pyx_RefNannySetupContext("murmur2", 0);
-/* … */
-  /* function exit code */
-  __pyx_L1_error:;
-  __Pyx_XDECREF(__pyx_t_6);
-  __Pyx_AddTraceback("aiokafka.partitioner.murmur2.murmur2", __pyx_clineno, __pyx_lineno, __pyx_filename);
-  __pyx_r = NULL;
-  __pyx_L0:;
-  __Pyx_XGIVEREF(__pyx_r);
-  __Pyx_RefNannyFinishContext();
-  return __pyx_r;
-}
-/* … */
-  __pyx_tuple_ = PyTuple_Pack(12, __pyx_n_s_in_bytes, __pyx_n_s_length, __pyx_n_s_seed, __pyx_n_s_r, __pyx_n_s_length4, __pyx_n_s_i, __pyx_n_s_i4, __pyx_n_s_extra_bytes, __pyx_n_s_m, __pyx_n_s_h, __pyx_n_s_k, __pyx_n_s_data); if (unlikely(!__pyx_tuple_)) __PYX_ERR(0, 8, __pyx_L1_error)
-  __Pyx_GOTREF(__pyx_tuple_);
-  __Pyx_GIVEREF(__pyx_tuple_);
-/* … */
-  __pyx_t_1 = PyCFunction_NewEx(&__pyx_mdef_8aiokafka_11partitioner_7murmur2_1murmur2, NULL, __pyx_n_s_aiokafka_partitioner_murmur2); if (unlikely(!__pyx_t_1)) __PYX_ERR(0, 8, __pyx_L1_error)
-  __Pyx_GOTREF(__pyx_t_1);
-  if (PyDict_SetItem(__pyx_d, __pyx_n_s_murmur2, __pyx_t_1) < 0) __PYX_ERR(0, 8, __pyx_L1_error)
-  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
-
 09:     """Cython Murmur2 implementation.
-
 10: 
-
 11:     Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
-
 12: 
-
 13:     Args:
-
 14:         data (bytes): opaque bytes
-
 15: 
-
 16:     Returns: MurmurHash2 of data
-
 17:     """
-
 18:     cdef uint32_t length, seed, r, length4, i, i4, extra_bytes
-
 19:     cdef uint32_t m, h, k
-
 20:     cdef char* data
-
 21: 
-
+22:     data = PyBytes_AS_STRING(in_bytes)
-
  __pyx_v_data = PyBytes_AS_STRING(__pyx_v_in_bytes);
-
+23:     length = len(data)
-
  __pyx_t_1 = strlen(__pyx_v_data); 
-  __pyx_v_length = __pyx_t_1;
-
+24:     seed = 0x9747b28c
-
  __pyx_v_seed = 0x9747b28c;
-
 25:     # 'm' and 'r' are mixing constants generated offline.
-
 26:     # They're not really 'magic', they just happen to work well.
-
+27:     m = 0x5bd1e995
-
  __pyx_v_m = 0x5bd1e995;
-
+28:     r = 24
-
  __pyx_v_r = 24;
-
 29: 
-
 30:     # Initialize the hash to a random value
-
+31:     h = seed ^ length
-
  __pyx_v_h = (__pyx_v_seed ^ __pyx_v_length);
-
+32:     length4 = length // 4
-
  __pyx_v_length4 = __Pyx_div_long(__pyx_v_length, 4);
-
 33: 
-
+34:     for i in range(length4):
-
  __pyx_t_2 = __pyx_v_length4;
-  __pyx_t_3 = __pyx_t_2;
-  for (__pyx_t_4 = 0; __pyx_t_4 < __pyx_t_3; __pyx_t_4+=1) {
-    __pyx_v_i = __pyx_t_4;
-
+35:         i4 = i * 4
-
    __pyx_v_i4 = (__pyx_v_i * 4);
-
 36:         k = (
-
 37:                 (data[i4 + 0] & 0xff) +
-
 38:                 ((data[i4 + 1] & 0xff) << 8) +
-
+39:                 ((data[i4 + 2] & 0xff) << 16) +
-
    __pyx_v_k = (((((__pyx_v_data[(__pyx_v_i4 + 0)]) & 0xff) + (((__pyx_v_data[(__pyx_v_i4 + 1)]) & 0xff) << 8)) + (((__pyx_v_data[(__pyx_v_i4 + 2)]) & 0xff) << 16)) + (((__pyx_v_data[(__pyx_v_i4 + 3)]) & 0xff) << 24));
-
 40:                 ((data[i4 + 3] & 0xff) << 24)
-
 41:         )
-
+42:         k *= m
-
    __pyx_v_k = (__pyx_v_k * __pyx_v_m);
-
+43:         k ^= k >> r  # k ^= k >>> r
-
    __pyx_v_k = (__pyx_v_k ^ (__pyx_v_k >> __pyx_v_r));
-
+44:         k *= m
-
    __pyx_v_k = (__pyx_v_k * __pyx_v_m);
-
 45: 
-
+46:         h *= m
-
    __pyx_v_h = (__pyx_v_h * __pyx_v_m);
-
+47:         h ^= k
-
    __pyx_v_h = (__pyx_v_h ^ __pyx_v_k);
-  }
-
 48: 
-
 49:     # Handle the last few bytes of the input array
-
+50:     extra_bytes = length % 4
-
  __pyx_v_extra_bytes = __Pyx_mod_long(__pyx_v_length, 4);
-
+51:     if extra_bytes >= 3:
-
  __pyx_t_5 = ((__pyx_v_extra_bytes >= 3) != 0);
-  if (__pyx_t_5) {
-/* … */
-  }
-
+52:         h ^= (data[(length & ~3) + 2] & 0xff) << 16
-
    __pyx_v_h = (__pyx_v_h ^ (((__pyx_v_data[((__pyx_v_length & (~3)) + 2)]) & 0xff) << 16));
-
+53:     if extra_bytes >= 2:
-
  __pyx_t_5 = ((__pyx_v_extra_bytes >= 2) != 0);
-  if (__pyx_t_5) {
-/* … */
-  }
-
+54:         h ^= (data[(length & ~3) + 1] & 0xff) << 8
-
    __pyx_v_h = (__pyx_v_h ^ (((__pyx_v_data[((__pyx_v_length & (~3)) + 1)]) & 0xff) << 8));
-
+55:     if extra_bytes >= 1:
-
  __pyx_t_5 = ((__pyx_v_extra_bytes >= 1) != 0);
-  if (__pyx_t_5) {
-/* … */
-  }
-
+56:         h ^= (data[length & ~3] & 0xff)
-
    __pyx_v_h = (__pyx_v_h ^ ((__pyx_v_data[(__pyx_v_length & (~3))]) & 0xff));
-
+57:         h *= m
-
    __pyx_v_h = (__pyx_v_h * __pyx_v_m);
-
 58: 
-
+59:     h ^= h >> 13  # h >>> 13;
-
  __pyx_v_h = (__pyx_v_h ^ (__pyx_v_h >> 13));
-
+60:     h *= m
-
  __pyx_v_h = (__pyx_v_h * __pyx_v_m);
-
+61:     h ^= h >> 15  # h >>> 15;
-
  __pyx_v_h = (__pyx_v_h ^ (__pyx_v_h >> 15));
-
 62: 
-
+63:     return h
-
  __Pyx_XDECREF(__pyx_r);
-  __pyx_t_6 = __Pyx_PyInt_From_uint32_t(__pyx_v_h); if (unlikely(!__pyx_t_6)) __PYX_ERR(0, 63, __pyx_L1_error)
-  __Pyx_GOTREF(__pyx_t_6);
-  __pyx_r = __pyx_t_6;
-  __pyx_t_6 = 0;
-  goto __pyx_L0;
-