11use async_trait:: async_trait;
2- use http:: {
3- header:: { ETAG , IF_NONE_MATCH , USER_AGENT } ,
4- HeaderValue , StatusCode ,
2+ use hive_console_sdk:: supergraph_fetcher:: {
3+ SupergraphFetcher , SupergraphFetcherAsyncState , SupergraphFetcherError ,
54} ;
6- use lazy_static:: lazy_static;
7- use reqwest_middleware:: { ClientBuilder , ClientWithMiddleware } ;
8- use reqwest_retry:: RetryTransientMiddleware ;
9- use retry_policies:: policies:: ExponentialBackoff ;
105use std:: time:: Duration ;
11- use tokio:: sync:: RwLock ;
126use tracing:: { debug, error} ;
137
148use crate :: {
159 consts:: ROUTER_VERSION ,
1610 supergraph:: base:: { LoadSupergraphError , ReloadSupergraphResult , SupergraphLoader } ,
1711} ;
1812
19- lazy_static ! {
20- pub static ref USER_AGENT_VALUE : HeaderValue = {
21- HeaderValue :: from_str( & format!( "hive-router/{}" , ROUTER_VERSION ) )
22- . expect( "failed to construct user-agent" )
23- } ;
24- }
25-
26- static AUTH_HEADER_NAME : & str = "x-hive-cdn-key" ;
27-
2813pub struct SupergraphHiveConsoleLoader {
29- endpoint : String ,
30- key : String ,
31- http_client : ClientWithMiddleware ,
14+ fetcher : SupergraphFetcher < SupergraphFetcherAsyncState > ,
3215 poll_interval : Duration ,
33- timeout : Duration ,
34- last_etag : RwLock < Option < HeaderValue > > ,
3516}
3617
37- #[ async_trait]
38- impl SupergraphLoader for SupergraphHiveConsoleLoader {
39- async fn load ( & self ) -> Result < ReloadSupergraphResult , LoadSupergraphError > {
40- debug ! (
41- "Fetching supergraph from Hive Console CDN: '{}'" ,
42- self . endpoint,
43- ) ;
44-
45- let mut req = self
46- . http_client
47- . get ( & self . endpoint )
48- . header ( AUTH_HEADER_NAME , & self . key )
49- . header ( USER_AGENT , USER_AGENT_VALUE . clone ( ) )
50- . timeout ( self . timeout ) ;
51-
52- let mut etag_used = false ;
53-
54- match self . last_etag . try_read ( ) {
55- Ok ( lock_guard) => {
56- if let Some ( etag) = lock_guard. as_ref ( ) {
57- req = req. header ( IF_NONE_MATCH , etag) ;
58- etag_used = true ;
59- }
18+ impl From < SupergraphFetcherError > for LoadSupergraphError {
19+ fn from ( err : SupergraphFetcherError ) -> Self {
20+ match err {
21+ SupergraphFetcherError :: NetworkError ( e) => LoadSupergraphError :: NetworkError ( e) ,
22+ SupergraphFetcherError :: NetworkResponseError ( e) => {
23+ LoadSupergraphError :: NetworkResponseError ( e)
6024 }
61- Err ( e) => {
62- error ! ( "Failed to read etag record: {:?}" , e) ;
25+ SupergraphFetcherError :: Lock ( e) => LoadSupergraphError :: LockError ( e) ,
26+ SupergraphFetcherError :: FetcherCreationError ( e) => {
27+ LoadSupergraphError :: InitializationError ( e. to_string ( ) )
6328 }
64- } ;
65-
66- let response = req. send ( ) . await ?. error_for_status ( ) ?;
67-
68- if etag_used && response. status ( ) == StatusCode :: NOT_MODIFIED {
69- Ok ( ReloadSupergraphResult :: Unchanged )
70- } else {
71- if let Some ( new_etag) = response. headers ( ) . get ( ETAG ) {
72- match self . last_etag . try_write ( ) {
73- Ok ( mut v) => {
74- debug ! ( "saving etag record: {:?}" , new_etag) ;
75- * v = Some ( new_etag. clone ( ) ) ;
76- }
77- Err ( e) => {
78- error ! ( "Failed to save etag record: {:?}" , e) ;
79- }
80- }
29+ SupergraphFetcherError :: InvalidKey ( e) => {
30+ LoadSupergraphError :: InvalidConfiguration ( format ! ( "Invalid CDN key: {}" , e) )
8131 }
32+ }
33+ }
34+ }
8235
83- let content = response
84- . text ( )
85- . await
86- . map_err ( LoadSupergraphError :: NetworkResponseError ) ?;
87-
88- Ok ( ReloadSupergraphResult :: Changed { new_sdl : content } )
36+ #[ async_trait]
37+ impl SupergraphLoader for SupergraphHiveConsoleLoader {
38+ async fn load ( & self ) -> Result < ReloadSupergraphResult , LoadSupergraphError > {
39+ let fetcher_result = self . fetcher . fetch_supergraph ( ) . await ;
40+ match fetcher_result {
41+ // If there was an error fetching the supergraph, propagate it
42+ Err ( err) => {
43+ error ! ( "Error fetching supergraph from Hive Console: {}" , err) ;
44+ Err ( LoadSupergraphError :: from ( err) )
45+ }
46+ // If the supergraph has not changed, return Unchanged
47+ Ok ( None ) => Ok ( ReloadSupergraphResult :: Unchanged ) ,
48+ // If there is a new supergraph SDL, return it
49+ Ok ( Some ( sdl) ) => Ok ( ReloadSupergraphResult :: Changed { new_sdl : sdl } ) ,
8950 }
9051 }
9152
@@ -95,31 +56,34 @@ impl SupergraphLoader for SupergraphHiveConsoleLoader {
9556}
9657
9758impl SupergraphHiveConsoleLoader {
98- pub fn new (
59+ pub fn try_new (
9960 endpoint : String ,
10061 key : & str ,
10162 poll_interval : Duration ,
102- timeout : Duration ,
103- retry_policy : ExponentialBackoff ,
63+ connect_timeout : Duration ,
64+ request_timeout : Duration ,
65+ accept_invalid_certs : bool ,
66+ retry_count : u32 ,
10467 ) -> Result < Box < Self > , LoadSupergraphError > {
10568 debug ! (
106- "Creating supergraph source from Hive Console CDN: '{}' (poll interval: {}ms, timeout : {}ms)" ,
69+ "Creating supergraph source from Hive Console CDN: '{}' (poll interval: {}ms, request_timeout : {}ms)" ,
10770 endpoint,
10871 poll_interval. as_millis( ) ,
109- timeout . as_millis( )
72+ request_timeout . as_millis( )
11073 ) ;
111-
112- let client = ClientBuilder :: new ( reqwest:: Client :: new ( ) )
113- . with ( RetryTransientMiddleware :: new_with_policy ( retry_policy) )
114- . build ( ) ;
115-
116- Ok ( Box :: new ( Self {
74+ let fetcher = SupergraphFetcher :: try_new_async (
11775 endpoint,
118- key : key. to_string ( ) ,
119- http_client : client,
76+ key,
77+ format ! ( "hive-router/{}" , ROUTER_VERSION ) ,
78+ connect_timeout,
79+ request_timeout,
80+ accept_invalid_certs,
81+ retry_count,
82+ ) ?;
83+
84+ Ok ( Box :: new ( SupergraphHiveConsoleLoader {
85+ fetcher,
12086 poll_interval,
121- timeout,
122- last_etag : RwLock :: new ( None ) ,
12387 } ) )
12488 }
12589}
0 commit comments