Skip to content

Commit cd28388

Browse files
committed
Fix constructor config handle to allow both confdict and kwargs
Supported parameter constellations: - kwargs (conf={..}, logger=..) - args and kwargs ({..}, logger=..) - args ({..})
1 parent 9c53580 commit cd28388

File tree

1 file changed

+38
-11
lines changed

1 file changed

+38
-11
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,6 +1371,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
13711371
rd_kafka_topic_conf_t *tconf;
13721372
Py_ssize_t pos = 0;
13731373
PyObject *ko, *vo;
1374+
PyObject *confdict = NULL;
13741375
int32_t (*partitioner_cb) (const rd_kafka_topic_t *,
13751376
const void *, size_t, int32_t,
13761377
void *, void *) = partitioner_cb;
@@ -1383,15 +1384,41 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
13831384
return NULL;
13841385
}
13851386

1386-
if (!kwargs) {
1387-
/* If no kwargs, fall back on single dict arg, if any. */
1388-
if (!args || !PyTuple_Check(args) || PyTuple_Size(args) < 1 ||
1389-
!PyDict_Check((kwargs = PyTuple_GetItem(args, 0)))) {
1390-
PyErr_SetString(PyExc_TypeError,
1391-
"expected configuration dict");
1392-
return NULL;
1393-
}
1394-
}
1387+
/* Supported parameter constellations:
1388+
* - kwargs (conf={..}, logger=..)
1389+
* - args and kwargs ({..}, logger=..)
1390+
* - args ({..})
1391+
* When both args and kwargs are present the kwargs take
1392+
* precedence in case of duplicate keys.
1393+
* All keys map to configuration properties.
1394+
*/
1395+
if (args) {
1396+
if (!PyTuple_Check(args) ||
1397+
PyTuple_Size(args) > 1) {
1398+
PyErr_SetString(PyExc_TypeError,
1399+
"expected tuple containing single dict");
1400+
return NULL;
1401+
} else if (PyTuple_Size(args) == 1 &&
1402+
!PyDict_Check((confdict = PyTuple_GetItem(args, 0)))) {
1403+
PyErr_SetString(PyExc_TypeError,
1404+
"expected configuration dict");
1405+
return NULL;
1406+
}
1407+
}
1408+
1409+
if (!confdict) {
1410+
if (!kwargs) {
1411+
PyErr_SetString(PyExc_TypeError,
1412+
"expected configuration dict");
1413+
return NULL;
1414+
}
1415+
1416+
confdict = kwargs;
1417+
1418+
} else if (kwargs) {
1419+
/* Update confdict with kwargs */
1420+
PyDict_Update(confdict, kwargs);
1421+
}
13951422

13961423
conf = rd_kafka_conf_new();
13971424
tconf = rd_kafka_topic_conf_new();
@@ -1403,8 +1430,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
14031430
/* Enable valid offsets in delivery reports */
14041431
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);
14051432

1406-
/* Convert kwargs dict to config key-value pairs. */
1407-
while (PyDict_Next(kwargs, &pos, &ko, &vo)) {
1433+
/* Convert config dict to config key-value pairs. */
1434+
while (PyDict_Next(confdict, &pos, &ko, &vo)) {
14081435
PyObject *ks, *ks8;
14091436
PyObject *vs = NULL, *vs8 = NULL;
14101437
const char *k;

0 commit comments

Comments
 (0)