Skip to content

Commit adff670

Browse files
authored
Getting offset and sequence number from event data rather than amqp annotations. (#3148)
This PR addresses: #3073 Properly retrieve the offset and sequence numbers to properly store checkpoints.
1 parent c990b53 commit adff670

File tree

1 file changed

+34
-49
lines changed

1 file changed

+34
-49
lines changed

sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs

Lines changed: 34 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use crate::{
88
EventReceiver,
99
};
1010
use azure_core::Result;
11+
use azure_core_amqp::message::AmqpAnnotationKey;
12+
use azure_core_amqp::AmqpValue;
1113
use futures::Stream;
1214
use std::pin::Pin;
1315
use std::sync::{Arc, OnceLock, Weak};
@@ -127,58 +129,41 @@ impl PartitionClient {
127129
/// # Errors
128130
/// Returns an error if the sequence number or offset is invalid, or if updating the checkpoint fails.
129131
pub async fn update_checkpoint(&self, event_data: &ReceivedEventData) -> Result<()> {
130-
let mut sequence_number: Option<i64> = None;
131-
let mut offset: Option<String> = None;
132+
let mut offset_option = None;
133+
let mut sequence_number_option = None;
132134

133-
let amqp_message = event_data.raw_amqp_message();
134-
if let Some(message_annotations) = &amqp_message.message_annotations {
135-
for (key, value) in message_annotations.0.iter() {
136-
if *key == crate::consumer::SEQUENCE_NUMBER_ANNOTATION {
137-
match value {
138-
azure_core_amqp::AmqpValue::UInt(value) => {
139-
sequence_number = Some(*value as i64);
140-
}
141-
azure_core_amqp::AmqpValue::ULong(value) => {
142-
sequence_number = Some(*value as i64);
143-
}
144-
azure_core_amqp::AmqpValue::Long(value) => {
145-
sequence_number = Some(*value);
146-
}
147-
azure_core_amqp::AmqpValue::Int(value) => {
148-
sequence_number = Some(*value as i64);
149-
}
150-
_ => {
151-
return Err(azure_core::error::Error::with_message(
152-
azure_core::error::ErrorKind::Other,
153-
"Invalid sequence number",
154-
));
155-
}
156-
}
157-
} else if *key == crate::consumer::OFFSET_ANNOTATION {
158-
match value {
159-
azure_core_amqp::AmqpValue::String(value) => {
160-
offset = Some(value.to_string());
161-
}
162-
_ => {
163-
return Err(azure_core::error::Error::with_message(
164-
azure_core::error::ErrorKind::Other,
165-
"Invalid offset",
166-
));
167-
}
168-
}
169-
}
170-
}
171-
let checkpoint = Checkpoint {
172-
fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(),
173-
event_hub_name: self.client_details.eventhub_name.clone(),
174-
consumer_group: self.client_details.consumer_group.clone(),
175-
partition_id: self.partition_id.clone(),
176-
offset,
177-
sequence_number,
135+
let event_data_message = event_data.raw_amqp_message();
136+
let Some(message_annotations) = event_data_message.message_annotations.as_ref() else {
137+
// No message annotations. Nothing to do.
138+
return Ok(());
139+
};
140+
for (key, value) in message_annotations.0.iter() {
141+
let AmqpAnnotationKey::Symbol(symbol) = key else {
142+
continue;
178143
};
179-
self.checkpoint_store.update_checkpoint(checkpoint).await?;
144+
145+
if *symbol == "x-opt-offset" {
146+
let AmqpValue::String(offset_value) = value else {
147+
continue;
148+
};
149+
offset_option = Some(offset_value.clone());
150+
} else if *symbol == "x-opt-sequence-number" {
151+
let AmqpValue::Long(sequence_number_value) = value else {
152+
continue;
153+
};
154+
sequence_number_option = Some(*sequence_number_value);
155+
}
180156
}
181-
Ok(())
157+
158+
let checkpoint = Checkpoint {
159+
fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(),
160+
event_hub_name: self.client_details.eventhub_name.clone(),
161+
consumer_group: self.client_details.consumer_group.clone(),
162+
partition_id: self.partition_id.clone(),
163+
offset: offset_option,
164+
sequence_number: sequence_number_option,
165+
};
166+
self.checkpoint_store.update_checkpoint(checkpoint).await
182167
}
183168

184169
pub(crate) fn set_event_receiver(&self, event_receiver: EventReceiver) -> Result<()> {

0 commit comments

Comments
 (0)