Skip to content

Commit 9c2d2a9

Browse files
committed
feat: Add error callback in the ConsumerContext
1 parent 8e85eea commit 9c2d2a9

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

src/consumer/base_consumer.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -241,16 +241,22 @@ where
241241
fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) -> Option<KafkaError> {
242242
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
243243
if rdkafka_err.is_error() {
244-
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
245-
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
246-
let partition = unsafe { (*tp_ptr).partition };
247-
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
248-
Some(KafkaError::PartitionEOF(partition))
249-
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
250-
Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
251-
} else {
252-
Some(KafkaError::MessageConsumption(rdkafka_err.into()))
253-
}
244+
let error =
245+
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
246+
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
247+
let partition = unsafe { (*tp_ptr).partition };
248+
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
249+
KafkaError::PartitionEOF(partition)
250+
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
251+
KafkaError::MessageConsumptionFatal(rdkafka_err.into())
252+
} else {
253+
KafkaError::MessageConsumption(rdkafka_err.into())
254+
};
255+
let reason = unsafe {
256+
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
257+
};
258+
self.context().error_callback(error.clone(), reason.trim());
259+
Some(error)
254260
} else {
255261
None
256262
}

src/consumer/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ pub trait ConsumerContext: ClientContext + Sized {
130130
fn main_queue_min_poll_interval(&self) -> Timeout {
131131
Timeout::After(Duration::from_secs(1))
132132
}
133+
134+
/// Error callback
135+
#[allow(unused_variables)]
136+
fn error_callback(&self, error: KafkaError, reason: &str) {}
133137
}
134138

135139
/// An inert [`ConsumerContext`] that can be used when no customizations are

0 commit comments

Comments
 (0)