diff --git a/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs b/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs index e0b5467a23..12ae9250e9 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs +++ b/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs @@ -8,6 +8,8 @@ use crate::{ EventReceiver, }; use azure_core::Result; +use azure_core_amqp::message::AmqpAnnotationKey; +use azure_core_amqp::AmqpValue; use futures::Stream; use std::pin::Pin; use std::sync::{Arc, OnceLock, Weak}; @@ -127,58 +129,41 @@ impl PartitionClient { /// # Errors /// Returns an error if the sequence number or offset is invalid, or if updating the checkpoint fails. pub async fn update_checkpoint(&self, event_data: &ReceivedEventData) -> Result<()> { - let mut sequence_number: Option = None; - let mut offset: Option = None; + let mut offset_option = None; + let mut sequence_number_option = None; - let amqp_message = event_data.raw_amqp_message(); - if let Some(message_annotations) = &amqp_message.message_annotations { - for (key, value) in message_annotations.0.iter() { - if *key == crate::consumer::SEQUENCE_NUMBER_ANNOTATION { - match value { - azure_core_amqp::AmqpValue::UInt(value) => { - sequence_number = Some(*value as i64); - } - azure_core_amqp::AmqpValue::ULong(value) => { - sequence_number = Some(*value as i64); - } - azure_core_amqp::AmqpValue::Long(value) => { - sequence_number = Some(*value); - } - azure_core_amqp::AmqpValue::Int(value) => { - sequence_number = Some(*value as i64); - } - _ => { - return Err(azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Other, - "Invalid sequence number", - )); - } - } - } else if *key == crate::consumer::OFFSET_ANNOTATION { - match value { - azure_core_amqp::AmqpValue::String(value) => { - offset = Some(value.to_string()); - } - _ => { - return Err(azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Other, - "Invalid offset", - )); - } - } - } - } - let checkpoint = Checkpoint { - fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(), - event_hub_name: self.client_details.eventhub_name.clone(), - consumer_group: self.client_details.consumer_group.clone(), - partition_id: self.partition_id.clone(), - offset, - sequence_number, + let event_data_message = event_data.raw_amqp_message(); + let Some(message_annotations) = event_data_message.message_annotations.as_ref() else { + // No message annotations. Nothing to do. + return Ok(()); + }; + for (key, value) in message_annotations.0.iter() { + let AmqpAnnotationKey::Symbol(symbol) = key else { + continue; }; - self.checkpoint_store.update_checkpoint(checkpoint).await?; + + if *symbol == "x-opt-offset" { + let AmqpValue::String(offset_value) = value else { + continue; + }; + offset_option = Some(offset_value.clone()); + } else if *symbol == "x-opt-sequence-number" { + let AmqpValue::Long(sequence_number_value) = value else { + continue; + }; + sequence_number_option = Some(*sequence_number_value); + } } - Ok(()) + + let checkpoint = Checkpoint { + fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(), + event_hub_name: self.client_details.eventhub_name.clone(), + consumer_group: self.client_details.consumer_group.clone(), + partition_id: self.partition_id.clone(), + offset: offset_option, + sequence_number: sequence_number_option, + }; + self.checkpoint_store.update_checkpoint(checkpoint).await } pub(crate) fn set_event_receiver(&self, event_receiver: EventReceiver) -> Result<()> {