@@ -24,7 +24,8 @@ use hyper::{
2424 connect:: { dns:: GaiResolver , Connect } ,
2525 HttpConnector ,
2626 } ,
27- header, Body , Method , StatusCode ,
27+ header:: { self , InvalidHeaderValue } ,
28+ Body , Method , Response , StatusCode ,
2829} ;
2930use hyper_rustls:: { HttpsConnector , HttpsConnectorBuilder } ;
3031#[ cfg( feature = "logging" ) ]
@@ -85,6 +86,8 @@ pub trait Request {
8586 where
8687 T : Serialize + Debug + Send + Sync ,
8788 U : DeserializeOwned + Debug + Send + Sync ;
89+
90+ async fn execute ( & self , request : hyper:: Request < Body > ) -> Result < Response < Body > , Self :: Error > ;
8891}
8992
9093// -----------------------------------------------------------------------------
@@ -211,7 +214,7 @@ pub enum SignerError {
211214 Digest ( InvalidLength ) ,
212215 #[ error( "failed to compute time since unix epoch, {0}" ) ]
213216 UnixEpochTime ( SystemTimeError ) ,
214- #[ error( "failed to parse signature paramater , {0}" ) ]
217+ #[ error( "failed to parse signature parameter , {0}" ) ]
215218 Parse ( String ) ,
216219}
217220
@@ -343,6 +346,8 @@ pub enum ClientError {
343346 Signer ( SignerError ) ,
344347 #[ error( "failed to compute request digest, {0}" ) ]
345348 Digest ( SignerError ) ,
349+ #[ error( "failed to serialize signature as header value, {0}" ) ]
350+ SerializeHeaderValue ( InvalidHeaderValue ) ,
346351}
347352
348353// -----------------------------------------------------------------------------
@@ -379,19 +384,7 @@ where
379384 U : DeserializeOwned + Debug + Send + Sync ,
380385 {
381386 let buf = serde_json:: to_vec ( payload) . map_err ( ClientError :: Serialize ) ?;
382- let mut builder = hyper:: Request :: builder ( ) ;
383- if let Some ( credentials) = & self . credentials {
384- let signer = Signer :: try_from ( credentials. to_owned ( ) ) . map_err ( ClientError :: Signer ) ?;
385-
386- builder = builder. header (
387- header:: AUTHORIZATION ,
388- signer
389- . sign ( method. as_str ( ) , endpoint)
390- . map_err ( ClientError :: Digest ) ?,
391- ) ;
392- }
393-
394- let req = builder
387+ let request = hyper:: Request :: builder ( )
395388 . method ( method)
396389 . uri ( endpoint)
397390 . header (
@@ -404,24 +397,7 @@ where
404397 . body ( Body :: from ( buf. to_owned ( ) ) )
405398 . map_err ( ClientError :: RequestBuilder ) ?;
406399
407- #[ cfg( feature = "logging" ) ]
408- if log_enabled ! ( Level :: Trace ) {
409- trace ! (
410- "execute request, endpoint: '{}', method: '{}', body: '{}'" ,
411- endpoint,
412- method. to_string( ) ,
413- String :: from_utf8_lossy( & buf) . to_string( )
414- ) ;
415- }
416-
417- #[ cfg( feature = "metrics" ) ]
418- let instant = Instant :: now ( ) ;
419- let res = self
420- . inner
421- . request ( req)
422- . await
423- . map_err ( ClientError :: Request ) ?;
424-
400+ let res = self . execute ( request) . await ?;
425401 let status = res. status ( ) ;
426402 let buf = hyper:: body:: aggregate ( res. into_body ( ) )
427403 . await
@@ -430,28 +406,11 @@ where
430406 #[ cfg( feature = "logging" ) ]
431407 if log_enabled ! ( Level :: Trace ) {
432408 trace ! (
433- "received response, endpoint: '{}', method: '{}', status: '{}'" ,
434- endpoint,
435- method. to_string( ) ,
409+ "received response, endpoint: '{endpoint}', method: '{method}', status: '{}'" ,
436410 status. as_u16( )
437411 ) ;
438412 }
439413
440- #[ cfg( feature = "metrics" ) ]
441- CLIENT_REQUEST
442- . with_label_values ( & [ endpoint, method. as_ref ( ) , & status. as_u16 ( ) . to_string ( ) ] )
443- . inc ( ) ;
444-
445- #[ cfg( feature = "metrics" ) ]
446- CLIENT_REQUEST_DURATION
447- . with_label_values ( & [
448- endpoint,
449- method. as_ref ( ) ,
450- & status. as_u16 ( ) . to_string ( ) ,
451- "us" ,
452- ] )
453- . inc_by ( Instant :: now ( ) . duration_since ( instant) . as_micros ( ) as f64 ) ;
454-
455414 if !status. is_success ( ) {
456415 return Err ( ClientError :: StatusCode (
457416 status,
@@ -461,6 +420,67 @@ where
461420
462421 Ok ( serde_json:: from_reader ( buf. reader ( ) ) . map_err ( ClientError :: Deserialize ) ?)
463422 }
423+
424+ #[ cfg_attr( feature = "trace" , tracing:: instrument) ]
425+ async fn execute (
426+ & self ,
427+ mut request : hyper:: Request < Body > ,
428+ ) -> Result < Response < Body > , Self :: Error > {
429+ let method = request. method ( ) . to_string ( ) ;
430+ let endpoint = request. uri ( ) . to_string ( ) ;
431+ if !request. headers ( ) . contains_key ( & header:: AUTHORIZATION ) {
432+ if let Some ( credentials) = & self . credentials {
433+ let signer =
434+ Signer :: try_from ( credentials. to_owned ( ) ) . map_err ( ClientError :: Signer ) ?;
435+
436+ request. headers_mut ( ) . insert (
437+ header:: AUTHORIZATION ,
438+ signer
439+ . sign ( & method, & endpoint)
440+ . map_err ( ClientError :: Digest ) ?
441+ . parse ( )
442+ . map_err ( ClientError :: SerializeHeaderValue ) ?,
443+ ) ;
444+ }
445+ }
446+
447+ #[ cfg( feature = "logging" ) ]
448+ if log_enabled ! ( Level :: Trace ) {
449+ trace ! ( "execute request, endpoint: '{endpoint}', method: '{method}'" ) ;
450+ }
451+
452+ #[ cfg( feature = "metrics" ) ]
453+ let instant = Instant :: now ( ) ;
454+ let res = self
455+ . inner
456+ . request ( request)
457+ . await
458+ . map_err ( ClientError :: Request ) ?;
459+
460+ #[ cfg( feature = "metrics" ) ]
461+ {
462+ let status = res. status ( ) ;
463+
464+ CLIENT_REQUEST
465+ . with_label_values ( & [
466+ endpoint. as_str ( ) ,
467+ method. as_ref ( ) ,
468+ & status. as_u16 ( ) . to_string ( ) ,
469+ ] )
470+ . inc ( ) ;
471+
472+ CLIENT_REQUEST_DURATION
473+ . with_label_values ( & [
474+ endpoint. as_str ( ) ,
475+ method. as_ref ( ) ,
476+ & status. as_u16 ( ) . to_string ( ) ,
477+ "us" ,
478+ ] )
479+ . inc_by ( Instant :: now ( ) . duration_since ( instant) . as_micros ( ) as f64 ) ;
480+ }
481+
482+ Ok ( res)
483+ }
464484}
465485
466486#[ async_trait]
@@ -475,74 +495,20 @@ where
475495 where
476496 T : DeserializeOwned + Debug + Send + Sync ,
477497 {
478- let method = & Method :: GET ;
479- let mut builder = hyper:: Request :: builder ( ) ;
480- if let Some ( credentials) = & self . credentials {
481- let signer = Signer :: try_from ( credentials. to_owned ( ) ) . map_err ( ClientError :: Signer ) ?;
482-
483- builder = builder. header (
484- header:: AUTHORIZATION ,
485- signer
486- . sign ( method. as_str ( ) , endpoint)
487- . map_err ( ClientError :: Digest ) ?,
488- ) ;
489- }
490-
491- let req = builder
492- . method ( method)
498+ let req = hyper:: Request :: builder ( )
499+ . method ( & Method :: GET )
493500 . header ( header:: ACCEPT_CHARSET , UTF8 )
494501 . header ( header:: ACCEPT , APPLICATION_JSON )
495502 . uri ( endpoint)
496503 . body ( Body :: empty ( ) )
497504 . map_err ( ClientError :: RequestBuilder ) ?;
498505
499- #[ cfg( feature = "logging" ) ]
500- if log_enabled ! ( Level :: Trace ) {
501- trace ! (
502- "execute request, endpoint: '{}', method: '{}', body: '<none>'" ,
503- endpoint,
504- method. to_string( )
505- ) ;
506- }
507-
508- #[ cfg( feature = "metrics" ) ]
509- let instant = Instant :: now ( ) ;
510- let res = self
511- . inner
512- . request ( req)
513- . await
514- . map_err ( ClientError :: Request ) ?;
515-
506+ let res = self . execute ( req) . await ?;
516507 let status = res. status ( ) ;
517508 let buf = hyper:: body:: aggregate ( res. into_body ( ) )
518509 . await
519510 . map_err ( ClientError :: BodyAggregation ) ?;
520511
521- #[ cfg( feature = "logging" ) ]
522- if log_enabled ! ( Level :: Trace ) {
523- trace ! (
524- "received response, endpoint: '{}', method: '{}', status: '{}'" ,
525- endpoint,
526- method. to_string( ) ,
527- status. as_u16( )
528- ) ;
529- }
530-
531- #[ cfg( feature = "metrics" ) ]
532- CLIENT_REQUEST
533- . with_label_values ( & [ endpoint, method. as_ref ( ) , & status. as_u16 ( ) . to_string ( ) ] )
534- . inc ( ) ;
535-
536- #[ cfg( feature = "metrics" ) ]
537- CLIENT_REQUEST_DURATION
538- . with_label_values ( & [
539- endpoint,
540- method. as_ref ( ) ,
541- & status. as_u16 ( ) . to_string ( ) ,
542- "us" ,
543- ] )
544- . inc_by ( Instant :: now ( ) . duration_since ( instant) . as_micros ( ) as f64 ) ;
545-
546512 if !status. is_success ( ) {
547513 return Err ( ClientError :: StatusCode (
548514 status,
@@ -582,72 +548,18 @@ where
582548
583549 #[ cfg_attr( feature = "trace" , tracing:: instrument) ]
584550 async fn delete ( & self , endpoint : & str ) -> Result < ( ) , Self :: Error > {
585- let method = & Method :: DELETE ;
586- let mut builder = hyper:: Request :: builder ( ) ;
587- if let Some ( credentials) = & self . credentials {
588- let signer = Signer :: try_from ( credentials. to_owned ( ) ) . map_err ( ClientError :: Signer ) ?;
589-
590- builder = builder. header (
591- header:: AUTHORIZATION ,
592- signer
593- . sign ( method. as_str ( ) , endpoint)
594- . map_err ( ClientError :: Digest ) ?,
595- ) ;
596- }
597-
598- let req = builder
599- . method ( method)
551+ let req = hyper:: Request :: builder ( )
552+ . method ( & Method :: DELETE )
600553 . uri ( endpoint)
601554 . body ( Body :: empty ( ) )
602555 . map_err ( ClientError :: RequestBuilder ) ?;
603556
604- #[ cfg( feature = "logging" ) ]
605- if log_enabled ! ( Level :: Trace ) {
606- trace ! (
607- "execute request, endpoint: '{}', method: '{}', body: '<none>'" ,
608- endpoint,
609- method. to_string( )
610- ) ;
611- }
612-
613- #[ cfg( feature = "metrics" ) ]
614- let instant = Instant :: now ( ) ;
615- let res = self
616- . inner
617- . request ( req)
618- . await
619- . map_err ( ClientError :: Request ) ?;
620-
557+ let res = self . execute ( req) . await ?;
621558 let status = res. status ( ) ;
622559 let buf = hyper:: body:: aggregate ( res. into_body ( ) )
623560 . await
624561 . map_err ( ClientError :: BodyAggregation ) ?;
625562
626- #[ cfg( feature = "logging" ) ]
627- if log_enabled ! ( Level :: Trace ) {
628- trace ! (
629- "received response, endpoint: '{}', method: '{}', status: '{}'" ,
630- endpoint,
631- method. to_string( ) ,
632- status. as_u16( )
633- ) ;
634- }
635-
636- #[ cfg( feature = "metrics" ) ]
637- CLIENT_REQUEST
638- . with_label_values ( & [ endpoint, method. as_ref ( ) , & status. as_u16 ( ) . to_string ( ) ] )
639- . inc ( ) ;
640-
641- #[ cfg( feature = "metrics" ) ]
642- CLIENT_REQUEST_DURATION
643- . with_label_values ( & [
644- endpoint,
645- method. as_ref ( ) ,
646- & status. as_u16 ( ) . to_string ( ) ,
647- "us" ,
648- ] )
649- . inc_by ( Instant :: now ( ) . duration_since ( instant) . as_micros ( ) as f64 ) ;
650-
651563 if !status. is_success ( ) {
652564 return Err ( ClientError :: StatusCode (
653565 status,
0 commit comments