Skip to content

Commit d6c8539

Browse files
rnpridgeonRyan P
authored andcommitted
Ensure plugin configs handled in proper order
1 parent 7ceb398 commit d6c8539

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,6 +1568,47 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
15681568
/* Enable valid offsets in delivery reports */
15691569
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);
15701570

1571+
/*
1572+
* Plugins must be configured prior to handling any of their configuration properties.
1573+
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here.
1574+
* This ensures all configuration properties are handled in the right order.
1575+
*/
1576+
if ((vo = PyDict_GetItemString(confdict, "plugin.library.paths"))) {
1577+
const char *v;
1578+
char errstr[256];
1579+
PyObject *vs = NULL, *vs8 = NULL;
1580+
1581+
if (!(vs = cfl_PyObject_Unistr(vo))) {
1582+
PyErr_SetString(PyExc_TypeError,
1583+
"expected configuration property name "
1584+
"as type unicode string");
1585+
rd_kafka_topic_conf_destroy(tconf);
1586+
rd_kafka_conf_destroy(conf);
1587+
return NULL;
1588+
}
1589+
1590+
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
1591+
1592+
if (rd_kafka_conf_set(conf, "plugin.library.paths", v, errstr, sizeof(errstr))
1593+
!= RD_KAFKA_CONF_OK) {
1594+
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
1595+
"%s", NULL);
1596+
1597+
rd_kafka_topic_conf_destroy(tconf);
1598+
rd_kafka_conf_destroy(conf);
1599+
1600+
Py_XDECREF(vs8);
1601+
Py_XDECREF(vs);
1602+
1603+
return NULL;
1604+
}
1605+
1606+
Py_XDECREF(vs8);
1607+
Py_DECREF(vs);
1608+
1609+
PyDict_DelItemString(confdict, "plugin.library.paths");
1610+
}
1611+
15711612
/* Convert config dict to config key-value pairs. */
15721613
while (PyDict_Next(confdict, &pos, &ko, &vo)) {
15731614
PyObject *ks, *ks8;

tests/test_misc.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import confluent_kafka
44
import json
55
import pytest
6+
import os
67

78

89
def test_version():
@@ -123,3 +124,23 @@ def test_throttle_event_types():
123124
assert isinstance(throttle_event.broker_id, int) and throttle_event.broker_id == 0
124125
assert isinstance(throttle_event.throttle_time, float) and throttle_event.throttle_time == 10.0
125126
assert str(throttle_event) == "broker/0 throttled for 10000 ms"
127+
128+
129+
@pytest.mark.skipif(len([True for x in (".so", ".dylib", ".dll")
130+
if os.path.exists("monitoring-interceptor" + x)]) == 0,
131+
reason="requires confluent-librdkafka-plugins be installed and copied to the current directory")
132+
@pytest.mark.parametrize("init_func", [
133+
(confluent_kafka.Consumer),
134+
(confluent_kafka.Producer),
135+
(confluent_kafka.admin.AdminClient),
136+
])
137+
def test_unordered_dict(init_func):
138+
"""
139+
Interceptor configs can only be handled after the plugin has been loaded not before.
140+
"""
141+
init_func({'confluent.monitoring.interceptor.publishMs': 1000,
142+
'confluent.monitoring.interceptor.sessionDurationMs': 1000,
143+
'plugin.library.paths': 'monitoring-interceptor',
144+
'confluent.monitoring.interceptor.topic': 'confluent-kafka-testing',
145+
'confluent.monitoring.interceptor.icdebug': False
146+
})

0 commit comments

Comments
 (0)