diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs index a25e36611..58a97e3c4 100644 --- a/src/consumer/base_consumer.rs +++ b/src/consumer/base_consumer.rs @@ -241,16 +241,22 @@ where fn handle_error_event(&self, event: NativePtr) -> Option { let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; if rdkafka_err.is_error() { - if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF { - let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) }; - let partition = unsafe { (*tp_ptr).partition }; - unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) }; - Some(KafkaError::PartitionEOF(partition)) - } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 { - Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into())) - } else { - Some(KafkaError::MessageConsumption(rdkafka_err.into())) - } + let error = + if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF { + let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) }; + let partition = unsafe { (*tp_ptr).partition }; + unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) }; + KafkaError::PartitionEOF(partition) + } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 { + KafkaError::MessageConsumptionFatal(rdkafka_err.into()) + } else { + KafkaError::MessageConsumption(rdkafka_err.into()) + }; + let reason = unsafe { + CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy() + }; + self.context().error(error.clone(), reason.trim()); + Some(error) } else { None } diff --git a/src/error.rs b/src/error.rs index 634fe9eee..2388a349b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -161,6 +161,8 @@ pub enum KafkaError { MessageConsumptionFatal(RDKafkaErrorCode), /// Message production error. MessageProduction(RDKafkaErrorCode), + /// Message production failed with fatal error. + MessageProductionFatal(RDKafkaErrorCode), /// Metadata fetch error. MetadataFetch(RDKafkaErrorCode), /// No message was received. @@ -225,6 +227,9 @@ impl fmt::Debug for KafkaError { KafkaError::MessageProduction(err) => { write!(f, "KafkaError (Message production error: {})", err) } + KafkaError::MessageProductionFatal(err) => { + write!(f, "(Fatal) KafkaError (Message production error: {})", err) + } KafkaError::MetadataFetch(err) => { write!(f, "KafkaError (Metadata fetch error: {})", err) } @@ -274,6 +279,9 @@ impl fmt::Display for KafkaError { write!(f, "(Fatal) Message consumption error: {}", err) } KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err), + KafkaError::MessageProductionFatal(err) => { + write!(f, "(Fatal) Message production error: {}", err) + } KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err), KafkaError::NoMessageReceived => { write!(f, "No message received within the given poll interval") @@ -309,6 +317,7 @@ impl Error for KafkaError { KafkaError::MessageConsumption(err) => Some(err), KafkaError::MessageConsumptionFatal(err) => Some(err), KafkaError::MessageProduction(err) => Some(err), + KafkaError::MessageProductionFatal(err) => Some(err), KafkaError::MetadataFetch(err) => Some(err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, @@ -350,6 +359,7 @@ impl KafkaError { KafkaError::MessageConsumption(err) => Some(*err), KafkaError::MessageConsumptionFatal(err) => Some(*err), KafkaError::MessageProduction(err) => Some(*err), + KafkaError::MessageProductionFatal(err) => Some(*err), KafkaError::MetadataFetch(err) => Some(*err), KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 0841bafba..db4a11384 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -408,11 +408,16 @@ where fn handle_error_event(&self, event: NativePtr) { let rdkafka_err = unsafe { rdsys::rd_kafka_event_error(event.ptr()) }; - let error = KafkaError::Global(rdkafka_err.into()); + let rdkafka_err_is_fatal = unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) }; + let error = if rdkafka_err_is_fatal != 0 { + KafkaError::MessageProductionFatal(rdkafka_err.into()) + } else { + KafkaError::MessageProduction(rdkafka_err.into()) + }; let reason = unsafe { CStr::from_ptr(rdsys::rd_kafka_event_error_string(event.ptr())).to_string_lossy() }; - self.context().error(error, reason.trim()); + self.context().error(error.clone(), reason.trim()); } /// Returns a pointer to the native Kafka client.