@@ -127,58 +127,15 @@ impl PartitionClient {
127
127
/// # Errors
128
128
/// Returns an error if the sequence number or offset is invalid, or if updating the checkpoint fails.
129
129
pub async fn update_checkpoint ( & self , event_data : & ReceivedEventData ) -> Result < ( ) > {
130
- let mut sequence_number: Option < i64 > = None ;
131
- let mut offset: Option < String > = None ;
132
-
133
- let amqp_message = event_data. raw_amqp_message ( ) ;
134
- if let Some ( message_annotations) = & amqp_message. message_annotations {
135
- for ( key, value) in message_annotations. 0 . iter ( ) {
136
- if * key == crate :: consumer:: SEQUENCE_NUMBER_ANNOTATION {
137
- match value {
138
- azure_core_amqp:: AmqpValue :: UInt ( value) => {
139
- sequence_number = Some ( * value as i64 ) ;
140
- }
141
- azure_core_amqp:: AmqpValue :: ULong ( value) => {
142
- sequence_number = Some ( * value as i64 ) ;
143
- }
144
- azure_core_amqp:: AmqpValue :: Long ( value) => {
145
- sequence_number = Some ( * value) ;
146
- }
147
- azure_core_amqp:: AmqpValue :: Int ( value) => {
148
- sequence_number = Some ( * value as i64 ) ;
149
- }
150
- _ => {
151
- return Err ( azure_core:: error:: Error :: with_message (
152
- azure_core:: error:: ErrorKind :: Other ,
153
- "Invalid sequence number" ,
154
- ) ) ;
155
- }
156
- }
157
- } else if * key == crate :: consumer:: OFFSET_ANNOTATION {
158
- match value {
159
- azure_core_amqp:: AmqpValue :: String ( value) => {
160
- offset = Some ( value. to_string ( ) ) ;
161
- }
162
- _ => {
163
- return Err ( azure_core:: error:: Error :: with_message (
164
- azure_core:: error:: ErrorKind :: Other ,
165
- "Invalid offset" ,
166
- ) ) ;
167
- }
168
- }
169
- }
170
- }
171
- let checkpoint = Checkpoint {
172
- fully_qualified_namespace : self . client_details . fully_qualified_namespace . clone ( ) ,
173
- event_hub_name : self . client_details . eventhub_name . clone ( ) ,
174
- consumer_group : self . client_details . consumer_group . clone ( ) ,
175
- partition_id : self . partition_id . clone ( ) ,
176
- offset,
177
- sequence_number,
178
- } ;
179
- self . checkpoint_store . update_checkpoint ( checkpoint) . await ?;
180
- }
181
- Ok ( ( ) )
130
+ let checkpoint = Checkpoint {
131
+ fully_qualified_namespace : self . client_details . fully_qualified_namespace . clone ( ) ,
132
+ event_hub_name : self . client_details . eventhub_name . clone ( ) ,
133
+ consumer_group : self . client_details . consumer_group . clone ( ) ,
134
+ partition_id : self . partition_id . clone ( ) ,
135
+ offset : event_data. offset ( ) . clone ( ) ,
136
+ sequence_number : event_data. sequence_number ( ) . clone ( ) ,
137
+ } ;
138
+ self . checkpoint_store . update_checkpoint ( checkpoint) . await
182
139
}
183
140
184
141
pub ( crate ) fn set_event_receiver ( & self , event_receiver : EventReceiver ) -> Result < ( ) > {
0 commit comments