1+ use std:: collections:: BTreeMap ;
12use std:: sync:: Arc ;
23
34use crate :: executors:: common:: HttpExecutionResponse ;
45use crate :: executors:: dedupe:: { request_fingerprint, ABuildHasher , SharedResponse } ;
56use dashmap:: DashMap ;
7+ use hive_router_config:: hmac_signature:: BooleanOrExpression ;
68use hive_router_config:: HiveRouterConfig ;
79use tokio:: sync:: OnceCell ;
810
911use async_trait:: async_trait;
1012
1113use bytes:: { BufMut , Bytes , BytesMut } ;
14+ use hmac:: { Hmac , Mac } ;
1215use http:: HeaderMap ;
1316use http:: HeaderValue ;
1417use http_body_util:: BodyExt ;
1518use http_body_util:: Full ;
1619use hyper:: Version ;
1720use hyper_tls:: HttpsConnector ;
1821use hyper_util:: client:: legacy:: { connect:: HttpConnector , Client } ;
22+ use sha2:: Sha256 ;
1923use tokio:: sync:: Semaphore ;
2024use tracing:: debug;
2125
@@ -27,6 +31,7 @@ use crate::utils::consts::COLON;
2731use crate :: utils:: consts:: COMMA ;
2832use crate :: utils:: consts:: QUOTE ;
2933use crate :: { executors:: common:: SubgraphExecutor , json_writer:: write_and_escape_string} ;
34+ use vrl:: core:: Value as VrlValue ;
3035
3136#[ derive( Debug ) ]
3237pub struct HTTPSubgraphExecutor {
@@ -41,9 +46,12 @@ pub struct HTTPSubgraphExecutor {
4146
4247const FIRST_VARIABLE_STR : & [ u8 ] = b",\" variables\" :{" ;
4348const FIRST_QUOTE_STR : & [ u8 ] = b"{\" query\" :" ;
49+ const FIRST_EXTENSION_STR : & [ u8 ] = b",\" extensions\" :{" ;
4450
4551pub type HttpClient = Client < HttpsConnector < HttpConnector > , Full < Bytes > > ;
4652
53+ type HmacSha256 = Hmac < Sha256 > ;
54+
4755impl HTTPSubgraphExecutor {
4856 pub fn new (
4957 subgraph_name : String ,
@@ -74,9 +82,9 @@ impl HTTPSubgraphExecutor {
7482 }
7583 }
7684
77- fn build_request_body < ' a > (
85+ fn build_request_body < ' exec , ' req > (
7886 & self ,
79- execution_request : & HttpExecutionRequest < ' a > ,
87+ execution_request : & HttpExecutionRequest < ' exec , ' req > ,
8088 ) -> Result < Vec < u8 > , SubgraphExecutorError > {
8189 let mut body = Vec :: with_capacity ( 4096 ) ;
8290 body. put ( FIRST_QUOTE_STR ) ;
@@ -118,13 +126,90 @@ impl HTTPSubgraphExecutor {
118126 body. put ( CLOSE_BRACE ) ;
119127 }
120128
121- if let Some ( extensions) = & execution_request. extensions {
122- if !extensions. is_empty ( ) {
123- let as_value = sonic_rs:: to_value ( extensions) . unwrap ( ) ;
129+ let should_sign_hmac = match & self . config . hmac_signature . enabled {
130+ BooleanOrExpression :: Boolean ( b) => * b,
131+ BooleanOrExpression :: Expression ( expr) => {
132+ // .subgraph
133+ let subgraph_value = VrlValue :: Object ( BTreeMap :: from ( [ (
134+ "name" . into ( ) ,
135+ VrlValue :: Bytes ( Bytes :: from ( self . subgraph_name . to_owned ( ) ) ) ,
136+ ) ] ) ) ;
137+ // .request
138+ let request_value: VrlValue = execution_request. client_request . into ( ) ;
139+ let target_value = VrlValue :: Object ( BTreeMap :: from ( [
140+ ( "subgraph" . into ( ) , subgraph_value) ,
141+ ( "request" . into ( ) , request_value) ,
142+ ] ) ) ;
143+ let result = expr. execute_with_value ( target_value) ;
144+ match result {
145+ Ok ( VrlValue :: Boolean ( b) ) => b,
146+ Ok ( _) => {
147+ return Err ( SubgraphExecutorError :: HMACSignatureError (
148+ "HMAC signature expression did not evaluate to a boolean" . to_string ( ) ,
149+ ) ) ;
150+ }
151+ Err ( e) => {
152+ return Err ( SubgraphExecutorError :: HMACSignatureError ( format ! (
153+ "HMAC signature expression evaluation error: {}" ,
154+ e
155+ ) ) ) ;
156+ }
157+ }
158+ }
159+ } ;
124160
125- body. put ( COMMA ) ;
126- body. put ( "\" extensions\" :" . as_bytes ( ) ) ;
127- body. extend_from_slice ( as_value. to_string ( ) . as_bytes ( ) ) ;
161+ let hmac_signature_ext = if should_sign_hmac {
162+ let mut mac = HmacSha256 :: new_from_slice ( self . config . hmac_signature . secret . as_bytes ( ) )
163+ . map_err ( |e| {
164+ SubgraphExecutorError :: HMACSignatureError ( format ! (
165+ "Failed to create HMAC instance: {}" ,
166+ e
167+ ) )
168+ } ) ?;
169+ let mut body_without_extensions = body. clone ( ) ;
170+ body_without_extensions. put ( CLOSE_BRACE ) ;
171+ mac. update ( & body_without_extensions) ;
172+ let result = mac. finalize ( ) ;
173+ let result_bytes = result. into_bytes ( ) ;
174+ Some ( result_bytes)
175+ } else {
176+ None
177+ } ;
178+
179+ if let Some ( extensions) = & execution_request. extensions {
180+ let mut first = true ;
181+ for ( extension_name, extension_value) in extensions {
182+ if first {
183+ body. put ( COMMA ) ;
184+ body. put ( FIRST_EXTENSION_STR ) ;
185+ first = false ;
186+ } else {
187+ body. put ( COMMA ) ;
188+ }
189+ body. put ( QUOTE ) ;
190+ body. put ( extension_name. as_bytes ( ) ) ;
191+ body. put ( QUOTE ) ;
192+ body. put ( COLON ) ;
193+ let value_str = sonic_rs:: to_string ( extension_value) . map_err ( |err| {
194+ SubgraphExecutorError :: ExtensionSerializationFailure (
195+ extension_name. to_string ( ) ,
196+ err. to_string ( ) ,
197+ )
198+ } ) ?;
199+ body. put ( value_str. as_bytes ( ) ) ;
200+ }
201+ if let Some ( hmac_bytes) = hmac_signature_ext {
202+ if first {
203+ body. put ( COMMA ) ;
204+ body. put ( FIRST_EXTENSION_STR ) ;
205+ } else {
206+ body. put ( COMMA ) ;
207+ }
208+ body. put ( self . config . hmac_signature . extension_name . as_bytes ( ) ) ;
209+ let hmac_hex = hex:: encode ( hmac_bytes) ;
210+ body. put ( QUOTE ) ;
211+ body. put ( hmac_hex. as_bytes ( ) ) ;
212+ body. put ( QUOTE ) ;
128213 }
129214 }
130215
@@ -210,9 +295,9 @@ impl HTTPSubgraphExecutor {
210295#[ async_trait]
211296impl SubgraphExecutor for HTTPSubgraphExecutor {
212297 #[ tracing:: instrument( skip_all, fields( subgraph_name = self . subgraph_name) ) ]
213- async fn execute < ' a > (
298+ async fn execute < ' exec , ' req > (
214299 & self ,
215- execution_request : HttpExecutionRequest < ' a > ,
300+ execution_request : HttpExecutionRequest < ' exec , ' req > ,
216301 ) -> HttpExecutionResponse {
217302 let body = match self . build_request_body ( & execution_request) {
218303 Ok ( body) => body,
0 commit comments