@@ -270,22 +270,32 @@ mod tests {
270
270
#[ tokio:: test]
271
271
async fn queue_drains_expired_message ( ) {
272
272
let max_size = 3 ;
273
+ let total_expired_messages = 4 ;
274
+ let total_non_expired_messages = 6 ;
273
275
let current_timestamp = 10 ;
274
276
let expires_at_expired = 1 ;
275
277
let expires_at_non_expired = 100 ;
276
278
let queue = create_queue ( max_size, current_timestamp) ;
277
- let mut messages = fake_messages ( 1 ..=10 , expires_at_expired) ;
278
- for ( index, mut message) in messages. clone ( ) . into_iter ( ) . enumerate ( ) {
279
- if index >= 5 {
280
- message. msg_payload . expires_at = expires_at_non_expired;
281
- messages[ index] = message. clone ( ) ;
282
- }
279
+ let expired_messages = fake_messages ( 1 ..=total_expired_messages, expires_at_expired) ;
280
+ let non_expired_messages = fake_messages (
281
+ total_expired_messages + 1 ..=total_non_expired_messages + total_expired_messages,
282
+ expires_at_non_expired,
283
+ ) ;
284
+ for message in expired_messages. clone ( ) {
285
+ queue. enqueue ( message) . await ;
286
+ }
287
+ for message in non_expired_messages. clone ( ) {
283
288
queue. enqueue ( message) . await ;
284
289
}
285
290
let limit = None ;
286
291
287
292
let dequeued_messages = queue. dequeue_blocking ( limit) . await ;
288
293
289
- assert_eq ! ( messages[ 7 ..=9 ] . to_vec( ) , dequeued_messages) ;
294
+ let expected_non_expired_messages_range =
295
+ total_non_expired_messages as usize - max_size..total_non_expired_messages as usize ;
296
+ assert_eq ! (
297
+ non_expired_messages[ expected_non_expired_messages_range] . to_vec( ) ,
298
+ dequeued_messages
299
+ ) ;
290
300
}
291
301
}
0 commit comments