Skip to content

Commit 8e85eea

Browse files
committed
feat: Add error callback in the ProducerContext
1 parent 01653b4 commit 8e85eea

File tree

4 files changed

+39
-3
lines changed

4 files changed

+39
-3
lines changed

src/producer/base_producer.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use crate::topic_partition_list::TopicPartitionList;
7070
use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout};
7171

7272
pub use crate::message::DeliveryResult;
73+
use crate::ClientContext;
7374

7475
use super::NoCustomPartitioner;
7576

@@ -408,11 +409,17 @@ where
408409

409410
fn handle_error_event(&self, event: NativePtr<RDKafkaEvent>) {
410411
let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) };
411-
let error = KafkaError::Global(rdkafka_err.into());
412+
let rdkafka_err_is_fatal = unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) };
413+
let error = if rdkafka_err_is_fatal != 0 {
414+
KafkaError::MessageProductionFatal(rdkafka_err.into())
415+
} else {
416+
KafkaError::MessageProduction(rdkafka_err.into())
417+
};
412418
let reason = unsafe {
413419
CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy()
414420
};
415-
self.context().error(error, reason.trim());
421+
<C as ClientContext>::error(self.context(), error.clone(), reason.trim());
422+
<C as ProducerContext<Part>>::error_callback(self.context(), error, reason.trim());
416423
}
417424

418425
/// Returns a pointer to the native Kafka client.

src/producer/future_producer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ where
201201
};
202202
let _ = tx.send(owned_delivery_result); // TODO: handle error
203203
}
204+
205+
fn error_callback(&self, error: KafkaError, reason: &str) {
206+
C::error(&self.wrapped_context, error, reason);
207+
}
204208
}
205209

206210
/// A producer that returns a [`Future`] for every message being produced.
@@ -450,6 +454,10 @@ mod tests {
450454
fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {
451455
unimplemented!()
452456
}
457+
458+
fn error_callback(&self, _: KafkaError, _: &str) {
459+
unimplemented!()
460+
}
453461
}
454462

455463
// Verify that the future producer is clone, according to documentation.

src/producer/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ use std::sync::Arc;
163163

164164
use crate::client::{Client, ClientContext};
165165
use crate::consumer::ConsumerGroupMetadata;
166-
use crate::error::KafkaResult;
166+
use crate::error::{KafkaError, KafkaResult};
167167
use crate::topic_partition_list::TopicPartitionList;
168168
use crate::util::{IntoOpaque, Timeout};
169169

@@ -201,6 +201,9 @@ pub trait ProducerContext<Part: Partitioner = NoCustomPartitioner>: ClientContex
201201
/// when calling send.
202202
fn delivery(&self, delivery_result: &DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque);
203203

204+
/// Error callback
205+
fn error_callback(&self, error: KafkaError, reason: &str);
206+
204207
/// This method is called when creating producer in order to optionally register custom partitioner.
205208
/// If custom partitioner is not used then `partitioner` configuration property is used (or its default).
206209
///
@@ -264,6 +267,8 @@ impl ProducerContext<NoCustomPartitioner> for DefaultProducerContext {
264267
type DeliveryOpaque = ();
265268

266269
fn delivery(&self, _: &DeliveryResult<'_>, _: Self::DeliveryOpaque) {}
270+
271+
fn error_callback(&self, _: KafkaError, _: &str) {}
267272
}
268273

269274
/// Common trait for all producers.

tests/test_low_producers.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ impl ProducerContext for PrintingContext {
4242
fn delivery(&self, delivery_result: &DeliveryResult, delivery_opaque: Self::DeliveryOpaque) {
4343
println!("Delivery: {:?} {:?}", delivery_result, delivery_opaque);
4444
}
45+
46+
fn error_callback(&self, error: KafkaError, reason: &str) {
47+
println!("Error: {reason:?} {error:?}");
48+
}
4549
}
4650

4751
type TestProducerDeliveryResult = (OwnedMessage, Option<KafkaError>, usize);
@@ -100,6 +104,10 @@ impl<Part: Partitioner + Send + Sync> ProducerContext<Part> for CollectingContex
100104
Some(p) => Some(p),
101105
}
102106
}
107+
108+
fn error_callback(&self, _: KafkaError, _: &str) {
109+
unimplemented!()
110+
}
103111
}
104112

105113
// Partitioner sending all messages to single, defined partition.
@@ -340,6 +348,10 @@ impl ProducerContext for HeaderCheckContext {
340348
}
341349
(*self.ids.lock().unwrap()).insert(message_id);
342350
}
351+
352+
fn error_callback(&self, _: KafkaError, _: &str) {
353+
unimplemented!()
354+
}
343355
}
344356

345357
#[test]
@@ -429,6 +441,10 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
429441
let mut shared_count = opaque.lock().unwrap();
430442
*shared_count += 1;
431443
}
444+
445+
fn error_callback(&self, _: KafkaError, _: &str) {
446+
unimplemented!()
447+
}
432448
}
433449

434450
let shared_count = Arc::new(Mutex::new(0));

0 commit comments

Comments
 (0)