Skip to content

Commit 518babd

Browse files
committed
Revert "Allow passing headers as both list(tuples) and dict()"
This reverts commit c55ffc1.
1 parent c55ffc1 commit 518babd

File tree

4 files changed

+39
-143
lines changed

4 files changed

+39
-143
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 30 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -939,124 +939,42 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
939939
}
940940

941941
#ifdef RD_KAFKA_V_HEADERS
942-
943-
944-
/**
945-
* @brief Convert Python list of tuples to rd_kafka_headers_t
946-
*/
947-
static rd_kafka_headers_t *py_headers_list_to_c (PyObject *hdrs) {
948-
int i, len;
949-
rd_kafka_headers_t *rd_headers = NULL;
950-
951-
len = (int)PyList_Size(hdrs);
952-
rd_headers = rd_kafka_headers_new(len);
953-
954-
for (i = 0; i < len; i++) {
955-
rd_kafka_resp_err_t err;
956-
const char *header_key, *header_value = NULL;
957-
int header_key_len = 0, header_value_len = 0;
958-
959-
if(!PyArg_ParseTuple(PyList_GET_ITEM(hdrs, i), "s#z#",
960-
&header_key, &header_key_len,
961-
&header_value, &header_value_len)){
962-
rd_kafka_headers_destroy(rd_headers);
963-
PyErr_SetString(PyExc_TypeError,
964-
"Headers are expected to be a "
965-
"tuple of (key, value)");
966-
return NULL;
967-
}
968-
969-
err = rd_kafka_header_add(rd_headers,
970-
header_key, header_key_len,
971-
header_value, header_value_len);
972-
if (err) {
973-
cfl_PyErr_Format(err,
974-
"Unable to add message header \"%s\": "
975-
"%s",
976-
header_key, rd_kafka_err2str(err));
977-
rd_kafka_headers_destroy(rd_headers);
978-
return NULL;
979-
}
980-
}
981-
return rd_headers;
982-
}
983-
984-
985-
/**
986-
* @brief Convert Python dict to rd_kafka_headers_t
987-
*/
988-
static rd_kafka_headers_t *py_headers_dict_to_c (PyObject *hdrs) {
989-
int len;
990-
Py_ssize_t pos = 0;
991-
rd_kafka_headers_t *rd_headers = NULL;
992-
PyObject *ko, *vo;
993-
994-
len = (int)PyDict_Size(hdrs);
995-
rd_headers = rd_kafka_headers_new(len);
996-
997-
while (PyDict_Next(hdrs, &pos, &ko, &vo)) {
998-
PyObject *ks, *ks8;
999-
const char *k;
1000-
const void *v = NULL;
1001-
Py_ssize_t vsize = 0;
1002-
rd_kafka_resp_err_t err;
1003-
1004-
if (!(ks = cfl_PyObject_Unistr(ko))) {
1005-
PyErr_SetString(PyExc_TypeError,
1006-
"expected header key to be unicode "
1007-
"string");
1008-
rd_kafka_headers_destroy(rd_headers);
1009-
return NULL;
1010-
}
1011-
1012-
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1013-
1014-
if (vo != Py_None) {
1015-
if (PyString_AsStringAndSize(vo, (char **)&v,
1016-
&vsize) == -1) {
1017-
Py_DECREF(ks);
1018-
rd_kafka_headers_destroy(rd_headers);
1019-
return NULL;
1020-
}
1021-
}
1022-
1023-
if ((err = rd_kafka_header_add(rd_headers, k, -1, v, vsize))) {
1024-
cfl_PyErr_Format(err,
1025-
"Unable to add message header \"%s\": "
1026-
"%s",
1027-
k, rd_kafka_err2str(err));
1028-
Py_DECREF(ks);
1029-
rd_kafka_headers_destroy(rd_headers);
1030-
return NULL;
1031-
}
1032-
1033-
Py_DECREF(ks);
1034-
}
1035-
1036-
return rd_headers;
1037-
}
1038-
1039-
1040942
/**
1041943
* @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
1042944
*
1043945
* @returns The new Python list[(header_key, header_value),...] object.
1044946
*/
1045-
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs) {
1046-
1047-
if (PyList_Check(hdrs)) {
1048-
return py_headers_list_to_c(hdrs);
1049-
} else if (PyDict_Check(hdrs)) {
1050-
return py_headers_dict_to_c(hdrs);
1051-
} else {
1052-
PyErr_Format(PyExc_TypeError,
1053-
"expected headers to be "
1054-
"dict or list of (key, value) tuples, not %s",
1055-
((PyTypeObject *)PyObject_Type(hdrs))->tp_name);
1056-
return NULL;
947+
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist) {
948+
int i, len;
949+
rd_kafka_headers_t *rd_headers = NULL;
950+
rd_kafka_resp_err_t err;
951+
const char *header_key, *header_value = NULL;
952+
int header_key_len = 0, header_value_len = 0;
953+
954+
len = PyList_Size(headers_plist);
955+
rd_headers = rd_kafka_headers_new(len);
956+
957+
for (i = 0; i < len; i++) {
958+
959+
if(!PyArg_ParseTuple(PyList_GET_ITEM(headers_plist, i), "s#z#", &header_key,
960+
&header_key_len, &header_value, &header_value_len)){
961+
rd_kafka_headers_destroy(rd_headers);
962+
PyErr_SetString(PyExc_TypeError,
963+
"Headers are expected to be a tuple of (key, value)");
964+
return NULL;
1057965
}
1058-
}
1059966

967+
err = rd_kafka_header_add(rd_headers, header_key, header_key_len, header_value, header_value_len);
968+
if (err) {
969+
rd_kafka_headers_destroy(rd_headers);
970+
cfl_PyErr_Format(err,
971+
"Unable to create message headers: %s",
972+
rd_kafka_err2str(err));
973+
return NULL;
974+
}
975+
}
976+
return rd_headers;
977+
}
1060978

1061979
/**
1062980
* @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
@@ -1071,7 +989,7 @@ PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
1071989
size_t header_value_size;
1072990
PyObject *header_list;
1073991

1074-
header_size = rd_kafka_header_cnt(headers);
992+
header_size = rd_kafka_header_cnt(headers);
1075993
header_list = PyList_New(header_size);
1076994

1077995
while (!rd_kafka_header_get_all(headers, idx++,

confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
270270
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
271271

272272
#ifdef RD_KAFKA_V_HEADERS
273-
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs);
273+
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist);
274274
PyObject *c_headers_to_py (rd_kafka_headers_t *headers);
275275
#endif
276276
/****************************************************************************

examples/integration_test.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import sys
2727
import json
2828
import gc
29-
import struct
3029
from copy import copy
3130

3231
try:
@@ -118,8 +117,7 @@ def verify_producer():
118117
p = confluent_kafka.Producer(**conf)
119118
print('producer at %s' % p)
120119

121-
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1'),
122-
('foobin', struct.pack('hhl', 10, 20, 30))]
120+
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1')]
123121

124122
# Produce some messages
125123
p.produce(topic, 'Hello Python!', headers=headers)
@@ -466,8 +464,6 @@ def print_wmark(consumer, parts):
466464

467465
first_msg = None
468466

469-
example_header = None
470-
471467
while True:
472468
# Consume until EOF or error
473469

@@ -521,15 +517,12 @@ def print_wmark(consumer, parts):
521517
print('Sync committed offset: %s' % offsets)
522518

523519
msgcnt += 1
524-
if msgcnt >= max_msgcnt and example_header is not None:
520+
if msgcnt >= max_msgcnt:
525521
print('max_msgcnt %d reached' % msgcnt)
526522
break
527523

528524
assert example_header, "We should have received at least one header"
529-
assert example_header == [(u'foo1', 'bar'),
530-
(u'foo1', 'bar2'),
531-
(u'foo2', '1'),
532-
('foobin', struct.pack('hhl', 10, 20, 30))]
525+
assert example_header == [(u'foo1', 'bar'), (u'foo1', 'bar2'), (u'foo2', '1')]
533526

534527
# Get current assignment
535528
assignment = c.assignment()

tests/test_Producer.py

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import pytest
33

44
from confluent_kafka import Producer, KafkaError, KafkaException, libversion
5-
from struct import pack
65

76

87
def error_cb(err):
@@ -66,25 +65,11 @@ def test_produce_headers():
6665
'error_cb': error_cb,
6766
'default.topic.config': {'message.timeout.ms': 10}})
6867

69-
binval = pack('hhl', 1, 2, 3)
70-
71-
headers_to_test = [
72-
[('headerkey', 'headervalue')],
73-
[('dupkey', 'dupvalue'), ('empty', ''), ('dupkey', 'dupvalue')],
74-
[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')],
75-
[('key_with_null_value', None)],
76-
[('binaryval', binval)],
77-
78-
{'headerkey': 'headervalue'},
79-
{'dupkey': 'dupvalue', 'empty': '', 'dupkey': 'dupvalue'}, # noqa: F601
80-
{'dupkey': 'dupvalue', 'dupkey': 'diffvalue'}, # noqa: F601
81-
{'key_with_null_value': None},
82-
{'binaryval': binval}
83-
]
84-
85-
for headers in headers_to_test:
86-
p.produce('mytopic', value='somedata', key='a key', headers=headers)
87-
p.produce('mytopic', value='somedata', headers=headers)
68+
p.produce('mytopic', value='somedata', key='a key', headers=[('headerkey', 'headervalue')])
69+
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'dupvalue')])
70+
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')])
71+
p.produce('mytopic', value='somedata', key='a key', headers=[('key_with_null_value', None)])
72+
p.produce('mytopic', value='somedata', key='a key', headers=[])
8873

8974
with pytest.raises(TypeError) as ex:
9075
p.produce('mytopic', value='somedata', key='a key', headers=[('malformed_header')])

0 commit comments

Comments
 (0)