From c22a587c918ae11a9a1ccce6af9a262a470f85db Mon Sep 17 00:00:00 2001 From: Johnathan W Date: Tue, 7 Oct 2025 22:08:09 -0400 Subject: [PATCH 1/2] Getting offset and sequence number from event data rather than amqp annotations. --- .../src/event_processor/partition_client.rs | 61 +++---------------- 1 file changed, 9 insertions(+), 52 deletions(-) diff --git a/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs b/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs index e0b5467a23..c623bb4bbf 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs +++ b/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs @@ -127,58 +127,15 @@ impl PartitionClient { /// # Errors /// Returns an error if the sequence number or offset is invalid, or if updating the checkpoint fails. pub async fn update_checkpoint(&self, event_data: &ReceivedEventData) -> Result<()> { - let mut sequence_number: Option = None; - let mut offset: Option = None; - - let amqp_message = event_data.raw_amqp_message(); - if let Some(message_annotations) = &amqp_message.message_annotations { - for (key, value) in message_annotations.0.iter() { - if *key == crate::consumer::SEQUENCE_NUMBER_ANNOTATION { - match value { - azure_core_amqp::AmqpValue::UInt(value) => { - sequence_number = Some(*value as i64); - } - azure_core_amqp::AmqpValue::ULong(value) => { - sequence_number = Some(*value as i64); - } - azure_core_amqp::AmqpValue::Long(value) => { - sequence_number = Some(*value); - } - azure_core_amqp::AmqpValue::Int(value) => { - sequence_number = Some(*value as i64); - } - _ => { - return Err(azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Other, - "Invalid sequence number", - )); - } - } - } else if *key == crate::consumer::OFFSET_ANNOTATION { - match value { - azure_core_amqp::AmqpValue::String(value) => { - offset = Some(value.to_string()); - } - _ => { - return Err(azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Other, - "Invalid offset", - )); - } - } - } - } - let checkpoint = Checkpoint { - fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(), - event_hub_name: self.client_details.eventhub_name.clone(), - consumer_group: self.client_details.consumer_group.clone(), - partition_id: self.partition_id.clone(), - offset, - sequence_number, - }; - self.checkpoint_store.update_checkpoint(checkpoint).await?; - } - Ok(()) + let checkpoint = Checkpoint { + fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(), + event_hub_name: self.client_details.eventhub_name.clone(), + consumer_group: self.client_details.consumer_group.clone(), + partition_id: self.partition_id.clone(), + offset: event_data.offset().clone(), + sequence_number: event_data.sequence_number(), + }; + self.checkpoint_store.update_checkpoint(checkpoint).await } pub(crate) fn set_event_receiver(&self, event_receiver: EventReceiver) -> Result<()> { From af3dc26224a97027f6739f4de39ddb22fde07b3e Mon Sep 17 00:00:00 2001 From: Johnathan W Date: Wed, 8 Oct 2025 10:00:13 -0400 Subject: [PATCH 2/2] Going back to the previous structure. Cleaned up the logic path. Used correct &str consts. --- .../src/event_processor/partition_client.rs | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs b/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs index c623bb4bbf..12ae9250e9 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs +++ b/sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs @@ -8,6 +8,8 @@ use crate::{ EventReceiver, }; use azure_core::Result; +use azure_core_amqp::message::AmqpAnnotationKey; +use azure_core_amqp::AmqpValue; use futures::Stream; use std::pin::Pin; use std::sync::{Arc, OnceLock, Weak}; @@ -127,13 +129,39 @@ impl PartitionClient { /// # Errors /// Returns an error if the sequence number or offset is invalid, or if updating the checkpoint fails. pub async fn update_checkpoint(&self, event_data: &ReceivedEventData) -> Result<()> { + let mut offset_option = None; + let mut sequence_number_option = None; + + let event_data_message = event_data.raw_amqp_message(); + let Some(message_annotations) = event_data_message.message_annotations.as_ref() else { + // No message annotations. Nothing to do. + return Ok(()); + }; + for (key, value) in message_annotations.0.iter() { + let AmqpAnnotationKey::Symbol(symbol) = key else { + continue; + }; + + if *symbol == "x-opt-offset" { + let AmqpValue::String(offset_value) = value else { + continue; + }; + offset_option = Some(offset_value.clone()); + } else if *symbol == "x-opt-sequence-number" { + let AmqpValue::Long(sequence_number_value) = value else { + continue; + }; + sequence_number_option = Some(*sequence_number_value); + } + } + let checkpoint = Checkpoint { fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(), event_hub_name: self.client_details.eventhub_name.clone(), consumer_group: self.client_details.consumer_group.clone(), partition_id: self.partition_id.clone(), - offset: event_data.offset().clone(), - sequence_number: event_data.sequence_number(), + offset: offset_option, + sequence_number: sequence_number_option, }; self.checkpoint_store.update_checkpoint(checkpoint).await }