Skip to content

Commit c22a587

Browse files
committed
Getting offset and sequence number from event data rather than amqp annotations.
1 parent 5a3c32e commit c22a587

File tree

1 file changed

+9
-52
lines changed

1 file changed

+9
-52
lines changed

sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/partition_client.rs

Lines changed: 9 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -127,58 +127,15 @@ impl PartitionClient {
127127
/// # Errors
128128
/// Returns an error if the sequence number or offset is invalid, or if updating the checkpoint fails.
129129
pub async fn update_checkpoint(&self, event_data: &ReceivedEventData) -> Result<()> {
130-
let mut sequence_number: Option<i64> = None;
131-
let mut offset: Option<String> = None;
132-
133-
let amqp_message = event_data.raw_amqp_message();
134-
if let Some(message_annotations) = &amqp_message.message_annotations {
135-
for (key, value) in message_annotations.0.iter() {
136-
if *key == crate::consumer::SEQUENCE_NUMBER_ANNOTATION {
137-
match value {
138-
azure_core_amqp::AmqpValue::UInt(value) => {
139-
sequence_number = Some(*value as i64);
140-
}
141-
azure_core_amqp::AmqpValue::ULong(value) => {
142-
sequence_number = Some(*value as i64);
143-
}
144-
azure_core_amqp::AmqpValue::Long(value) => {
145-
sequence_number = Some(*value);
146-
}
147-
azure_core_amqp::AmqpValue::Int(value) => {
148-
sequence_number = Some(*value as i64);
149-
}
150-
_ => {
151-
return Err(azure_core::error::Error::with_message(
152-
azure_core::error::ErrorKind::Other,
153-
"Invalid sequence number",
154-
));
155-
}
156-
}
157-
} else if *key == crate::consumer::OFFSET_ANNOTATION {
158-
match value {
159-
azure_core_amqp::AmqpValue::String(value) => {
160-
offset = Some(value.to_string());
161-
}
162-
_ => {
163-
return Err(azure_core::error::Error::with_message(
164-
azure_core::error::ErrorKind::Other,
165-
"Invalid offset",
166-
));
167-
}
168-
}
169-
}
170-
}
171-
let checkpoint = Checkpoint {
172-
fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(),
173-
event_hub_name: self.client_details.eventhub_name.clone(),
174-
consumer_group: self.client_details.consumer_group.clone(),
175-
partition_id: self.partition_id.clone(),
176-
offset,
177-
sequence_number,
178-
};
179-
self.checkpoint_store.update_checkpoint(checkpoint).await?;
180-
}
181-
Ok(())
130+
let checkpoint = Checkpoint {
131+
fully_qualified_namespace: self.client_details.fully_qualified_namespace.clone(),
132+
event_hub_name: self.client_details.eventhub_name.clone(),
133+
consumer_group: self.client_details.consumer_group.clone(),
134+
partition_id: self.partition_id.clone(),
135+
offset: event_data.offset().clone(),
136+
sequence_number: event_data.sequence_number(),
137+
};
138+
self.checkpoint_store.update_checkpoint(checkpoint).await
182139
}
183140

184141
pub(crate) fn set_event_receiver(&self, event_receiver: EventReceiver) -> Result<()> {

0 commit comments

Comments
 (0)