Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::log::trace;
use crate::message::{BorrowedMessage, Message};
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, NativePtr, Timeout};
use crate::util::{cstr_to_owned, NativePtr, Timeout, TopicPartitionOffset};

/// A low-level consumer that requires manual polling.
///
Expand Down Expand Up @@ -244,8 +244,18 @@ where
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
let partition = unsafe { (*tp_ptr).partition };
let topic = unsafe {
CStr::from_ptr((*tp_ptr).topic)
.to_string_lossy()
.into_owned()
};
let offset = unsafe { (*tp_ptr).offset };
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
Some(KafkaError::PartitionEOF(partition))
Some(KafkaError::PartitionEOF(TopicPartitionOffset{
topic,
partition,
offset,
}))
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::util::{KafkaDrop, NativePtr};
use crate::util::{KafkaDrop, NativePtr, TopicPartitionOffset};

// Re-export rdkafka error code
pub use rdsys::types::RDKafkaErrorCode;
Expand Down Expand Up @@ -170,7 +170,7 @@ pub enum KafkaError {
/// Offset fetch failed.
OffsetFetch(RDKafkaErrorCode),
/// End of partition reached.
PartitionEOF(i32),
PartitionEOF(TopicPartitionOffset),
/// Pause/Resume failed.
PauseResume(String),
/// Rebalance failed.
Expand Down
16 changes: 14 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use rdkafka_sys::types::*;

use crate::admin::NativeEvent;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr, TopicPartitionOffset};

/// Timestamp of a Kafka message.
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Copy)]
Expand Down Expand Up @@ -346,7 +346,19 @@ impl<'a> BorrowedMessage<'a> {
if ptr.err.is_error() {
let err = match ptr.err {
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
KafkaError::PartitionEOF(ptr.partition)
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
let partition = unsafe { (*tp_ptr).partition };
let topic = unsafe {
CStr::from_ptr((*tp_ptr).topic)
.to_string_lossy()
.into_owned()
};
let offset = unsafe { (*tp_ptr).offset };
KafkaError::PartitionEOF(TopicPartitionOffset{
topic,
partition,
offset,
})
}
e => KafkaError::MessageConsumption(e.into()),
};
Expand Down
22 changes: 22 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ pub(crate) enum Deadline {
Never,
}

/// The coordinates of a Kafka record: topic name, partition, and offset.
///
/// Used, for example, as the payload of `KafkaError::PartitionEOF` to report
/// exactly where the end of a partition was reached.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct TopicPartitionOffset {
    /// The name of the Kafka topic.
    pub topic: String,
    /// The partition within the Kafka topic.
    pub partition: i32,
    /// The offset within the specified Kafka partition.
    pub offset: i64,
}

impl fmt::Display for TopicPartitionOffset {
    // Renders as "Topic: {topic}, Partition: {partition}, Offset: {offset}".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Topic: {}, Partition: {}, Offset: {}",
            self.topic, self.partition, self.offset
        )
    }
}

impl Deadline {
// librdkafka's flush api requires an i32 millisecond timeout
const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);
Expand Down
40 changes: 40 additions & 0 deletions tests/test_low_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,43 @@ async fn test_invalid_consumer_position() {
Err(KafkaError::MetadataFetch(RDKafkaErrorCode::UnknownGroup))
);
}

#[tokio::test]
async fn test_partition_eof_error_details() {
    let _r = env_logger::try_init();

    // Produce a handful of records into partition 0 of a fresh topic.
    let topic_name = rand_test_topic("test_partition_eof_error_details");
    let message_count = 5;
    populate_topic(&topic_name, message_count, &value_fn, &key_fn, Some(0), None).await;

    // EOF notifications are opt-in; enable them so poll() surfaces PartitionEOF.
    let mut overrides = HashMap::new();
    overrides.insert("enable.partition.eof", "true");
    let consumer = create_base_consumer(&rand_test_group(), Some(overrides));

    // Manually assign partition 0 from the beginning.
    let mut assignment = TopicPartitionList::new();
    assignment
        .add_partition_offset(&topic_name, 0, Offset::Beginning)
        .unwrap();
    consumer.assign(&assignment).unwrap();

    // Drain every produced record, checking its coordinates along the way.
    for expected_offset in 0..message_count {
        match consumer.poll(Timeout::from(Duration::from_secs(5))) {
            Some(Ok(message)) => {
                assert_eq!(message.offset(), expected_offset as i64);
                assert_eq!(message.partition(), 0);
                assert_eq!(message.topic(), topic_name);
            }
            Some(Err(e)) => panic!("Error receiving message: {:?}", e),
            None => panic!("No message received within timeout"),
        }
    }

    // With the partition drained, the next poll must report a detailed EOF:
    // the EOF offset equals the count of messages consumed.
    let eof_poll = consumer.poll(Timeout::from(Duration::from_secs(5)));
    match eof_poll {
        Some(Err(KafkaError::PartitionEOF(tpo))) => {
            assert_eq!(tpo.topic, topic_name);
            assert_eq!(tpo.partition, 0);
            assert_eq!(tpo.offset, message_count as i64);
        }
        Some(Ok(_)) => panic!("Expected PartitionEOF error, got message"),
        Some(Err(e)) => panic!("Expected PartitionEOF error, got: {:?}", e),
        None => panic!("No message or error received within timeout"),
    }
}