@@ -3,11 +3,11 @@ use std::{collections::HashMap, time::Duration};
33
44use async_nats:: jetstream:: { AckKind , Message as JetstreamMessage } ;
55use async_nats:: {
6+ ConnectOptions ,
67 jetstream:: consumer:: {
7- pull:: { Config , Stream } ,
88 Consumer , PullConsumer ,
9+ pull:: { Config , Stream } ,
910 } ,
10- ConnectOptions ,
1111} ;
1212use backoff:: retry:: Retry ;
1313use backoff:: strategy:: fixed;
@@ -185,7 +185,9 @@ impl JetstreamActor {
185185 tls_config : TlsConfig ,
186186 ) -> Result < ConnectOptions > {
187187 if tls_config. insecure_skip_verify {
188- tracing:: warn!( "'insecureSkipVerify' is set to true, certificate validation will not be performed when connecting to NATS server" ) ;
188+ tracing:: warn!(
189+ "'insecureSkipVerify' is set to true, certificate validation will not be performed when connecting to NATS server"
190+ ) ;
189191 let tls_client_config = rustls:: ClientConfig :: builder ( )
190192 . dangerous ( )
191193 . with_custom_certificate_verifier ( Arc :: new ( NoVerifier ) )
@@ -447,7 +449,7 @@ impl MessageProcessingTracker {
447449 let nack_retry_interval =
448450 fixed:: Interval :: from_millis ( ACK_RETRY_INTERVAL ) . take ( ACK_RETRY_ATTEMPTS ) ;
449451
450- let ack_msg = || async {
452+ let ack_msg = async || {
451453 if let Err ( err) = msg. ack ( ) . await {
452454 tracing:: error!( ?err, "Failed to Ack message" ) ;
453455 return Err ( format ! ( "Acknowledging Jetstream message: {:?}" , err) ) ;
@@ -457,14 +459,14 @@ impl MessageProcessingTracker {
457459
458460 let ack_with_retry = Retry :: retry ( ack_retry_interval, ack_msg, |_: & String | true ) ;
459461
460- let ack_in_progress = || async {
462+ let ack_in_progress = async || {
461463 let ack_result = msg. ack_with ( AckKind :: Progress ) . await ;
462464 if let Err ( e) = ack_result {
463465 tracing:: error!( ?e, "Failed to send InProgress Ack to Jetstream for message" ) ;
464466 }
465467 } ;
466468
467- let nack_msg = || async {
469+ let nack_msg = async || {
468470 let ack_result = msg. ack_with ( AckKind :: Nak ( None ) ) . await ;
469471 if let Err ( e) = ack_result {
470472 tracing:: error!( ?e, "Failed to send InProgress Ack to Jetstream for message" ) ;
@@ -660,8 +662,8 @@ XdvExDsAdjbkBG7ynn9pmMgIJg==
660662 assert ! ( result. is_ok( ) ) ;
661663 }
662664
663- use async_nats:: jetstream:: stream:: Config as StreamConfig ;
664665 use async_nats:: jetstream:: Context ;
666+ use async_nats:: jetstream:: stream:: Config as StreamConfig ;
665667 use tokio:: time:: Duration ;
666668
667669 async fn setup_jetstream ( ) -> ( Context , String ) {
0 commit comments