1- pub use aws_sdk_sqs:: Client ;
21pub use aws_sdk_sqs:: model:: Message ;
32pub use aws_sdk_sqs:: model:: MessageAttributeValue ;
43pub use aws_sdk_sqs:: model:: MessageSystemAttributeName ;
54pub use aws_sdk_sqs:: model:: QueueAttributeName ;
5+ pub use aws_sdk_sqs:: Client ;
66
77use crate :: utils:: MessageUtils ;
88
99const QUEUE_TIMEOUT_SECS : u16 = 30 ;
1010
1111pub async fn get_queue_timeout_secs ( client : & Client , queue_url : & str ) -> u16 {
12- match client. get_queue_attributes ( ) . queue_url ( queue_url) . attribute_names ( QueueAttributeName :: VisibilityTimeout ) . send ( ) . await {
12+ match client
13+ . get_queue_attributes ( )
14+ . queue_url ( queue_url)
15+ . attribute_names ( QueueAttributeName :: VisibilityTimeout )
16+ . send ( )
17+ . await
18+ {
1319 Err ( e) => {
1420 tracing:: warn!( "Queue {queue_url}: unable to establish queue timeout, using default {QUEUE_TIMEOUT_SECS} secs: {}" , e. to_string( ) ) ;
1521 QUEUE_TIMEOUT_SECS
16- } ,
17- Ok ( gqa) => {
18- match gqa. attributes ( ) {
19- None => {
20- tracing:: debug!( "Queue {queue_url}: no attrs, using default {QUEUE_TIMEOUT_SECS} secs" ) ;
21- QUEUE_TIMEOUT_SECS
22- }
23- Some ( attrs) => match attrs. get ( & aws_sdk_sqs:: model:: QueueAttributeName :: VisibilityTimeout ) {
22+ }
23+ Ok ( gqa) => match gqa. attributes ( ) {
24+ None => {
25+ tracing:: debug!(
26+ "Queue {queue_url}: no attrs, using default {QUEUE_TIMEOUT_SECS} secs"
27+ ) ;
28+ QUEUE_TIMEOUT_SECS
29+ }
30+ Some ( attrs) => {
31+ match attrs. get ( & aws_sdk_sqs:: model:: QueueAttributeName :: VisibilityTimeout ) {
2432 None => {
2533 tracing:: debug!( "Queue {queue_url}: no timeout attr found, using default {QUEUE_TIMEOUT_SECS} secs" ) ;
2634 QUEUE_TIMEOUT_SECS
27- } ,
35+ }
2836 Some ( vt) => {
2937 tracing:: debug!( "Queue {queue_url}: parsing queue tiemout {vt}" ) ;
3038 vt. parse ( ) . unwrap_or ( QUEUE_TIMEOUT_SECS )
3139 }
3240 }
3341 }
34- }
42+ } ,
3543 }
3644}
3745
38- pub fn hold_message_lease ( client : & Client , queue_url : & str , m : & Message , timeout_secs : u16 ) -> Option < tokio:: task:: JoinHandle < ( ) > > {
46+ pub fn hold_message_lease (
47+ client : & Client ,
48+ queue_url : & str ,
49+ m : & Message ,
50+ timeout_secs : u16 ,
51+ ) -> Option < tokio:: task:: JoinHandle < ( ) > > {
3952 if let Some ( rcpt_handle) = m. receipt_handle ( ) . map ( |s| s. to_owned ( ) ) {
4053 let client = client. clone ( ) ;
4154 let queue_url = queue_url. to_owned ( ) ;
4255 let msg_id = m. display_id ( ) ;
4356 let interval = renewal_interval_for_timeout_secs ( timeout_secs) ;
4457
4558 let join_handle = tokio:: spawn ( async move {
46- let mut ticker = tokio:: time:: interval_at ( tokio:: time:: Instant :: now ( ) + interval, interval) ;
59+ let mut ticker =
60+ tokio:: time:: interval_at ( tokio:: time:: Instant :: now ( ) + interval, interval) ;
4761 loop {
4862 ticker. tick ( ) . await ;
4963 tracing:: info!( "Message {msg_id}: renewing lease via {rcpt_handle}" ) ;
@@ -55,11 +69,14 @@ pub fn hold_message_lease(client: &Client, queue_url: &str, m: &Message, timeout
5569 . send ( )
5670 . await ;
5771 if let Err ( e) = cmv {
58- tracing:: error!( "Message {msg_id}: failed to update lease: {}" , e. to_string( ) ) ;
72+ tracing:: error!(
73+ "Message {msg_id}: failed to update lease: {}" ,
74+ e. to_string( )
75+ ) ;
5976 }
6077 }
6178 } ) ;
62-
79+
6380 Some ( join_handle)
6481 } else {
6582 None
@@ -68,4 +85,4 @@ pub fn hold_message_lease(client: &Client, queue_url: &str, m: &Message, timeout
6885
6986fn renewal_interval_for_timeout_secs ( timeout_secs : u16 ) -> tokio:: time:: Duration {
7087 tokio:: time:: Duration :: from_secs ( ( timeout_secs / 2 ) . into ( ) )
71- }
88+ }
0 commit comments