Skip to content

Commit 1937de2

Browse files
rnpridgeonRyan P
authored andcommitted
Copy config dict priort to mutating contents
.......
1 parent d6c8539 commit 1937de2

File tree

2 files changed

+39
-11
lines changed

2 files changed

+39
-11
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,8 +1529,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15291529
* When both args and kwargs are present the kwargs take
15301530
* precedence in case of duplicate keys.
15311531
* All keys map to configuration properties.
1532+
*
1533+
* Copy configuration dict to avoid manipulating application config.
15321534
*/
1533-
if (args) {
1535+
if (args && PyTuple_Size(args)) {
15341536
if (!PyTuple_Check(args) ||
15351537
PyTuple_Size(args) > 1) {
15361538
PyErr_SetString(PyExc_TypeError,
@@ -1542,6 +1544,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15421544
"expected configuration dict");
15431545
return NULL;
15441546
}
1547+
confdict = PyDict_Copy(confdict);
15451548
}
15461549

15471550
if (!confdict) {
@@ -1551,7 +1554,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15511554
return NULL;
15521555
}
15531556

1554-
confdict = kwargs;
1557+
confdict = PyDict_Copy(kwargs);
15551558

15561559
} else if (kwargs) {
15571560
/* Update confdict with kwargs */
@@ -1568,11 +1571,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15681571
/* Enable valid offsets in delivery reports */
15691572
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);
15701573

1571-
/*
1572-
* Plugins must be configured prior to handling any of their configuration properties.
1573-
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here.
1574-
* This ensures all configuration properties are handled in the right order.
1575-
*/
1574+
/*
1575+
* Plugins must be configured prior to handling any of their configuration properties.
1576+
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here.
1577+
* This ensures plugin configuration properties are handled in the correct order.
1578+
*/
15761579
if ((vo = PyDict_GetItemString(confdict, "plugin.library.paths"))) {
15771580
const char *v;
15781581
char errstr[256];
@@ -1584,6 +1587,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15841587
"as type unicode string");
15851588
rd_kafka_topic_conf_destroy(tconf);
15861589
rd_kafka_conf_destroy(conf);
1590+
Py_DECREF(confdict);
1591+
15871592
return NULL;
15881593
}
15891594

@@ -1592,10 +1597,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15921597
if (rd_kafka_conf_set(conf, "plugin.library.paths", v, errstr, sizeof(errstr))
15931598
!= RD_KAFKA_CONF_OK) {
15941599
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1595-
"%s", NULL);
1600+
"%s", errstr);
15961601

15971602
rd_kafka_topic_conf_destroy(tconf);
15981603
rd_kafka_conf_destroy(conf);
1604+
Py_DECREF(confdict);
15991605

16001606
Py_XDECREF(vs8);
16011607
Py_XDECREF(vs);
@@ -1624,6 +1630,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16241630
"as type unicode string");
16251631
rd_kafka_topic_conf_destroy(tconf);
16261632
rd_kafka_conf_destroy(conf);
1633+
Py_DECREF(confdict);
1634+
16271635
return NULL;
16281636
}
16291637

@@ -1633,6 +1641,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16331641
Py_DECREF(ks);
16341642
rd_kafka_topic_conf_destroy(tconf);
16351643
rd_kafka_conf_destroy(conf);
1644+
Py_DECREF(confdict);
16361645
return NULL;
16371646
}
16381647
Py_XDECREF(ks8);
@@ -1646,8 +1655,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16461655
"as a callable function");
16471656
rd_kafka_topic_conf_destroy(tconf);
16481657
rd_kafka_conf_destroy(conf);
1658+
Py_DECREF(confdict);
1659+
16491660
Py_XDECREF(ks8);
16501661
Py_DECREF(ks);
1662+
16511663
return NULL;
16521664
}
16531665
if (h->error_cb) {
@@ -1668,8 +1680,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16681680
"as a callable function");
16691681
rd_kafka_topic_conf_destroy(tconf);
16701682
rd_kafka_conf_destroy(conf);
1683+
Py_DECREF(confdict);
1684+
16711685
Py_XDECREF(ks8);
16721686
Py_DECREF(ks);
1687+
16731688
return NULL;
16741689
}
16751690
if (h->throttle_cb) {
@@ -1690,8 +1705,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
16901705
"as a callable function");
16911706
rd_kafka_topic_conf_destroy(tconf);
16921707
rd_kafka_conf_destroy(conf);
1708+
Py_DECREF(confdict);
1709+
16931710
Py_XDECREF(ks8);
16941711
Py_DECREF(ks);
1712+
16951713
return NULL;
16961714
}
16971715

@@ -1732,6 +1750,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17321750
Py_DECREF(ks);
17331751
rd_kafka_topic_conf_destroy(tconf);
17341752
rd_kafka_conf_destroy(conf);
1753+
Py_DECREF(confdict);
1754+
17351755
return NULL;
17361756

17371757
} else if (r == 1) {
@@ -1753,8 +1773,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17531773
"unicode string");
17541774
rd_kafka_topic_conf_destroy(tconf);
17551775
rd_kafka_conf_destroy(conf);
1776+
Py_DECREF(confdict);
1777+
17561778
Py_XDECREF(ks8);
17571779
Py_DECREF(ks);
1780+
17581781
return NULL;
17591782
}
17601783
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
@@ -1766,10 +1789,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17661789
"%s", errstr);
17671790
rd_kafka_topic_conf_destroy(tconf);
17681791
rd_kafka_conf_destroy(conf);
1792+
Py_DECREF(confdict);
1793+
17691794
Py_XDECREF(vs8);
17701795
Py_XDECREF(vs);
17711796
Py_XDECREF(ks8);
17721797
Py_DECREF(ks);
1798+
17731799
return NULL;
17741800
}
17751801

@@ -1779,6 +1805,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
17791805
Py_DECREF(ks);
17801806
}
17811807

1808+
Py_DECREF(confdict);
1809+
17821810
if (h->error_cb)
17831811
rd_kafka_conf_set_error_cb(conf, error_cb);
17841812

tests/test_misc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ def test_throttle_event_types():
130130
if os.path.exists("monitoring-interceptor" + x)]) == 0,
131131
reason="requires confluent-librdkafka-plugins be installed and copied to the current directory")
132132
@pytest.mark.parametrize("init_func", [
133-
(confluent_kafka.Consumer),
134-
(confluent_kafka.Producer),
135-
(confluent_kafka.admin.AdminClient),
133+
confluent_kafka.Consumer,
134+
confluent_kafka.Producer,
135+
confluent_kafka.admin.AdminClient,
136136
])
137137
def test_unordered_dict(init_func):
138138
"""

0 commit comments

Comments
 (0)