Skip to content

Commit 0e7b694

Browse files
committed
Added error_cb for propagating generic errors from librdkafka to app (issue #14)
1 parent fdaa0d4 commit 0e7b694

File tree

4 files changed

+79
-1
lines changed

4 files changed

+79
-1
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,39 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
801801
}
802802

803803

804+
/****************************************************************************
805+
*
806+
*
807+
* Common callbacks
808+
*
809+
*
810+
*
811+
*
812+
****************************************************************************/
813+
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
814+
Handle *h = opaque;
815+
PyObject *eo, *result;
816+
817+
PyEval_RestoreThread(h->thread_state);
818+
if (!h->error_cb) {
819+
/* No callback defined */
820+
goto done;
821+
}
822+
823+
eo = KafkaError_new0(err, "%s", reason);
824+
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
825+
Py_DECREF(eo);
826+
827+
if (result) {
828+
Py_DECREF(result);
829+
} else {
830+
h->callback_crashed++;
831+
rd_kafka_yield(h->rk);
832+
}
833+
834+
done:
835+
h->thread_state = PyEval_SaveThread();
836+
}
804837

805838

806839
/****************************************************************************
@@ -814,6 +847,28 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
814847
****************************************************************************/
815848

816849

850+
851+
/**
852+
* Clear Python object references in Handle
853+
*/
854+
void Handle_clear (Handle *h) {
855+
if (h->error_cb) {
856+
Py_DECREF(h->error_cb);
857+
}
858+
}
859+
860+
/**
861+
* GC traversal for Python object references
862+
*/
863+
int Handle_traverse (Handle *h, visitproc visit, void *arg) {
864+
if (h->error_cb)
865+
Py_VISIT(h->error_cb);
866+
867+
return 0;
868+
}
869+
870+
871+
817872
/**
818873
* Populate topic conf from provided dict.
819874
*
@@ -1053,6 +1108,14 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
10531108

10541109
Py_DECREF(ks);
10551110
continue;
1111+
1112+
} else if (!strcmp(k, "error_cb")) {
1113+
if (h->error_cb)
1114+
Py_DECREF(h->error_cb);
1115+
h->error_cb = vo;
1116+
Py_INCREF(h->error_cb);
1117+
Py_DECREF(ks);
1118+
continue;
10561119
}
10571120

10581121
/* Special handling for certain config keys. */
@@ -1102,6 +1165,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11021165
Py_DECREF(ks);
11031166
}
11041167

1168+
if (h->error_cb)
1169+
rd_kafka_conf_set_error_cb(conf, error_cb);
11051170
rd_kafka_topic_conf_set_opaque(tconf, h);
11061171
rd_kafka_conf_set_default_topic_conf(conf, tconf);
11071172

docs/index.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ The Python bindings also provide some additional configuration properties:
7878
* ``default.topic.config``: value is a dict of topic-level configuration
7979
properties that are applied to all used topics for the instance.
8080

81+
* ``error_cb``: Callback for generic/global error events. This callback is served by
82+
poll().
83+
8184
* ``on_delivery`` (**Producer**): value is a Python function reference
8285
that is called once for each produced message to indicate the final
8386
delivery result (success or failure).

examples/integration_test.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737

3838

3939

40+
def error_cb (err):
41+
print('Error: %s' % err)
4042

4143
class MyTestDr(object):
4244
""" Producer: Delivery report callback """
@@ -77,6 +79,7 @@ def verify_producer():
7779

7880
# Producer config
7981
conf = {'bootstrap.servers': bootstrap_servers,
82+
'error_cb': error_cb,
8083
'default.topic.config':{'produce.offset.report': True}}
8184

8285
# Create producer
@@ -112,7 +115,8 @@ def verify_producer():
112115

113116
def verify_producer_performance(with_dr_cb=True):
114117
""" Time how long it takes to produce and delivery X messages """
115-
conf = {'bootstrap.servers': bootstrap_servers}
118+
conf = {'bootstrap.servers': bootstrap_servers,
119+
'error_cb': error_cb}
116120

117121
p = confluent_kafka.Producer(**conf)
118122

@@ -207,6 +211,7 @@ def verify_consumer():
207211
'session.timeout.ms': 6000,
208212
'enable.auto.commit': False,
209213
'on_commit': print_commit_result,
214+
'error_cb': error_cb,
210215
'default.topic.config': {
211216
'auto.offset.reset': 'earliest'
212217
}}
@@ -275,6 +280,7 @@ def verify_consumer_performance():
275280
conf = {'bootstrap.servers': bootstrap_servers,
276281
'group.id': uuid.uuid1(),
277282
'session.timeout.ms': 6000,
283+
'error_cb': error_cb,
278284
'default.topic.config': {
279285
'auto.offset.reset': 'earliest'
280286
}}

tests/test_Producer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ def test_basic_api():
1313
assert str(e) == "expected configuration dict"
1414

1515

16+
def error_cb (err):
17+
print('error_cb', err)
18+
1619
p = Producer({'socket.timeout.ms':10,
20+
'error_cb': error_cb,
1721
'default.topic.config': {'message.timeout.ms': 10}})
1822

1923
p.produce('mytopic')

0 commit comments

Comments
 (0)