From 564d7d267152d088f8cdd63298a84ee8310f405e Mon Sep 17 00:00:00 2001
From: florin akermann
Date: Tue, 29 Apr 2025 22:38:50 +0200
Subject: [PATCH] Refactor PartitionEOF to use TopicPartitionOffset struct

Change the payload of the `PartitionEOF` error variant from a bare
partition number to the new `TopicPartitionOffset` struct. The struct
encapsulates the topic name, partition, and offset, which provides more
context and improves error reporting and debugging.
---
 src/consumer/base_consumer.rs | 14 ++++++++++--
 src/error.rs                  |  4 ++--
 src/message.rs                | 20 ++++++++++++++++--
 src/util.rs                   | 21 ++++++++++++++++++
 tests/test_low_consumers.rs   | 40 +++++++++++++++++++++++++++++++++++
 5 files changed, 93 insertions(+), 6 deletions(-)

diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs
index a25e36611..c742da25b 100644
--- a/src/consumer/base_consumer.rs
+++ b/src/consumer/base_consumer.rs
@@ -25,7 +25,7 @@ use crate::log::trace;
 use crate::message::{BorrowedMessage, Message};
 use crate::metadata::Metadata;
 use crate::topic_partition_list::{Offset, TopicPartitionList};
-use crate::util::{cstr_to_owned, NativePtr, Timeout};
+use crate::util::{cstr_to_owned, NativePtr, Timeout, TopicPartitionOffset};
 
 /// A low-level consumer that requires manual polling.
 ///
@@ -244,8 +244,18 @@ where
             if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
                 let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
                 let partition = unsafe { (*tp_ptr).partition };
+                let topic = unsafe {
+                    CStr::from_ptr((*tp_ptr).topic)
+                        .to_string_lossy()
+                        .into_owned()
+                };
+                let offset = unsafe { (*tp_ptr).offset };
                 unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
-                Some(KafkaError::PartitionEOF(partition))
+                Some(KafkaError::PartitionEOF(TopicPartitionOffset {
+                    topic,
+                    partition,
+                    offset,
+                }))
             } else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
                 Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
             } else {
diff --git a/src/error.rs b/src/error.rs
index 634fe9eee..364ea5886 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -9,7 +9,7 @@ use std::sync::Arc;
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;
 
-use crate::util::{KafkaDrop, NativePtr};
+use crate::util::{KafkaDrop, NativePtr, TopicPartitionOffset};
 
 // Re-export rdkafka error code
 pub use rdsys::types::RDKafkaErrorCode;
@@ -170,7 +170,7 @@ pub enum KafkaError {
     /// Offset fetch failed.
     OffsetFetch(RDKafkaErrorCode),
     /// End of partition reached.
-    PartitionEOF(i32),
+    PartitionEOF(TopicPartitionOffset),
     /// Pause/Resume failed.
     PauseResume(String),
     /// Rebalance failed.
diff --git a/src/message.rs b/src/message.rs
index e72741219..5c2bd30f2 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -14,7 +14,7 @@ use rdkafka_sys::types::*;
 
 use crate::admin::NativeEvent;
 use crate::error::{IsError, KafkaError, KafkaResult};
-use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
+use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr, TopicPartitionOffset};
 
 /// Timestamp of a Kafka message.
 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Copy)]
@@ -346,7 +346,23 @@ impl<'a> BorrowedMessage<'a> {
         if ptr.err.is_error() {
             let err = match ptr.err {
                 rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
-                    KafkaError::PartitionEOF(ptr.partition)
+                    // `rd_kafka_event_topic_partition` returns a newly allocated
+                    // object that the caller owns; copy the fields out and destroy
+                    // it so that it is not leaked.
+                    let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
+                    let partition = unsafe { (*tp_ptr).partition };
+                    let topic = unsafe {
+                        CStr::from_ptr((*tp_ptr).topic)
+                            .to_string_lossy()
+                            .into_owned()
+                    };
+                    let offset = unsafe { (*tp_ptr).offset };
+                    unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
+                    KafkaError::PartitionEOF(TopicPartitionOffset {
+                        topic,
+                        partition,
+                        offset,
+                    })
                 }
                 e => KafkaError::MessageConsumption(e.into()),
             };
diff --git a/src/util.rs b/src/util.rs
index 719033780..319cbad06 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -37,6 +37,27 @@ pub(crate) enum Deadline {
     Never,
 }
 
+/// The coordinates (topic, partition and offset) of a position in a Kafka topic.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
+pub struct TopicPartitionOffset {
+    /// The name of the Kafka topic.
+    pub topic: String,
+    /// The partition within the Kafka topic.
+    pub partition: i32,
+    /// The offset within the specified Kafka partition.
+    pub offset: i64,
+}
+
+impl fmt::Display for TopicPartitionOffset {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "Topic: {}, Partition: {}, Offset: {}",
+            self.topic, self.partition, self.offset
+        )
+    }
+}
+
 impl Deadline {
     // librdkafka's flush api requires an i32 millisecond timeout
     const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);
diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs
index c97802243..67516cf7d 100644
--- a/tests/test_low_consumers.rs
+++ b/tests/test_low_consumers.rs
@@ -555,3 +555,43 @@ async fn test_invalid_consumer_position() {
         Err(KafkaError::MetadataFetch(RDKafkaErrorCode::UnknownGroup))
     );
 }
+
+#[tokio::test]
+async fn test_partition_eof_error_details() {
+    let _r = env_logger::try_init();
+    let topic_name = rand_test_topic("test_partition_eof_error_details");
+    let message_count = 5;
+    populate_topic(&topic_name, message_count, &value_fn, &key_fn, Some(0), None).await;
+
+    let mut config_overrides = HashMap::new();
+    config_overrides.insert("enable.partition.eof", "true");
+    let consumer = create_base_consumer(&rand_test_group(), Some(config_overrides));
+
+    let mut tpl = TopicPartitionList::new();
+    tpl.add_partition_offset(&topic_name, 0, Offset::Beginning).unwrap();
+    consumer.assign(&tpl).unwrap();
+
+    for i in 0..message_count {
+        match consumer.poll(Timeout::from(Duration::from_secs(5))) {
+            Some(Ok(message)) => {
+                assert_eq!(message.offset(), i as i64);
+                assert_eq!(message.partition(), 0);
+                assert_eq!(message.topic(), topic_name);
+            }
+            Some(Err(e)) => panic!("Error receiving message: {:?}", e),
+            None => panic!("No message received within timeout"),
+        }
+    }
+
+    // The next poll should return a PartitionEOF error with detailed information
+    match consumer.poll(Timeout::from(Duration::from_secs(5))) {
+        Some(Err(KafkaError::PartitionEOF(tpo))) => {
+            assert_eq!(tpo.topic, topic_name);
+            assert_eq!(tpo.partition, 0);
+            assert_eq!(tpo.offset, message_count as i64);
+        }
+        Some(Ok(_)) => panic!("Expected PartitionEOF error, got message"),
+        Some(Err(e)) => panic!("Expected PartitionEOF error, got: {:?}", e),
+        None => panic!("No message or error received within timeout"),
+    }
+}