From b45e012e52b950456b00b422d6afd21088e99e12 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 13:10:44 +0300 Subject: [PATCH 01/22] New `Expression` type to handle VRL in one place --- Cargo.lock | 1 + lib/executor/src/executors/map.rs | 48 +++------- lib/executor/src/headers/compile.rs | 90 ++++-------------- lib/executor/src/headers/errors.rs | 14 +-- lib/executor/src/headers/plan.rs | 6 +- lib/executor/src/headers/request.rs | 20 +--- lib/executor/src/headers/response.rs | 20 +--- lib/router-config/Cargo.toml | 1 + lib/router-config/src/headers.rs | 4 +- lib/router-config/src/lib.rs | 6 +- .../src/override_subgraph_urls.rs | 8 +- .../src/primitives/expression.rs | 91 +++++++++++++++++++ lib/router-config/src/primitives/mod.rs | 1 + lib/router-config/src/traffic_shaping.rs | 4 +- 14 files changed, 147 insertions(+), 167 deletions(-) create mode 100644 lib/router-config/src/primitives/expression.rs diff --git a/Cargo.lock b/Cargo.lock index 360f7cd81..f12a6ff43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2064,6 +2064,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tracing", + "vrl", ] [[package]] diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index a3c297ad1..0c25544d3 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,7 +6,9 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; +use hive_router_config::{ + override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig, +}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -15,16 +17,7 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::{ - compiler::compile as vrl_compile, - compiler::Program as VrlProgram, - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::Function as VrlFunction, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; +use vrl::core::Value as VrlValue; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -44,7 +37,7 @@ type SubgraphEndpoint = String; type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; -type ExpressionsBySubgraphMap = HashMap; +type ExpressionsBySubgraphMap = HashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -54,8 +47,6 @@ pub struct SubgraphExecutorMap { /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, config: Arc, - /// Precompiled VRL functions to be used in endpoint expressions. - vrl_functions: Vec>, client: Arc, semaphores_by_origin: DashMap>, max_connections_per_host: usize, @@ -80,7 +71,6 @@ impl SubgraphExecutorMap { static_endpoints_by_subgraph: Default::default(), expressions_by_subgraph: Default::default(), config, - vrl_functions: vrl_build_functions(), client: Arc::new(client), semaphores_by_origin: Default::default(), max_connections_per_host, @@ -101,7 +91,7 @@ impl SubgraphExecutorMap { let endpoint_str = match endpoint_str { Some(UrlOrExpression::Url(url)) => url, - Some(UrlOrExpression::Expression { expression }) => { + Some(UrlOrExpression::Expression(expression)) => { subgraph_executor_map.register_expression(&subgraph_name, expression)?; &original_endpoint_str } @@ -194,21 +184,13 @@ impl SubgraphExecutorMap { SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) })?, )); - let mut target = VrlTargetValue { - value: VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.resolve(&mut ctx).map_err(|err| { + let endpoint_result = expression.execute(value).map_err(|err| { SubgraphExecutorError::new_endpoint_expression_resolution_failure( subgraph_name.to_string(), err, @@ -267,14 +249,10 @@ impl SubgraphExecutorMap { fn register_expression( &mut self, subgraph_name: &str, - expression: &str, + expression: &Expression, ) -> Result<(), SubgraphExecutorError> { - let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| { - SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e) - })?; - self.expressions_by_subgraph - .insert(subgraph_name.to_string(), compilation_result.program); + .insert(subgraph_name.to_string(), expression.clone()); Ok(()) } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 5b1b14a9f..1cf29c8c9 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -12,43 +12,13 @@ use crate::headers::{ use hive_router_config::headers as config; use http::HeaderName; use regex_automata::{meta, util::syntax::Config as SyntaxConfig}; -use vrl::{ - compiler::compile as vrl_compile, prelude::Function as VrlFunction, - stdlib::all as vrl_build_functions, -}; - -pub struct HeaderRuleCompilerContext { - vrl_functions: Vec>, -} - -impl Default for HeaderRuleCompilerContext { - fn default() -> Self { - Self::new() - } -} - -impl HeaderRuleCompilerContext { - pub fn new() -> Self { - Self { - vrl_functions: vrl_build_functions(), - } - } -} pub trait HeaderRuleCompiler { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut A, - ) -> Result<(), HeaderRuleCompileError>; + fn compile(&self, actions: &mut A) -> Result<(), HeaderRuleCompileError>; } impl HeaderRuleCompiler> for config::RequestHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::RequestHeaderRule::Propagate(rule) => { let spec = materialize_match_spec( @@ -78,16 +48,11 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { value: build_header_value(&rule.name, value)?, })); } - config::InsertSource::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &ctx.vrl_functions).map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + config::InsertSource::Expression(expression) => { actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(expression.clone()), }, )); } @@ -112,11 +77,7 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { } impl HeaderRuleCompiler> for config::ResponseHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::ResponseHeaderRule::Propagate(rule) => { let aggregation_strategy = rule.algorithm.into(); @@ -153,21 +114,16 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule strategy: aggregation_strategy, })); } - config::InsertSource::Expression { expression } => { + config::InsertSource::Expression(expression) => { // NOTE: In case we ever need to improve performance and not pass the whole context // to VRL expressions, we can use: // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression - let compilation_result = vrl_compile(expression, &ctx.vrl_functions) - .map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(expression.clone()), strategy: aggregation_strategy, }, )); @@ -196,19 +152,18 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule pub fn compile_headers_plan( cfg: &config::HeadersConfig, ) -> Result { - let ctx = HeaderRuleCompilerContext::new(); let mut request_plan = RequestHeaderRules::default(); let mut response_plan = ResponseHeaderRules::default(); if let Some(global_rules) = &cfg.all { - request_plan.global = compile_request_header_rules(&ctx, global_rules)?; - response_plan.global = compile_response_header_rules(&ctx, global_rules)?; + request_plan.global = compile_request_header_rules(global_rules)?; + response_plan.global = compile_response_header_rules(global_rules)?; } if let Some(subgraph_rules_map) = &cfg.subgraphs { for (subgraph_name, subgraph_rules) in subgraph_rules_map { - let request_actions = compile_request_header_rules(&ctx, subgraph_rules)?; - let response_actions = compile_response_header_rules(&ctx, subgraph_rules)?; + let request_actions = compile_request_header_rules(subgraph_rules)?; + let response_actions = compile_response_header_rules(subgraph_rules)?; request_plan .by_subgraph .insert(subgraph_name.clone(), request_actions); @@ -225,26 +180,24 @@ pub fn compile_headers_plan( } fn compile_request_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut request_actions = Vec::new(); if let Some(request_rule_entries) = &header_rules.request { for request_rule in request_rule_entries { - request_rule.compile(ctx, &mut request_actions)?; + request_rule.compile(&mut request_actions)?; } } Ok(request_actions) } fn compile_response_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut response_actions = Vec::new(); if let Some(response_rule_entries) = &header_rules.response { for response_rule in response_rule_entries { - response_rule.compile(ctx, &mut response_actions)?; + response_rule.compile(&mut response_actions)?; } } Ok(response_actions) @@ -358,7 +311,7 @@ mod tests { use http::HeaderName; use crate::headers::{ - compile::{build_header_value, HeaderRuleCompiler, HeaderRuleCompilerContext}, + compile::{build_header_value, HeaderRuleCompiler}, errors::HeaderRuleCompileError, plan::{HeaderAggregationStrategy, RequestHeaderRule, ResponseHeaderRule}, }; @@ -378,9 +331,8 @@ mod tests { rename: None, default: None, }); - let ctx = HeaderRuleCompilerContext::new(); let mut actions = Vec::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::PropagateNamed(data) => { @@ -401,8 +353,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::InsertStatic(data) => { @@ -423,8 +374,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::RemoveNamed(data) => { @@ -449,8 +399,7 @@ mod tests { default: Some("def".to_string()), }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - let err = rule.compile(&ctx, &mut actions).unwrap_err(); + let err = rule.compile(&mut actions).unwrap_err(); match err { HeaderRuleCompileError::InvalidDefault => {} _ => panic!("Expected InvalidDefault error"), @@ -470,8 +419,7 @@ mod tests { algorithm: config::AggregationAlgo::First, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { ResponseHeaderRule::PropagateNamed(data) => { diff --git a/lib/executor/src/headers/errors.rs b/lib/executor/src/headers/errors.rs index d53444877..6c2a806d2 100644 --- a/lib/executor/src/headers/errors.rs +++ b/lib/executor/src/headers/errors.rs @@ -1,6 +1,6 @@ use http::header::{InvalidHeaderName, InvalidHeaderValue}; use regex_automata::meta::BuildError; -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; #[derive(thiserror::Error, Debug)] pub enum HeaderRuleCompileError { @@ -27,16 +27,8 @@ pub enum HeaderRuleRuntimeError { } impl HeaderRuleCompileError { - pub fn new_expression_build(header_name: String, diagnostics: DiagnosticList) -> Self { - HeaderRuleCompileError::ExpressionBuild( - header_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) + pub fn new_expression_build(header_name: String, err: String) -> Self { + HeaderRuleCompileError::ExpressionBuild(header_name, err) } } diff --git a/lib/executor/src/headers/plan.rs b/lib/executor/src/headers/plan.rs index a1a3cf990..69e0edffd 100644 --- a/lib/executor/src/headers/plan.rs +++ b/lib/executor/src/headers/plan.rs @@ -1,7 +1,7 @@ use ahash::HashMap; +use hive_router_config::primitives::expression::Expression; use http::{HeaderName, HeaderValue}; use regex_automata::meta::Regex; -use vrl::compiler::Program as VrlProgram; #[derive(Clone)] pub struct HeaderRulesPlan { @@ -62,13 +62,13 @@ pub struct ResponseInsertStatic { #[derive(Clone)] pub struct RequestInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, } #[derive(Clone)] pub struct ResponseInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, pub strategy: HeaderAggregationStrategy, } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 637ab0d58..925789367 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -1,12 +1,4 @@ -use std::collections::BTreeMap; - use http::HeaderMap; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -174,17 +166,7 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = self.expression.execute(ctx.into()).map_err(|err| { HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 6a5c34444..5f7c895bd 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, iter::once}; +use std::iter::once; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -17,12 +17,6 @@ use crate::{ use super::sanitizer::is_never_join_header; use http::{header::InvalidHeaderValue, HeaderMap, HeaderName, HeaderValue}; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; pub fn apply_subgraph_response_headers( header_rule_plan: &HeaderRulesPlan, @@ -194,17 +188,7 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = self.expression.execute(ctx.into()).map_err(|err| { HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 93edcc3b5..56c6eb981 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -23,6 +23,7 @@ http = { workspace = true } jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } +vrl = { workspace = true } schemars = "1.0.4" humantime-serde = "1.1.1" diff --git a/lib/router-config/src/headers.rs b/lib/router-config/src/headers.rs index bd0913f6f..8d30675b7 100644 --- a/lib/router-config/src/headers.rs +++ b/lib/router-config/src/headers.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::expression::Expression; + type HeaderName = String; type RegExp = String; @@ -248,7 +250,7 @@ pub enum InsertSource { /// name: x-auth-scheme /// expression: 'split(.request.headers.authorization, " ")[0] ?? "none"' /// ``` - Expression { expression: String }, + Expression(Expression), } /// Helper to allow `one` or `many` values for ergonomics (OR semantics). diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 537244c9e..de74d754d 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -29,7 +29,7 @@ use crate::{ primitives::file_path::with_start_path, query_planner::QueryPlannerConfig, supergraph::SupergraphSource, - traffic_shaping::TrafficShapingExecutorConfig, + traffic_shaping::TrafficShapingConfig, }; #[derive(Debug, Deserialize, Serialize, JsonSchema)] @@ -62,9 +62,9 @@ pub struct HiveRouterConfig { #[serde(default)] pub http: HttpServerConfig, - /// Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. + /// Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. #[serde(default)] - pub traffic_shaping: TrafficShapingExecutorConfig, + pub traffic_shaping: TrafficShapingConfig, /// Configuration for the headers. #[serde(default)] diff --git a/lib/router-config/src/override_subgraph_urls.rs b/lib/router-config/src/override_subgraph_urls.rs index ddacfe044..dc32bb783 100644 --- a/lib/router-config/src/override_subgraph_urls.rs +++ b/lib/router-config/src/override_subgraph_urls.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::expression::Expression; + /// Configuration for how the Router should override subgraph URLs. /// This can be used to point to different subgraph endpoints based on environment, /// or to use dynamic expressions to determine the URL at runtime. @@ -52,7 +54,7 @@ pub enum UrlOrExpression { /// A static URL string. Url(String), /// A dynamic value computed by a VRL expression. - Expression { expression: String }, + Expression(Expression), } fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { @@ -75,9 +77,7 @@ fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { subgraphs.insert( "products".to_string(), PerSubgraphConfig { - url: UrlOrExpression::Expression { - expression: expression.to_string(), - }, + url: UrlOrExpression::Expression(expression.to_string().try_into().unwrap()), }, ); diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs new file mode 100644 index 000000000..2df64b768 --- /dev/null +++ b/lib/router-config/src/primitives/expression.rs @@ -0,0 +1,91 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use schemars::{JsonSchema, Schema, SchemaGenerator}; +use serde::{Deserialize, Serialize}; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +#[derive(Debug, Clone)] +pub struct Expression { + expression: String, + program: Box, +} + +impl Expression { + pub fn try_new(expression: String) -> Result { + let vrl_functions = vrl_build_functions(); + + let compilation_result = + vrl_compile(&expression, &vrl_functions).map_err(|diagnostics| { + diagnostics + .errors() + .into_iter() + .map(|d| d.code.to_string() + ": " + &d.message) + .collect::>() + .join(", ") + })?; + + Ok(Self { + expression, + program: Box::new(compilation_result.program), + }) + } + + pub fn execute(&self, value: VrlValue) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let timezone = VrlTimeZone::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + + self.program.resolve(&mut ctx) + } +} + +impl<'de> Deserialize<'de> for Expression { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let expression = String::deserialize(deserializer)?; + Expression::try_new(expression).map_err(serde::de::Error::custom) + } +} + +impl Serialize for Expression { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.expression) + } +} + +impl JsonSchema for Expression { + fn schema_name() -> Cow<'static, str> { + "Expression".into() + } + + fn json_schema(gen: &mut SchemaGenerator) -> Schema { + String::json_schema(gen) + } +} + +impl TryFrom for Expression { + type Error = String; + fn try_from(value: String) -> Result { + Expression::try_new(value) + } +} diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index 972582fc3..aa591928b 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -1,3 +1,4 @@ +pub mod expression; pub mod file_path; pub mod http_header; pub mod retry_policy; diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 02ed5ecdd..58dd7e049 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] -pub struct TrafficShapingExecutorConfig { +pub struct TrafficShapingConfig { /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, @@ -20,7 +20,7 @@ pub struct TrafficShapingExecutorConfig { pub dedupe_enabled: bool, } -impl Default for TrafficShapingExecutorConfig { +impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), From 65b7617936e3046cb0054873a44f8c2b0b0d24ad Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:24:59 +0300 Subject: [PATCH 02/22] Fix --- Cargo.lock | 1 + .../src/pipeline/progressive_override.rs | 29 +++----- lib/executor/src/executors/error.rs | 17 +---- lib/executor/src/executors/map.rs | 2 +- lib/executor/src/headers/request.rs | 12 +++- lib/executor/src/headers/response.rs | 9 ++- lib/router-config/Cargo.toml | 1 + lib/router-config/src/override_labels.rs | 7 +- .../src/primitives/expression.rs | 68 +++++++++++++++---- 9 files changed, 85 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f12a6ff43..9b55ebe70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,6 +2058,7 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", + "once_cell", "retry-policies", "schemars 1.0.5", "serde", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index d0b09c183..eae790709 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; +use hive_router_config::{ + override_labels::{LabelOverrideValue, OverrideLabelsConfig}, + primitives::expression::Expression, +}; use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, @@ -8,13 +11,12 @@ use hive_router_query_planner::{ }; use rand::Rng; use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, TimeZone as VrlTimeZone, }, - stdlib::all as vrl_build_functions, value::Secrets as VrlSecrets, }; @@ -117,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -126,27 +128,14 @@ impl OverrideLabelsEvaluator { ) -> Result { let mut static_enabled_labels = HashSet::new(); let mut expressions = HashMap::new(); - let vrl_functions = vrl_build_functions(); for (label, value) in override_labels_config.iter() { match value { LabelOverrideValue::Boolean(true) => { static_enabled_labels.insert(label.clone()); } - LabelOverrideValue::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &vrl_functions).map_err(|diagnostics| { - OverrideLabelsCompileError { - label: label.clone(), - error: diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - } - })?; - expressions.insert(label.clone(), compilation_result.program); + LabelOverrideValue::Expression(expression) => { + expressions.insert(label.clone(), expression.clone()); } _ => {} // Skip false booleans } @@ -179,7 +168,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.resolve(&mut ctx) { + match expression.execute_with_context(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..de69d47cb 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,4 +1,4 @@ -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions}; @@ -34,21 +34,6 @@ impl From for GraphQLError { } impl SubgraphExecutorError { - pub fn new_endpoint_expression_build( - subgraph_name: String, - diagnostics: DiagnosticList, - ) -> Self { - SubgraphExecutorError::EndpointExpressionBuild( - subgraph_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) - } - pub fn new_endpoint_expression_resolution_failure( subgraph_name: String, error: ExpressionError, diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 0c25544d3..e4e858d96 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -190,7 +190,7 @@ impl SubgraphExecutorMap { ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.execute(value).map_err(|err| { + let endpoint_result = expression.execute_with_value(value).map_err(|err| { SubgraphExecutorError::new_endpoint_expression_resolution_failure( subgraph_name.to_string(), err, diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 925789367..aa3960c05 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -166,9 +166,15 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self.expression.execute(ctx.into()).map_err(|err| { - HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) - })?; + let value = self + .expression + .execute_with_value(ctx.into()) + .map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation( + self.name.to_string(), + Box::new(err), + ) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { if is_never_join_header(&self.name) { diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 5f7c895bd..3c2085644 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -188,9 +188,12 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self.expression.execute(ctx.into()).map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) - })?; + let value = self + .expression + .execute_with_value(ctx.into()) + .map_err(|err| { + HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 56c6eb981..9efaf9405 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -25,6 +25,7 @@ retry-policies = { workspace = true} tracing = { workspace = true } vrl = { workspace = true } +once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index b3dc01a75..a7006f8b1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,6 +2,8 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use crate::primitives::expression::Expression; + /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -15,10 +17,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { - /// An expression that must evaluate to a boolean. If true, the label will be applied. - expression: String, - }, + Expression(Expression), } impl LabelOverrideValue { diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index 2df64b768..b2bd226ac 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,12 +1,12 @@ -use std::{borrow::Cow, collections::BTreeMap}; - +use once_cell::sync::Lazy; use schemars::{JsonSchema, Schema, SchemaGenerator}; use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, core::Value as VrlValue, prelude::{ - state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, TimeZone as VrlTimeZone, }, stdlib::all as vrl_build_functions, @@ -19,16 +19,17 @@ pub struct Expression { program: Box, } +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + impl Expression { pub fn try_new(expression: String) -> Result { - let vrl_functions = vrl_build_functions(); - let compilation_result = - vrl_compile(&expression, &vrl_functions).map_err(|diagnostics| { + vrl_compile(&expression, &VRL_FUNCTIONS).map_err(|diagnostics| { diagnostics .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) .collect::>() .join(", ") })?; @@ -39,7 +40,7 @@ impl Expression { }) } - pub fn execute(&self, value: VrlValue) -> Result { + pub fn execute_with_value(&self, value: VrlValue) -> Result { let mut target = VrlTargetValue { value, metadata: VrlValue::Object(BTreeMap::new()), @@ -47,10 +48,13 @@ impl Expression { }; let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + self.execute_with_context(&mut ctx) + } - self.program.resolve(&mut ctx) + pub fn execute_with_context(&self, ctx: &mut VrlContext) -> Result { + self.program.resolve(ctx) } } @@ -59,8 +63,44 @@ impl<'de> Deserialize<'de> for Expression { where D: serde::Deserializer<'de>, { - let expression = String::deserialize(deserializer)?; - Expression::try_new(expression).map_err(serde::de::Error::custom) + struct ExpressionVisitor; + impl<'de> serde::de::Visitor<'de> for ExpressionVisitor { + type Value = Expression; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a map for Expression") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut expression_str: Option = None; + + while let Some(key) = map.next_key::()? { + match key.as_str() { + "expression" => { + if expression_str.is_some() { + return Err(serde::de::Error::duplicate_field("expression")); + } + expression_str = Some(map.next_value()?); + } + other_key => { + return Err(serde::de::Error::unknown_field( + other_key, + &["expression"], + )); + } + } + } + + let expression_str = + expression_str.ok_or_else(|| serde::de::Error::missing_field("expression"))?; + + Expression::try_new(expression_str).map_err(serde::de::Error::custom) + } + } + deserializer.deserialize_map(ExpressionVisitor) } } From c9ca4841221ca763ed7c26ae77d94cf209545159 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:36:56 +0300 Subject: [PATCH 03/22] Readme --- docs/README.md | 12 +++++----- .../src/primitives/expression.rs | 23 +++++++++++++++---- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/docs/README.md b/docs/README.md index fb474b9d2..1b0c8e40e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| **Additional Properties:** not allowed **Example** @@ -641,7 +641,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes|
@@ -863,7 +863,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1116,7 +1116,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1338,7 +1338,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1808,7 +1808,7 @@ Request timeout for the Hive Console CDN requests. ## traffic\_shaping: object -Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. +Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. **Properties** diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index b2bd226ac..44da62957 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use schemars::{JsonSchema, Schema, SchemaGenerator}; -use serde::{Deserialize, Serialize}; +use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema}; +use serde::{Deserialize, Serialize, ser::SerializeStruct}; use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, @@ -109,7 +109,9 @@ impl Serialize for Expression { where S: serde::Serializer, { - serializer.serialize_str(&self.expression) + let mut state = serializer.serialize_struct("Expression", 1)?; + state.serialize_field("expression", &self.expression)?; + state.end() } } @@ -118,8 +120,19 @@ impl JsonSchema for Expression { "Expression".into() } - fn json_schema(gen: &mut SchemaGenerator) -> Schema { - String::json_schema(gen) + fn json_schema(_gen: &mut SchemaGenerator) -> Schema { + json_schema!({ + "type": "object", + "description": "A VRL expression used for dynamic evaluations.", + "properties": { + "expression": { + "type": "string", + "description": "The VRL expression string." + } + }, + "required": ["expression"], + "additionalProperties": false + }) } } From dae1e066a8f1efdd70d4b0ef8d7e3078cd8ecb1e Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:38:14 +0300 Subject: [PATCH 04/22] Format --- lib/router-config/src/primitives/expression.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index 44da62957..5b60113b7 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema}; -use serde::{Deserialize, Serialize, ser::SerializeStruct}; +use schemars::{json_schema, JsonSchema, Schema, SchemaGenerator}; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, From cb637333168e704d9f12b1f07231de2780bf994d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 18:01:37 +0300 Subject: [PATCH 05/22] Revert --- Cargo.lock | 4 +- .../src/pipeline/progressive_override.rs | 22 ++- docs/README.md | 8 +- lib/executor/Cargo.toml | 2 + lib/executor/src/executors/map.rs | 31 ++-- lib/executor/src/headers/compile.rs | 34 +++-- lib/executor/src/headers/plan.rs | 6 +- lib/executor/src/headers/request.rs | 13 +- lib/executor/src/headers/response.rs | 11 +- lib/executor/src/utils/expression.rs | 49 ++++++ lib/executor/src/utils/mod.rs | 1 + lib/router-config/Cargo.toml | 2 - lib/router-config/src/headers.rs | 4 +- lib/router-config/src/override_labels.rs | 4 +- .../src/override_subgraph_urls.rs | 8 +- .../src/primitives/expression.rs | 144 ------------------ lib/router-config/src/primitives/mod.rs | 1 - 17 files changed, 128 insertions(+), 216 deletions(-) create mode 100644 lib/executor/src/utils/expression.rs delete mode 100644 lib/router-config/src/primitives/expression.rs diff --git a/Cargo.lock b/Cargo.lock index 9b55ebe70..261fea428 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,14 +2058,12 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", - "once_cell", "retry-policies", "schemars 1.0.5", "serde", "serde_json", "thiserror 2.0.17", "tracing", - "vrl", ] [[package]] @@ -2092,9 +2090,11 @@ dependencies = [ "insta", "itoa", "ntex-http", + "once_cell", "ordered-float", "regex-automata", "ryu", + "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index eae790709..0c2ad9c15 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,16 +1,16 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use hive_router_config::{ - override_labels::{LabelOverrideValue, OverrideLabelsConfig}, - primitives::expression::Expression, +use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; +use hive_router_plan_executor::{ + execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression, }; -use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, state::supergraph_state::SupergraphState, }; use rand::Rng; use vrl::{ + compiler::Program as VrlProgram, compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ @@ -119,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -134,8 +134,14 @@ impl OverrideLabelsEvaluator { LabelOverrideValue::Boolean(true) => { static_enabled_labels.insert(label.clone()); } - LabelOverrideValue::Expression(expression) => { - expressions.insert(label.clone(), expression.clone()); + LabelOverrideValue::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + OverrideLabelsCompileError { + label: label.clone(), + error: err.to_string(), + } + })?; + expressions.insert(label.clone(), program); } _ => {} // Skip false booleans } @@ -168,7 +174,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.execute_with_context(&mut ctx) { + match expression.resolve(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/docs/README.md b/docs/README.md index 1b0c8e40e..a01d56293 100644 --- a/docs/README.md +++ b/docs/README.md @@ -641,7 +641,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -863,7 +863,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -1116,7 +1116,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -1338,7 +1338,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index c3f6f9117..06bcf2904 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -49,6 +49,8 @@ itoa = "1.0.15" ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" +once_cell = "1.21.3" +schemars = "1.0.4" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index e4e858d96..93247ec80 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,9 +6,7 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{ - override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig, -}; +use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -17,7 +15,7 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::core::Value as VrlValue; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -30,6 +28,7 @@ use crate::{ http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, + utils::expression::{compile_expression, execute_expression_with_value}, }; type SubgraphName = String; @@ -37,7 +36,7 @@ type SubgraphEndpoint = String; type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; -type ExpressionsBySubgraphMap = HashMap; +type ExpressionsBySubgraphMap = HashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -91,7 +90,7 @@ impl SubgraphExecutorMap { let endpoint_str = match endpoint_str { Some(UrlOrExpression::Url(url)) => url, - Some(UrlOrExpression::Expression(expression)) => { + Some(UrlOrExpression::Expression { expression }) => { subgraph_executor_map.register_expression(&subgraph_name, expression)?; &original_endpoint_str } @@ -190,12 +189,13 @@ impl SubgraphExecutorMap { ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.execute_with_value(value).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; + let endpoint_result = + execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; let endpoint_str = match endpoint_result.as_str() { Some(s) => s.to_string(), None => { @@ -249,10 +249,13 @@ impl SubgraphExecutorMap { fn register_expression( &mut self, subgraph_name: &str, - expression: &Expression, + expression: &str, ) -> Result<(), SubgraphExecutorError> { + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::EndpointExpressionBuild(subgraph_name.to_string(), err) + })?; self.expressions_by_subgraph - .insert(subgraph_name.to_string(), expression.clone()); + .insert(subgraph_name.to_string(), program); Ok(()) } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 1cf29c8c9..587cbee60 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -1,12 +1,16 @@ -use crate::headers::{ - errors::HeaderRuleCompileError, - plan::{ - HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, - RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, RequestPropagateRegex, - RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, ResponseHeaderRules, - ResponseInsertExpression, ResponseInsertStatic, ResponsePropagateNamed, - ResponsePropagateRegex, ResponseRemoveNamed, ResponseRemoveRegex, +use crate::{ + headers::{ + errors::HeaderRuleCompileError, + plan::{ + HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, + RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, + RequestPropagateRegex, RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, + ResponseHeaderRules, ResponseInsertExpression, ResponseInsertStatic, + ResponsePropagateNamed, ResponsePropagateRegex, ResponseRemoveNamed, + ResponseRemoveRegex, + }, }, + utils::expression::compile_expression, }; use hive_router_config::headers as config; @@ -48,11 +52,14 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { value: build_header_value(&rule.name, value)?, })); } - config::InsertSource::Expression(expression) => { + config::InsertSource::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(expression.clone()), + expression: Box::new(program), }, )); } @@ -114,16 +121,19 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule strategy: aggregation_strategy, })); } - config::InsertSource::Expression(expression) => { + config::InsertSource::Expression { expression } => { // NOTE: In case we ever need to improve performance and not pass the whole context // to VRL expressions, we can use: // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(expression.clone()), + expression: Box::new(program), strategy: aggregation_strategy, }, )); diff --git a/lib/executor/src/headers/plan.rs b/lib/executor/src/headers/plan.rs index 69e0edffd..a1a3cf990 100644 --- a/lib/executor/src/headers/plan.rs +++ b/lib/executor/src/headers/plan.rs @@ -1,7 +1,7 @@ use ahash::HashMap; -use hive_router_config::primitives::expression::Expression; use http::{HeaderName, HeaderValue}; use regex_automata::meta::Regex; +use vrl::compiler::Program as VrlProgram; #[derive(Clone)] pub struct HeaderRulesPlan { @@ -62,13 +62,13 @@ pub struct ResponseInsertStatic { #[derive(Clone)] pub struct RequestInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, } #[derive(Clone)] pub struct ResponseInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, pub strategy: HeaderAggregationStrategy, } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index aa3960c05..ec22de509 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -11,6 +11,7 @@ use crate::{ }, sanitizer::{is_denied_header, is_never_join_header}, }, + utils::expression::execute_expression_with_value, }; pub fn modify_subgraph_request_headers( @@ -166,15 +167,9 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self - .expression - .execute_with_value(ctx.into()) - .map_err(|err| { - HeaderRuleRuntimeError::new_expression_evaluation( - self.name.to_string(), - Box::new(err), - ) - })?; + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { if is_never_join_header(&self.name) { diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 3c2085644..9cf45a768 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -13,6 +13,7 @@ use crate::{ }, sanitizer::is_denied_header, }, + utils::expression::execute_expression_with_value, }; use super::sanitizer::is_never_join_header; @@ -188,13 +189,9 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self - .expression - .execute_with_value(ctx.into()) - .map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) - })?; - + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { HeaderAggregationStrategy::Append diff --git a/lib/executor/src/utils/expression.rs b/lib/executor/src/utils/expression.rs new file mode 100644 index 000000000..faee2dc3b --- /dev/null +++ b/lib/executor/src/utils/expression.rs @@ -0,0 +1,49 @@ +use once_cell::sync::Lazy; +use std::collections::BTreeMap; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + +pub fn compile_expression( + expression: &str, + functions: Option<&[Box]>, +) -> Result { + let functions = functions.unwrap_or(&VRL_FUNCTIONS); + + let compilation_result = vrl_compile(expression, functions).map_err(|diagnostics| { + diagnostics + .errors() + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) + .collect::>() + .join(", ") + })?; + + Ok(compilation_result.program) +} + +pub fn execute_expression_with_value( + program: &VrlProgram, + value: VrlValue, +) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + program.resolve(&mut ctx) +} diff --git a/lib/executor/src/utils/mod.rs b/lib/executor/src/utils/mod.rs index fc4226984..0461bb8a8 100644 --- a/lib/executor/src/utils/mod.rs +++ b/lib/executor/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod expression; pub mod traverse; pub mod vrl; diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 9efaf9405..93edcc3b5 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -23,9 +23,7 @@ http = { workspace = true } jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } -vrl = { workspace = true } -once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/headers.rs b/lib/router-config/src/headers.rs index 8d30675b7..bd0913f6f 100644 --- a/lib/router-config/src/headers.rs +++ b/lib/router-config/src/headers.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::primitives::expression::Expression; - type HeaderName = String; type RegExp = String; @@ -250,7 +248,7 @@ pub enum InsertSource { /// name: x-auth-scheme /// expression: 'split(.request.headers.authorization, " ")[0] ?? "none"' /// ``` - Expression(Expression), + Expression { expression: String }, } /// Helper to allow `one` or `many` values for ergonomics (OR semantics). diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index a7006f8b1..cbb7f28d1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,8 +2,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::primitives::expression::Expression; - /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -17,7 +15,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression(Expression), + Expression { expression: String }, } impl LabelOverrideValue { diff --git a/lib/router-config/src/override_subgraph_urls.rs b/lib/router-config/src/override_subgraph_urls.rs index dc32bb783..ddacfe044 100644 --- a/lib/router-config/src/override_subgraph_urls.rs +++ b/lib/router-config/src/override_subgraph_urls.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::primitives::expression::Expression; - /// Configuration for how the Router should override subgraph URLs. /// This can be used to point to different subgraph endpoints based on environment, /// or to use dynamic expressions to determine the URL at runtime. @@ -54,7 +52,7 @@ pub enum UrlOrExpression { /// A static URL string. Url(String), /// A dynamic value computed by a VRL expression. - Expression(Expression), + Expression { expression: String }, } fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { @@ -77,7 +75,9 @@ fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { subgraphs.insert( "products".to_string(), PerSubgraphConfig { - url: UrlOrExpression::Expression(expression.to_string().try_into().unwrap()), + url: UrlOrExpression::Expression { + expression: expression.to_string(), + }, }, ); diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs deleted file mode 100644 index 5b60113b7..000000000 --- a/lib/router-config/src/primitives/expression.rs +++ /dev/null @@ -1,144 +0,0 @@ -use once_cell::sync::Lazy; -use schemars::{json_schema, JsonSchema, Schema, SchemaGenerator}; -use serde::{ser::SerializeStruct, Deserialize, Serialize}; -use std::{borrow::Cow, collections::BTreeMap}; -use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, - core::Value as VrlValue, - prelude::{ - state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, - TimeZone as VrlTimeZone, - }, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; - -#[derive(Debug, Clone)] -pub struct Expression { - expression: String, - program: Box, -} - -static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); -static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); - -impl Expression { - pub fn try_new(expression: String) -> Result { - let compilation_result = - vrl_compile(&expression, &VRL_FUNCTIONS).map_err(|diagnostics| { - diagnostics - .errors() - .iter() - .map(|d| format!("{}: {}", d.code, d.message)) - .collect::>() - .join(", ") - })?; - - Ok(Self { - expression, - program: Box::new(compilation_result.program), - }) - } - - pub fn execute_with_value(&self, value: VrlValue) -> Result { - let mut target = VrlTargetValue { - value, - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); - - self.execute_with_context(&mut ctx) - } - - pub fn execute_with_context(&self, ctx: &mut VrlContext) -> Result { - self.program.resolve(ctx) - } -} - -impl<'de> Deserialize<'de> for Expression { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct ExpressionVisitor; - impl<'de> serde::de::Visitor<'de> for ExpressionVisitor { - type Value = Expression; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a map for Expression") - } - - fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { - let mut expression_str: Option = None; - - while let Some(key) = map.next_key::()? { - match key.as_str() { - "expression" => { - if expression_str.is_some() { - return Err(serde::de::Error::duplicate_field("expression")); - } - expression_str = Some(map.next_value()?); - } - other_key => { - return Err(serde::de::Error::unknown_field( - other_key, - &["expression"], - )); - } - } - } - - let expression_str = - expression_str.ok_or_else(|| serde::de::Error::missing_field("expression"))?; - - Expression::try_new(expression_str).map_err(serde::de::Error::custom) - } - } - deserializer.deserialize_map(ExpressionVisitor) - } -} - -impl Serialize for Expression { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("Expression", 1)?; - state.serialize_field("expression", &self.expression)?; - state.end() - } -} - -impl JsonSchema for Expression { - fn schema_name() -> Cow<'static, str> { - "Expression".into() - } - - fn json_schema(_gen: &mut SchemaGenerator) -> Schema { - json_schema!({ - "type": "object", - "description": "A VRL expression used for dynamic evaluations.", - "properties": { - "expression": { - "type": "string", - "description": "The VRL expression string." - } - }, - "required": ["expression"], - "additionalProperties": false - }) - } -} - -impl TryFrom for Expression { - type Error = String; - fn try_from(value: String) -> Result { - Expression::try_new(value) - } -} diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index aa591928b..972582fc3 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -1,4 +1,3 @@ -pub mod expression; pub mod file_path; pub mod http_header; pub mod retry_policy; From e1111dcb320e36de162a919a230b35e27e47c907 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 13:34:20 +0300 Subject: [PATCH 06/22] Remove extra dep and apply comments --- Cargo.lock | 1 - lib/executor/Cargo.toml | 1 - lib/router-config/src/override_labels.rs | 5 ++++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 261fea428..86befc23f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2094,7 +2094,6 @@ dependencies = [ "ordered-float", "regex-automata", "ryu", - "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 06bcf2904..a1da6754f 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -50,7 +50,6 @@ ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" once_cell = "1.21.3" -schemars = "1.0.4" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index cbb7f28d1..b3dc01a75 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -15,7 +15,10 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { expression: String }, + Expression { + /// An expression that must evaluate to a boolean. If true, the label will be applied. + expression: String, + }, } impl LabelOverrideValue { From 766bd0edbf13ef049f0a508d088362d9fe06028e Mon Sep 17 00:00:00 2001 From: "knope-bot[bot]" <152252888+knope-bot[bot]@users.noreply.github.com> Date: Mon, 3 Nov 2025 10:40:23 +0000 Subject: [PATCH 07/22] Auto generate changeset --- .changeset/shared_utilities_to_handle_vrl_expressions.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/shared_utilities_to_handle_vrl_expressions.md diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md new file mode 100644 index 000000000..f1e52b5c2 --- /dev/null +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -0,0 +1,9 @@ +--- +default: patch +--- + +# Shared utilities to handle VRL expressions + +#540 by @ardatan + + From 6b9ee81b8418d9e089cb9507e498a76033df41f0 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:12:46 +0300 Subject: [PATCH 08/22] Duration --- lib/executor/src/executors/map.rs | 5 +---- lib/router-config/src/traffic_shaping.rs | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 93247ec80..fc3d6905a 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,7 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, - time::Duration, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -57,9 +56,7 @@ impl SubgraphExecutorMap { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) - .pool_idle_timeout(Duration::from_secs( - config.traffic_shaping.pool_idle_timeout_seconds, - )) + .pool_idle_timeout(config.traffic_shaping.pool_idle_timeout) .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host) .build(https); diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 58dd7e049..95d824910 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -9,8 +11,13 @@ pub struct TrafficShapingConfig { pub max_connections_per_host: usize, /// Timeout for idle sockets being kept-alive. - #[serde(default = "default_pool_idle_timeout_seconds")] - pub pool_idle_timeout_seconds: u64, + #[serde( + default = "default_pool_idle_timeout", + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout: Duration, /// Enables/disables request deduplication to subgraphs. /// @@ -24,7 +31,7 @@ impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), - pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), + pool_idle_timeout: default_pool_idle_timeout(), dedupe_enabled: default_dedupe_enabled(), } } @@ -34,8 +41,8 @@ fn default_max_connections_per_host() -> usize { 100 } -fn default_pool_idle_timeout_seconds() -> u64 { - 50 +fn default_pool_idle_timeout() -> Duration { + Duration::from_secs(50) } fn default_dedupe_enabled() -> bool { From 47f829784ebf955f24897f94ca7c7871ae992a4b Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:19:01 +0300 Subject: [PATCH 09/22] Fix config' --- docs/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/README.md b/docs/README.md index a01d56293..0a2fc6727 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout":"50s"}`
|| **Additional Properties:** not allowed **Example** @@ -109,7 +109,7 @@ supergraph: {} traffic_shaping: dedupe_enabled: true max_connections_per_host: 100 - pool_idle_timeout_seconds: 50 + pool_idle_timeout: 50s ``` @@ -1817,7 +1817,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| -|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| **Additional Properties:** not allowed **Example** @@ -1825,7 +1825,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations ```yaml dedupe_enabled: true max_connections_per_host: 100 -pool_idle_timeout_seconds: 50 +pool_idle_timeout: 50s ``` From 7ebe2d95083d101dd428bd007fc876d38cffe870 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:52:58 +0300 Subject: [PATCH 10/22] Update traffic shaping configuration for VRL expressions Removed `pool_idle_timeout_seconds` from `traffic_shaping` and replaced it with `pool_idle_timeout` using duration format. --- .../shared_utilities_to_handle_vrl_expressions.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md index f1e52b5c2..97c07aa3a 100644 --- a/.changeset/shared_utilities_to_handle_vrl_expressions.md +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -1,9 +1,15 @@ --- -default: patch +default: minor --- -# Shared utilities to handle VRL expressions +# Breaking -#540 by @ardatan +Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format. +```diff +traffic_shaping: +- pool_idle_timeout_seconds: 30 ++ pool_idle_timeout: 30s +``` +#540 by @ardatan From 68945ff5cf94e898ad5d071cfc2011d6d7c40702 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 17:26:39 +0300 Subject: [PATCH 11/22] Avoid cloning endpoint multiple times --- lib/executor/src/executors/map.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index fc3d6905a..6c6d3c0c5 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -194,19 +194,17 @@ impl SubgraphExecutorMap { ) })?; let endpoint_str = match endpoint_result.as_str() { - Some(s) => s.to_string(), - None => { - return Err(SubgraphExecutorError::EndpointExpressionWrongType( - subgraph_name.to_string(), - )); - } - }; + Some(s) => Ok(s), + None => Err(SubgraphExecutorError::EndpointExpressionWrongType( + subgraph_name.to_string(), + )), + }?; // Check if an executor for this endpoint already exists. let existing_executor = self .executors_by_subgraph .get(subgraph_name) - .and_then(|endpoints| endpoints.get(&endpoint_str).map(|e| e.clone())); + .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); if let Some(executor) = existing_executor { return Ok(Some(executor)); @@ -219,7 +217,7 @@ impl SubgraphExecutorMap { .executors_by_subgraph .get(subgraph_name) .expect("Executor was just registered, should be present"); - return Ok(endpoints.get(&endpoint_str).map(|e| e.clone())); + return Ok(endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); } Ok(None) From 5522cca969f15270d4144d87d521c3feacadd573 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 17:50:31 +0300 Subject: [PATCH 12/22] Simplify --- lib/executor/src/executors/map.rs | 123 +++++++++++++----------------- 1 file changed, 53 insertions(+), 70 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 6c6d3c0c5..026fef092 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -108,7 +108,7 @@ impl SubgraphExecutorMap { client_request: &ClientRequestDetails<'a, 'req>, ) -> HttpExecutionResponse { match self.get_or_create_executor(subgraph_name, client_request) { - Ok(Some(executor)) => executor.execute(execution_request).await, + Ok(executor) => executor.execute(execution_request).await, Err(err) => { error!( "Subgraph executor error for subgraph '{}': {}", @@ -116,13 +116,6 @@ impl SubgraphExecutorMap { ); self.internal_server_error_response(err.into(), subgraph_name) } - Ok(None) => { - error!( - "Subgraph executor not found for subgraph '{}'", - subgraph_name - ); - self.internal_server_error_response("Internal server error".into(), subgraph_name) - } } } @@ -151,15 +144,17 @@ impl SubgraphExecutorMap { &self, subgraph_name: &str, client_request: &ClientRequestDetails<'_, '_>, - ) -> Result, SubgraphExecutorError> { - let from_expression = - self.get_or_create_executor_from_expression(subgraph_name, client_request)?; - - if from_expression.is_some() { - return Ok(from_expression); - } - - Ok(self.get_executor_from_static_endpoint(subgraph_name)) + ) -> Result { + self.expressions_by_subgraph + .get(subgraph_name) + .map(|expression| { + self.get_or_create_executor_from_expression( + subgraph_name, + expression, + client_request, + ) + }) + .unwrap_or_else(|| self.get_executor_from_static_endpoint(subgraph_name)) } /// Looks up a subgraph executor, @@ -169,65 +164,50 @@ impl SubgraphExecutorMap { fn get_or_create_executor_from_expression( &self, subgraph_name: &str, + expression: &VrlProgram, client_request: &ClientRequestDetails<'_, '_>, - ) -> Result, SubgraphExecutorError> { - if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) { - let original_url_value = VrlValue::Bytes(Bytes::from( - self.static_endpoints_by_subgraph - .get(subgraph_name) - .map(|endpoint| endpoint.value().clone()) - .ok_or_else(|| { - SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) - })?, - )); - let value = VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])); - - // Resolve the expression to get an endpoint URL. - let endpoint_result = - execute_expression_with_value(expression, value).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; - let endpoint_str = match endpoint_result.as_str() { - Some(s) => Ok(s), - None => Err(SubgraphExecutorError::EndpointExpressionWrongType( - subgraph_name.to_string(), - )), - }?; - - // Check if an executor for this endpoint already exists. - let existing_executor = self - .executors_by_subgraph + ) -> Result { + let original_url_value = VrlValue::Bytes(Bytes::from( + self.static_endpoints_by_subgraph .get(subgraph_name) - .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); - - if let Some(executor) = existing_executor { - return Ok(Some(executor)); - } - + .map(|endpoint| endpoint.value().clone()) + .ok_or_else(|| { + SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) + })?, + )); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); + + // Resolve the expression to get an endpoint URL. + let endpoint_result = execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; + let endpoint_str = match endpoint_result.as_str() { + Some(s) => Ok(s), + None => Err(SubgraphExecutorError::EndpointExpressionWrongType( + subgraph_name.to_string(), + )), + }?; + + // Check if an executor for this endpoint already exists. + self.executors_by_subgraph + .get(subgraph_name) + .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())) + .map(Ok) // If not, create and register a new one. - self.register_executor(subgraph_name, &endpoint_str)?; - - let endpoints = self - .executors_by_subgraph - .get(subgraph_name) - .expect("Executor was just registered, should be present"); - return Ok(endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); - } - - Ok(None) + .unwrap_or_else(|| self.register_executor(subgraph_name, endpoint_str.as_ref())) } /// Looks up a subgraph executor based on a static endpoint URL. fn get_executor_from_static_endpoint( &self, subgraph_name: &str, - ) -> Option { + ) -> Result { self.static_endpoints_by_subgraph .get(subgraph_name) .and_then(|endpoint_ref| { @@ -236,6 +216,7 @@ impl SubgraphExecutorMap { .get(subgraph_name) .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone())) }) + .ok_or_else(|| SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())) } /// Registers a VRL expression for the given subgraph name. @@ -269,7 +250,7 @@ impl SubgraphExecutorMap { &self, subgraph_name: &str, endpoint_str: &str, - ) -> Result<(), SubgraphExecutorError> { + ) -> Result { let endpoint_uri = endpoint_str.parse::().map_err(|e| { SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string()) })?; @@ -302,11 +283,13 @@ impl SubgraphExecutorMap { self.in_flight_requests.clone(), ); + let executor_arc = executor.to_boxed_arc(); + self.executors_by_subgraph .entry(subgraph_name.to_string()) .or_default() - .insert(endpoint_str.to_string(), executor.to_boxed_arc()); + .insert(endpoint_str.to_string(), executor_arc.clone()); - Ok(()) + Ok(executor_arc) } } From 293c11d0721142d05a288fa2c9670e37840644e5 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 15:24:02 +0300 Subject: [PATCH 13/22] feat(router): HMAC based Subgraph Auth --- Cargo.lock | 3 + docs/README.md | 21 +++++ lib/executor/Cargo.toml | 3 + lib/executor/src/execution/plan.rs | 3 +- lib/executor/src/executors/common.rs | 17 ++-- lib/executor/src/executors/error.rs | 13 +++ lib/executor/src/executors/http.rs | 114 +++++++++++++++++++++--- lib/executor/src/executors/map.rs | 24 +++-- lib/router-config/src/hmac_signature.rs | 57 ++++++++++++ lib/router-config/src/lib.rs | 7 ++ 10 files changed, 238 insertions(+), 24 deletions(-) create mode 100644 lib/router-config/src/hmac_signature.rs diff --git a/Cargo.lock b/Cargo.lock index 86befc23f..ed8e46897 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2079,8 +2079,10 @@ dependencies = [ "futures", "graphql-parser", "graphql-tools", + "hex", "hive-router-config", "hive-router-query-planner", + "hmac", "http", "http-body-util", "hyper", @@ -2095,6 +2097,7 @@ dependencies = [ "regex-automata", "ryu", "serde", + "sha2", "sonic-rs", "strum 0.27.2", "subgraphs", diff --git a/docs/README.md b/docs/README.md index 0a2fc6727..30c90af24 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,6 +8,7 @@ |[**csrf**](#csrf)|`object`|Configuration for CSRF prevention.
Default: `{"enabled":false,"required_headers":[]}`
|| |[**graphiql**](#graphiql)|`object`|Configuration for the GraphiQL interface.
Default: `{"enabled":true}`
|| |[**headers**](#headers)|`object`|Configuration for the headers.
Default: `{}`
|| +|[**hmac\_signature**](#hmac_signature)|`object`||yes| |[**http**](#http)|`object`|Configuration for the HTTP server/listener.
Default: `{"host":"0.0.0.0","port":4000}`
|| |[**jwt**](#jwt)|`object`|Configuration for JWT authentication plugin.
|yes| |[**log**](#log)|`object`|The router logger configuration.
Default: `{"filter":null,"format":"json","level":"info"}`
|| @@ -57,6 +58,8 @@ headers: default: unknown named: x-tenant-id rename: x-acct-tenant +hmac_signature: + extension_name: hmac_signature http: host: 0.0.0.0 port: 4000 @@ -1341,6 +1344,24 @@ For more information on the available functions and syntax, see the |**expression**|`string`||yes| +
+## hmac\_signature: object + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**enabled**|||yes| +|**extension\_name**|`string`|Default: `"hmac_signature"`
|no| +|**secret**|`string`||yes| + +**Example** + +```yaml +extension_name: hmac_signature + +``` + ## http: object diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index a1da6754f..c38f491fe 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -50,6 +50,9 @@ ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" once_cell = "1.21.3" +hmac = "0.12.1" +sha2 = "0.10.9" +hex = "0.4.3" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/executor/src/execution/plan.rs b/lib/executor/src/execution/plan.rs index f86356312..90102542c 100644 --- a/lib/executor/src/execution/plan.rs +++ b/lib/executor/src/execution/plan.rs @@ -708,6 +708,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { representations, headers: headers_map, extensions: None, + client_request: self.client_request, }; if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan { @@ -722,7 +723,7 @@ impl<'exec, 'req> Executor<'exec, 'req> { subgraph_name: node.service_name.clone(), response: self .executors - .execute(&node.service_name, subgraph_request, self.client_request) + .execute(&node.service_name, subgraph_request) .await .into(), })) diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index bdcd4d819..44fc455d6 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -5,11 +5,13 @@ use bytes::Bytes; use http::HeaderMap; use sonic_rs::Value; +use crate::execution::client_request_details::ClientRequestDetails; + #[async_trait] pub trait SubgraphExecutor { - async fn execute<'a>( + async fn execute<'exec, 'req>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse; fn to_boxed_arc<'a>(self) -> Arc> @@ -26,18 +28,19 @@ pub type SubgraphExecutorBoxedArc = Arc>; pub type SubgraphRequestExtensions = HashMap; -pub struct HttpExecutionRequest<'a> { - pub query: &'a str, +pub struct HttpExecutionRequest<'exec, 'req> { + pub query: &'exec str, pub dedupe: bool, - pub operation_name: Option<&'a str>, + pub operation_name: Option<&'exec str>, // TODO: variables could be stringified before even executing the request - pub variables: Option>, + pub variables: Option>, pub headers: HeaderMap, pub representations: Option>, pub extensions: Option, + pub client_request: &'exec ClientRequestDetails<'exec, 'req>, } -impl HttpExecutionRequest<'_> { +impl HttpExecutionRequest<'_, '_> { pub fn add_request_extensions_field(&mut self, key: String, value: Value) { self.extensions .get_or_insert_with(HashMap::new) diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index de69d47cb..01a67e601 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -20,6 +20,12 @@ pub enum SubgraphExecutorError { RequestFailure(String, String), #[error("Failed to serialize variable \"{0}\": {1}")] VariablesSerializationFailure(String, String), + #[error("Failed to serialize extension \"{0}\": {1}")] + ExtensionSerializationFailure(String, String), + #[error("Failed to build HMAC expression for subgraph '{0}'. Please check your VRL expression for syntax errors. Diagnostic: {1}")] + HMACExpressionBuild(String, String), + #[error("HMAC signature error: {0}")] + HMACSignatureError(String), } impl From for GraphQLError { @@ -61,6 +67,13 @@ impl SubgraphExecutorError { SubgraphExecutorError::VariablesSerializationFailure(_, _) => { "SUBGRAPH_VARIABLES_SERIALIZATION_FAILURE" } + SubgraphExecutorError::ExtensionSerializationFailure(_, _) => { + "SUBGRAPH_EXTENSION_SERIALIZATION_FAILURE" + } + SubgraphExecutorError::HMACSignatureError(_) => "SUBGRAPH_HMAC_SIGNATURE_ERROR", + SubgraphExecutorError::HMACExpressionBuild(_, _) => { + "SUBGRAPH_HMAC_EXPRESSION_BUILD_FAILURE" + } } } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 29b392567..83f39d3f1 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,7 +1,9 @@ +use std::collections::BTreeMap; use std::sync::Arc; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; +use crate::utils::expression::execute_expression_with_value; use dashmap::DashMap; use hive_router_config::HiveRouterConfig; use tokio::sync::OnceCell; @@ -9,6 +11,7 @@ use tokio::sync::OnceCell; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; +use hmac::{Hmac, Mac}; use http::HeaderMap; use http::HeaderValue; use http_body_util::BodyExt; @@ -16,8 +19,10 @@ use http_body_util::Full; use hyper::Version; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; +use sha2::Sha256; use tokio::sync::Semaphore; use tracing::debug; +use vrl::compiler::Program as VrlProgram; use crate::executors::common::HttpExecutionRequest; use crate::executors::error::SubgraphExecutorError; @@ -27,6 +32,7 @@ use crate::utils::consts::COLON; use crate::utils::consts::COMMA; use crate::utils::consts::QUOTE; use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string}; +use vrl::core::Value as VrlValue; #[derive(Debug)] pub struct HTTPSubgraphExecutor { @@ -37,13 +43,23 @@ pub struct HTTPSubgraphExecutor { pub semaphore: Arc, pub config: Arc, pub in_flight_requests: Arc>, ABuildHasher>>, + pub should_sign_hmac: BooleanOrProgram, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; const FIRST_QUOTE_STR: &[u8] = b"{\"query\":"; +const FIRST_EXTENSION_STR: &[u8] = b",\"extensions\":{"; pub type HttpClient = Client, Full>; +type HmacSha256 = Hmac; + +#[derive(Debug)] +pub enum BooleanOrProgram { + Boolean(bool), + Program(Box), +} + impl HTTPSubgraphExecutor { pub fn new( subgraph_name: String, @@ -52,6 +68,7 @@ impl HTTPSubgraphExecutor { semaphore: Arc, config: Arc, in_flight_requests: Arc>, ABuildHasher>>, + should_sign_hmac: BooleanOrProgram, ) -> Self { let mut header_map = HeaderMap::new(); header_map.insert( @@ -71,12 +88,13 @@ impl HTTPSubgraphExecutor { semaphore, config, in_flight_requests, + should_sign_hmac, } } - fn build_request_body<'a>( + fn build_request_body<'exec, 'req>( &self, - execution_request: &HttpExecutionRequest<'a>, + execution_request: &HttpExecutionRequest<'exec, 'req>, ) -> Result, SubgraphExecutorError> { let mut body = Vec::with_capacity(4096); body.put(FIRST_QUOTE_STR); @@ -118,13 +136,89 @@ impl HTTPSubgraphExecutor { body.put(CLOSE_BRACE); } - if let Some(extensions) = &execution_request.extensions { - if !extensions.is_empty() { - let as_value = sonic_rs::to_value(extensions).unwrap(); + let should_sign_hmac = match &self.should_sign_hmac { + BooleanOrProgram::Boolean(b) => *b, + BooleanOrProgram::Program(expr) => { + // .subgraph + let subgraph_value = VrlValue::Object(BTreeMap::from([( + "name".into(), + VrlValue::Bytes(Bytes::from(self.subgraph_name.to_owned())), + )])); + // .request + let request_value: VrlValue = execution_request.client_request.into(); + let target_value = VrlValue::Object(BTreeMap::from([ + ("subgraph".into(), subgraph_value), + ("request".into(), request_value), + ])); + let result = execute_expression_with_value(expr, target_value); + match result { + Ok(VrlValue::Boolean(b)) => b, + Ok(_) => { + return Err(SubgraphExecutorError::HMACSignatureError( + "HMAC signature expression did not evaluate to a boolean".to_string(), + )); + } + Err(e) => { + return Err(SubgraphExecutorError::HMACSignatureError(format!( + "HMAC signature expression evaluation error: {}", + e + ))); + } + } + } + }; - body.put(COMMA); - body.put("\"extensions\":".as_bytes()); - body.extend_from_slice(as_value.to_string().as_bytes()); + let hmac_signature_ext = if should_sign_hmac { + let mut mac = HmacSha256::new_from_slice(self.config.hmac_signature.secret.as_bytes()) + .map_err(|e| { + SubgraphExecutorError::HMACSignatureError(format!( + "Failed to create HMAC instance: {}", + e + )) + })?; + let mut body_without_extensions = body.clone(); + body_without_extensions.put(CLOSE_BRACE); + mac.update(&body_without_extensions); + let result = mac.finalize(); + let result_bytes = result.into_bytes(); + Some(result_bytes) + } else { + None + }; + + if let Some(extensions) = &execution_request.extensions { + let mut first = true; + if let Some(hmac_bytes) = hmac_signature_ext { + if first { + body.put(FIRST_EXTENSION_STR); + first = false; + } else { + body.put(COMMA); + } + body.put(self.config.hmac_signature.extension_name.as_bytes()); + let hmac_hex = hex::encode(hmac_bytes); + body.put(QUOTE); + body.put(hmac_hex.as_bytes()); + body.put(QUOTE); + } + for (extension_name, extension_value) in extensions { + if first { + body.put(FIRST_EXTENSION_STR); + first = false; + } else { + body.put(COMMA); + } + body.put(QUOTE); + body.put(extension_name.as_bytes()); + body.put(QUOTE); + body.put(COLON); + let value_str = sonic_rs::to_string(extension_value).map_err(|err| { + SubgraphExecutorError::ExtensionSerializationFailure( + extension_name.to_string(), + err.to_string(), + ) + })?; + body.put(value_str.as_bytes()); } } @@ -210,9 +304,9 @@ impl HTTPSubgraphExecutor { #[async_trait] impl SubgraphExecutor for HTTPSubgraphExecutor { #[tracing::instrument(skip_all, fields(subgraph_name = self.subgraph_name))] - async fn execute<'a>( + async fn execute<'exec, 'req>( &self, - execution_request: HttpExecutionRequest<'a>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse { let body = match self.build_request_body(&execution_request) { Ok(body) => body, diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 026fef092..30b793bc2 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -5,7 +5,9 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; +use hive_router_config::{ + hmac_signature::BooleanOrExpression, override_subgraph_urls::UrlOrExpression, HiveRouterConfig, +}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -24,7 +26,7 @@ use crate::{ }, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, - http::{HTTPSubgraphExecutor, HttpClient}, + http::{BooleanOrProgram, HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, utils::expression::{compile_expression, execute_expression_with_value}, @@ -101,13 +103,12 @@ impl SubgraphExecutorMap { Ok(subgraph_executor_map) } - pub async fn execute<'a, 'req>( + pub async fn execute<'exec, 'req>( &self, subgraph_name: &str, - execution_request: HttpExecutionRequest<'a>, - client_request: &ClientRequestDetails<'a, 'req>, + execution_request: HttpExecutionRequest<'exec, 'req>, ) -> HttpExecutionResponse { - match self.get_or_create_executor(subgraph_name, client_request) { + match self.get_or_create_executor(subgraph_name, execution_request.client_request) { Ok(executor) => executor.execute(execution_request).await, Err(err) => { error!( @@ -274,6 +275,16 @@ impl SubgraphExecutorMap { .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host))) .clone(); + let should_sign_hmac = match &self.config.hmac_signature.enabled { + BooleanOrExpression::Boolean(b) => BooleanOrProgram::Boolean(*b), + BooleanOrExpression::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::HMACExpressionBuild(subgraph_name.to_string(), err) + })?; + BooleanOrProgram::Program(Box::new(program)) + } + }; + let executor = HTTPSubgraphExecutor::new( subgraph_name.to_string(), endpoint_uri, @@ -281,6 +292,7 @@ impl SubgraphExecutorMap { semaphore, self.config.clone(), self.in_flight_requests.clone(), + should_sign_hmac, ); let executor_arc = executor.to_boxed_arc(); diff --git a/lib/router-config/src/hmac_signature.rs b/lib/router-config/src/hmac_signature.rs new file mode 100644 index 000000000..70509cbbd --- /dev/null +++ b/lib/router-config/src/hmac_signature.rs @@ -0,0 +1,57 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)] +pub struct HMACSignatureConfig { + // Whether to sign outgoing requests with HMAC signatures. + // Can be a boolean or a VRL expression that evaluates to a boolean. + // Example: + // hmac_signature: + // enabled: true + // or enable it conditionally based on the subgraph name: + // hmac_signature: + // enabled: | + // if .subgraph.name == "users" { + // true + // } else { + // false + // } + pub enabled: BooleanOrExpression, + + // The secret key used for HMAC signing and verification. + // It should be a random, opaque string shared between the Hive Router and the subgraph services. + pub secret: String, + + // The key name used in the extensions field of the outgoing requests to store the HMAC signature. + #[serde(default = "default_extension_name")] + pub extension_name: String, +} + +fn default_extension_name() -> String { + "hmac_signature".to_string() +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(untagged)] +pub enum BooleanOrExpression { + Boolean(bool), + Expression { expression: String }, +} + +impl Default for BooleanOrExpression { + fn default() -> Self { + BooleanOrExpression::Boolean(false) + } +} + +impl HMACSignatureConfig { + pub fn is_disabled(&self) -> bool { + match &self.enabled { + BooleanOrExpression::Boolean(b) => !*b, + BooleanOrExpression::Expression { expression: _ } => { + // If it's an expression, we consider it enabled for the purpose of this check. + false + } + } + } +} diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index de74d754d..b2af09431 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -3,6 +3,7 @@ pub mod csrf; mod env_overrides; pub mod graphiql; pub mod headers; +pub mod hmac_signature; pub mod http_server; pub mod jwt_auth; pub mod log; @@ -92,6 +93,12 @@ pub struct HiveRouterConfig { /// Configuration for overriding labels. #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub override_labels: OverrideLabelsConfig, + + #[serde( + default, + skip_serializing_if = "hmac_signature::HMACSignatureConfig::is_disabled" + )] + pub hmac_signature: hmac_signature::HMACSignatureConfig, } #[derive(Debug, thiserror::Error)] From 37b6b752df5ec2a25629e7479e576b115136b01f Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 17:13:09 +0300 Subject: [PATCH 14/22] Tests --- Cargo.lock | 3 ++ e2e/Cargo.toml | 4 ++ e2e/configs/hmac_forward.router.yaml | 3 ++ e2e/src/hmac.rs | 56 ++++++++++++++++++++++++++++ e2e/src/lib.rs | 2 + lib/executor/src/executors/http.rs | 42 +++++++++++++-------- 6 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 e2e/configs/hmac_forward.router.yaml create mode 100644 e2e/src/hmac.rs diff --git a/Cargo.lock b/Cargo.lock index ed8e46897..c03250a47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1371,14 +1371,17 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "e2e" version = "0.0.1" dependencies = [ + "hex", "hive-router", "hive-router-config", + "hmac", "insta", "jsonwebtoken", "lazy_static", "mockito", "ntex", "reqwest", + "sha2", "sonic-rs", "subgraphs", "tempfile", diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 5a604afc1..f26730317 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -24,3 +24,7 @@ subgraphs = { path = "../bench/subgraphs" } mockito = "1.7.0" tempfile = "3.23.0" + +hmac = "0.12.1" +sha2 = "0.10.9" +hex = "0.4.3" diff --git a/e2e/configs/hmac_forward.router.yaml b/e2e/configs/hmac_forward.router.yaml new file mode 100644 index 000000000..f029385d6 --- /dev/null +++ b/e2e/configs/hmac_forward.router.yaml @@ -0,0 +1,3 @@ +hmac_signature: + enabled: true + secret: VERY_SECRET \ No newline at end of file diff --git a/e2e/src/hmac.rs b/e2e/src/hmac.rs new file mode 100644 index 000000000..980526a67 --- /dev/null +++ b/e2e/src/hmac.rs @@ -0,0 +1,56 @@ +#[cfg(test)] +mod hmac_e2e_tests { + use crate::testkit::{ + init_graphql_request, init_router_from_config_file, wait_for_readiness, SubgraphsServer, + }; + + use ntex::web::test; + use sonic_rs::JsonValueTrait; + + fn create_hmac_signature(secret: &str, query: &str) -> String { + use hex; + use hmac::{Hmac, Mac}; + use sha2::Sha256; + + type HmacSha256 = Hmac; + + let mut mac = + HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); + let message = format!("{{\"query\":\"{}\"}}", query); + mac.update(message.as_bytes()); + let result = mac.finalize(); + let code_bytes = result.into_bytes(); + hex::encode(code_bytes) + } + + #[ntex::test] + async fn should_forward_hmac_signature_to_subgraph_via_extensions() { + let subgraphs_server = SubgraphsServer::start().await; + let app = init_router_from_config_file("configs/hmac_forward.router.yaml") + .await + .unwrap(); + wait_for_readiness(&app.app).await; + let query = "query{users{id}}"; + let req = init_graphql_request(query, None); + let resp: ntex::web::WebResponse = test::call_service(&app.app, req.to_request()).await; + + assert!(resp.status().is_success(), "Expected 200 OK"); + + let subgraph_requests = subgraphs_server + .get_subgraph_requests_log("accounts") + .await + .expect("expected requests sent to accounts subgraph"); + assert_eq!( + subgraph_requests.len(), + 1, + "expected 1 request to accounts subgraph" + ); + let extensions = subgraph_requests[0].request_body.get("extensions").unwrap(); + + let expected_signature = create_hmac_signature("VERY_SECRET", query); + assert_eq!( + extensions.get("hmac_signature").unwrap(), + &expected_signature + ); + } +} diff --git a/e2e/src/lib.rs b/e2e/src/lib.rs index 9086e01f4..ca4cd588c 100644 --- a/e2e/src/lib.rs +++ b/e2e/src/lib.rs @@ -3,6 +3,8 @@ mod file_supergraph; #[cfg(test)] mod hive_cdn_supergraph; #[cfg(test)] +mod hmac; +#[cfg(test)] mod jwt; #[cfg(test)] mod override_subgraph_urls; diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 83f39d3f1..288c8307b 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -186,25 +186,30 @@ impl HTTPSubgraphExecutor { None }; - if let Some(extensions) = &execution_request.extensions { - let mut first = true; - if let Some(hmac_bytes) = hmac_signature_ext { - if first { - body.put(FIRST_EXTENSION_STR); - first = false; - } else { - body.put(COMMA); - } - body.put(self.config.hmac_signature.extension_name.as_bytes()); - let hmac_hex = hex::encode(hmac_bytes); - body.put(QUOTE); - body.put(hmac_hex.as_bytes()); - body.put(QUOTE); + let mut first_extension = true; + + if let Some(hmac_bytes) = hmac_signature_ext { + if first_extension { + body.put(FIRST_EXTENSION_STR); + first_extension = false; + } else { + body.put(COMMA); } + body.put(QUOTE); + body.put(self.config.hmac_signature.extension_name.as_bytes()); + body.put(QUOTE); + body.put(COLON); + let hmac_hex = hex::encode(hmac_bytes); + body.put(QUOTE); + body.put(hmac_hex.as_bytes()); + body.put(QUOTE); + } + + if let Some(extensions) = &execution_request.extensions { for (extension_name, extension_value) in extensions { - if first { + if first_extension { body.put(FIRST_EXTENSION_STR); - first = false; + first_extension = false; } else { body.put(COMMA); } @@ -222,8 +227,13 @@ impl HTTPSubgraphExecutor { } } + if !first_extension { + body.put(CLOSE_BRACE); + } + body.put(CLOSE_BRACE); + println!("Built request body: {}", String::from_utf8_lossy(&body)); Ok(body) } From 042af2076f2e9a6039172a3ff8e2761f2d0f874a Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 17:23:14 +0300 Subject: [PATCH 15/22] Default is false --- docs/README.md | 4 +++- lib/executor/src/executors/http.rs | 5 +++++ lib/router-config/src/hmac_signature.rs | 5 +++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/README.md b/docs/README.md index 30c90af24..e3856feb4 100644 --- a/docs/README.md +++ b/docs/README.md @@ -59,6 +59,7 @@ headers: named: x-tenant-id rename: x-acct-tenant hmac_signature: + enabled: false extension_name: hmac_signature http: host: 0.0.0.0 @@ -1351,13 +1352,14 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**enabled**|||yes| +|**enabled**||Default: `false`
|no| |**extension\_name**|`string`|Default: `"hmac_signature"`
|no| |**secret**|`string`||yes| **Example** ```yaml +enabled: false extension_name: hmac_signature ``` diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 288c8307b..88e5dbe45 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -169,6 +169,11 @@ impl HTTPSubgraphExecutor { }; let hmac_signature_ext = if should_sign_hmac { + if self.config.hmac_signature.secret.is_empty() { + return Err(SubgraphExecutorError::HMACSignatureError( + "HMAC signature secret is empty".to_string(), + )); + } let mut mac = HmacSha256::new_from_slice(self.config.hmac_signature.secret.as_bytes()) .map_err(|e| { SubgraphExecutorError::HMACSignatureError(format!( diff --git a/lib/router-config/src/hmac_signature.rs b/lib/router-config/src/hmac_signature.rs index 70509cbbd..332b45dba 100644 --- a/lib/router-config/src/hmac_signature.rs +++ b/lib/router-config/src/hmac_signature.rs @@ -16,6 +16,7 @@ pub struct HMACSignatureConfig { // } else { // false // } + #[serde(default = "default_hmac_signature_enabled")] pub enabled: BooleanOrExpression, // The secret key used for HMAC signing and verification. @@ -55,3 +56,7 @@ impl HMACSignatureConfig { } } } + +fn default_hmac_signature_enabled() -> BooleanOrExpression { + BooleanOrExpression::Boolean(false) +} From 9b6d9e5c365e5dfbd4b505948ba2610aa1f6e798 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 4 Nov 2025 15:28:25 +0300 Subject: [PATCH 16/22] default is hmac-signature --- docs/README.md | 6 +++--- e2e/src/hmac.rs | 2 +- lib/router-config/src/hmac_signature.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/README.md b/docs/README.md index e3856feb4..548a983e1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -60,7 +60,7 @@ headers: rename: x-acct-tenant hmac_signature: enabled: false - extension_name: hmac_signature + extension_name: hmac-signature http: host: 0.0.0.0 port: 4000 @@ -1353,14 +1353,14 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| |**enabled**||Default: `false`
|no| -|**extension\_name**|`string`|Default: `"hmac_signature"`
|no| +|**extension\_name**|`string`|Default: `"hmac-signature"`
|no| |**secret**|`string`||yes| **Example** ```yaml enabled: false -extension_name: hmac_signature +extension_name: hmac-signature ``` diff --git a/e2e/src/hmac.rs b/e2e/src/hmac.rs index 980526a67..1487f674e 100644 --- a/e2e/src/hmac.rs +++ b/e2e/src/hmac.rs @@ -49,7 +49,7 @@ mod hmac_e2e_tests { let expected_signature = create_hmac_signature("VERY_SECRET", query); assert_eq!( - extensions.get("hmac_signature").unwrap(), + extensions.get("hmac-signature").unwrap(), &expected_signature ); } diff --git a/lib/router-config/src/hmac_signature.rs b/lib/router-config/src/hmac_signature.rs index 332b45dba..dc393b1bd 100644 --- a/lib/router-config/src/hmac_signature.rs +++ b/lib/router-config/src/hmac_signature.rs @@ -29,7 +29,7 @@ pub struct HMACSignatureConfig { } fn default_extension_name() -> String { - "hmac_signature".to_string() + "hmac-signature".to_string() } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] From f4136e5a695b8666156c4d78bd4e7daf1b3e6586 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 16:17:01 +0300 Subject: [PATCH 17/22] Simpler --- lib/executor/src/executors/http.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 88e5dbe45..aa74f2442 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -168,7 +168,9 @@ impl HTTPSubgraphExecutor { } }; - let hmac_signature_ext = if should_sign_hmac { + let mut first_extension = true; + + if should_sign_hmac { if self.config.hmac_signature.secret.is_empty() { return Err(SubgraphExecutorError::HMACSignatureError( "HMAC signature secret is empty".to_string(), @@ -186,14 +188,6 @@ impl HTTPSubgraphExecutor { mac.update(&body_without_extensions); let result = mac.finalize(); let result_bytes = result.into_bytes(); - Some(result_bytes) - } else { - None - }; - - let mut first_extension = true; - - if let Some(hmac_bytes) = hmac_signature_ext { if first_extension { body.put(FIRST_EXTENSION_STR); first_extension = false; @@ -204,7 +198,7 @@ impl HTTPSubgraphExecutor { body.put(self.config.hmac_signature.extension_name.as_bytes()); body.put(QUOTE); body.put(COLON); - let hmac_hex = hex::encode(hmac_bytes); + let hmac_hex = hex::encode(result_bytes); body.put(QUOTE); body.put(hmac_hex.as_bytes()); body.put(QUOTE); From d055014b4f3f09a58dc93e2135f5f40d4f4f2ede Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 16:21:41 +0300 Subject: [PATCH 18/22] Simpler --- lib/executor/src/executors/http.rs | 106 ++++++++++++++++------------- 1 file changed, 59 insertions(+), 47 deletions(-) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index aa74f2442..55bb3a747 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -92,50 +92,12 @@ impl HTTPSubgraphExecutor { } } - fn build_request_body<'exec, 'req>( + fn sign_hmac( &self, - execution_request: &HttpExecutionRequest<'exec, 'req>, - ) -> Result, SubgraphExecutorError> { - let mut body = Vec::with_capacity(4096); - body.put(FIRST_QUOTE_STR); - write_and_escape_string(&mut body, execution_request.query); - let mut first_variable = true; - if let Some(variables) = &execution_request.variables { - for (variable_name, variable_value) in variables { - if first_variable { - body.put(FIRST_VARIABLE_STR); - first_variable = false; - } else { - body.put(COMMA); - } - body.put(QUOTE); - body.put(variable_name.as_bytes()); - body.put(QUOTE); - body.put(COLON); - let value_str = sonic_rs::to_string(variable_value).map_err(|err| { - SubgraphExecutorError::VariablesSerializationFailure( - variable_name.to_string(), - err.to_string(), - ) - })?; - body.put(value_str.as_bytes()); - } - } - if let Some(representations) = &execution_request.representations { - if first_variable { - body.put(FIRST_VARIABLE_STR); - first_variable = false; - } else { - body.put(COMMA); - } - body.put("\"representations\":".as_bytes()); - body.extend_from_slice(representations); - } - // "first_variable" should be still true if there are no variables - if !first_variable { - body.put(CLOSE_BRACE); - } - + execution_request: &HttpExecutionRequest, + body: &mut Vec, + first_extension: &mut bool, + ) -> Result<(), SubgraphExecutorError> { let should_sign_hmac = match &self.should_sign_hmac { BooleanOrProgram::Boolean(b) => *b, BooleanOrProgram::Program(expr) => { @@ -168,8 +130,6 @@ impl HTTPSubgraphExecutor { } }; - let mut first_extension = true; - if should_sign_hmac { if self.config.hmac_signature.secret.is_empty() { return Err(SubgraphExecutorError::HMACSignatureError( @@ -188,9 +148,9 @@ impl HTTPSubgraphExecutor { mac.update(&body_without_extensions); let result = mac.finalize(); let result_bytes = result.into_bytes(); - if first_extension { + if *first_extension { body.put(FIRST_EXTENSION_STR); - first_extension = false; + *first_extension = false; } else { body.put(COMMA); } @@ -203,6 +163,58 @@ impl HTTPSubgraphExecutor { body.put(hmac_hex.as_bytes()); body.put(QUOTE); } + Ok(()) + } + + fn build_request_body<'exec, 'req>( + &self, + execution_request: &HttpExecutionRequest<'exec, 'req>, + ) -> Result, SubgraphExecutorError> { + let mut body = Vec::with_capacity(4096); + body.put(FIRST_QUOTE_STR); + write_and_escape_string(&mut body, execution_request.query); + let mut first_variable = true; + if let Some(variables) = &execution_request.variables { + for (variable_name, variable_value) in variables { + if first_variable { + body.put(FIRST_VARIABLE_STR); + first_variable = false; + } else { + body.put(COMMA); + } + body.put(QUOTE); + body.put(variable_name.as_bytes()); + body.put(QUOTE); + body.put(COLON); + let value_str = sonic_rs::to_string(variable_value).map_err(|err| { + SubgraphExecutorError::VariablesSerializationFailure( + variable_name.to_string(), + err.to_string(), + ) + })?; + body.put(value_str.as_bytes()); + } + } + if let Some(representations) = &execution_request.representations { + if first_variable { + body.put(FIRST_VARIABLE_STR); + first_variable = false; + } else { + body.put(COMMA); + } + body.put("\"representations\":".as_bytes()); + body.extend_from_slice(representations); + } + // "first_variable" should be still true if there are no variables + if !first_variable { + body.put(CLOSE_BRACE); + } + + let mut first_extension = true; + + if !self.config.hmac_signature.is_disabled() { + self.sign_hmac(execution_request, &mut body, &mut first_extension)?; + } if let Some(extensions) = &execution_request.extensions { for (extension_name, extension_value) in extensions { From 49f6c7327de0220d5d1208338b75e8c96caef1ce Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 16:26:59 +0300 Subject: [PATCH 19/22] Simpler --- lib/executor/src/executors/error.rs | 6 +++--- lib/executor/src/executors/http.rs | 6 +++--- lib/executor/src/executors/map.rs | 31 +++++++++++++++-------------- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 01a67e601..972448237 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -22,8 +22,8 @@ pub enum SubgraphExecutorError { VariablesSerializationFailure(String, String), #[error("Failed to serialize extension \"{0}\": {1}")] ExtensionSerializationFailure(String, String), - #[error("Failed to build HMAC expression for subgraph '{0}'. Please check your VRL expression for syntax errors. Diagnostic: {1}")] - HMACExpressionBuild(String, String), + #[error("Failed to build HMAC expression. Please check your VRL expression for syntax errors. Diagnostic: {0}")] + HMACExpressionBuild(String), #[error("HMAC signature error: {0}")] HMACSignatureError(String), } @@ -71,7 +71,7 @@ impl SubgraphExecutorError { "SUBGRAPH_EXTENSION_SERIALIZATION_FAILURE" } SubgraphExecutorError::HMACSignatureError(_) => "SUBGRAPH_HMAC_SIGNATURE_ERROR", - SubgraphExecutorError::HMACExpressionBuild(_, _) => { + SubgraphExecutorError::HMACExpressionBuild(_) => { "SUBGRAPH_HMAC_EXPRESSION_BUILD_FAILURE" } } diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 55bb3a747..8d8293941 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -43,7 +43,7 @@ pub struct HTTPSubgraphExecutor { pub semaphore: Arc, pub config: Arc, pub in_flight_requests: Arc>, ABuildHasher>>, - pub should_sign_hmac: BooleanOrProgram, + pub should_sign_hmac: Arc, } const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; @@ -68,7 +68,7 @@ impl HTTPSubgraphExecutor { semaphore: Arc, config: Arc, in_flight_requests: Arc>, ABuildHasher>>, - should_sign_hmac: BooleanOrProgram, + should_sign_hmac: Arc, ) -> Self { let mut header_map = HeaderMap::new(); header_map.insert( @@ -98,7 +98,7 @@ impl HTTPSubgraphExecutor { body: &mut Vec, first_extension: &mut bool, ) -> Result<(), SubgraphExecutorError> { - let should_sign_hmac = match &self.should_sign_hmac { + let should_sign_hmac = match &self.should_sign_hmac.as_ref() { BooleanOrProgram::Boolean(b) => *b, BooleanOrProgram::Program(expr) => { // .subgraph diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 30b793bc2..43e4e5ec5 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -51,10 +51,11 @@ pub struct SubgraphExecutorMap { semaphores_by_origin: DashMap>, max_connections_per_host: usize, in_flight_requests: Arc>, ABuildHasher>>, + should_sign_hmac: Arc, } impl SubgraphExecutorMap { - pub fn new(config: Arc) -> Self { + pub fn try_new(config: Arc) -> Result { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) @@ -64,7 +65,16 @@ impl SubgraphExecutorMap { let max_connections_per_host = config.traffic_shaping.max_connections_per_host; - SubgraphExecutorMap { + let should_sign_hmac = match &config.hmac_signature.enabled { + BooleanOrExpression::Boolean(b) => BooleanOrProgram::Boolean(*b), + BooleanOrExpression::Expression { expression } => { + let program = compile_expression(expression, None) + .map_err(|err| SubgraphExecutorError::HMACExpressionBuild(err))?; + BooleanOrProgram::Program(Box::new(program)) + } + }; + + Ok(SubgraphExecutorMap { executors_by_subgraph: Default::default(), static_endpoints_by_subgraph: Default::default(), expressions_by_subgraph: Default::default(), @@ -73,14 +83,15 @@ impl SubgraphExecutorMap { semaphores_by_origin: Default::default(), max_connections_per_host, in_flight_requests: Arc::new(DashMap::with_hasher(ABuildHasher::default())), - } + should_sign_hmac: Arc::new(should_sign_hmac), + }) } pub fn from_http_endpoint_map( subgraph_endpoint_map: HashMap, config: Arc, ) -> Result { - let mut subgraph_executor_map = SubgraphExecutorMap::new(config.clone()); + let mut subgraph_executor_map = SubgraphExecutorMap::try_new(config.clone())?; for (subgraph_name, original_endpoint_str) in subgraph_endpoint_map.into_iter() { let endpoint_str = config @@ -275,16 +286,6 @@ impl SubgraphExecutorMap { .or_insert_with(|| Arc::new(Semaphore::new(self.max_connections_per_host))) .clone(); - let should_sign_hmac = match &self.config.hmac_signature.enabled { - BooleanOrExpression::Boolean(b) => BooleanOrProgram::Boolean(*b), - BooleanOrExpression::Expression { expression } => { - let program = compile_expression(expression, None).map_err(|err| { - SubgraphExecutorError::HMACExpressionBuild(subgraph_name.to_string(), err) - })?; - BooleanOrProgram::Program(Box::new(program)) - } - }; - let executor = HTTPSubgraphExecutor::new( subgraph_name.to_string(), endpoint_uri, @@ -292,7 +293,7 @@ impl SubgraphExecutorMap { semaphore, self.config.clone(), self.in_flight_requests.clone(), - should_sign_hmac, + self.should_sign_hmac.clone(), ); let executor_arc = executor.to_boxed_arc(); From f262b4e8e62e752fd3c6b87e2f746f0581e8043e Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 16:50:59 +0300 Subject: [PATCH 20/22] Clippy --- lib/executor/src/executors/map.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 43e4e5ec5..992520d8a 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -69,7 +69,7 @@ impl SubgraphExecutorMap { BooleanOrExpression::Boolean(b) => BooleanOrProgram::Boolean(*b), BooleanOrExpression::Expression { expression } => { let program = compile_expression(expression, None) - .map_err(|err| SubgraphExecutorError::HMACExpressionBuild(err))?; + .map_err(SubgraphExecutorError::HMACExpressionBuild)?; BooleanOrProgram::Program(Box::new(program)) } }; From 806143d1b5fca35b8b08694567565112f05e4699 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 16:52:04 +0300 Subject: [PATCH 21/22] Remove extra println --- lib/executor/src/executors/http.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 8d8293941..c4d738ced 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -244,7 +244,6 @@ impl HTTPSubgraphExecutor { body.put(CLOSE_BRACE); - println!("Built request body: {}", String::from_utf8_lossy(&body)); Ok(body) } From 5ac835341d51d045ea069c407febb06408e4697b Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 17:02:28 +0300 Subject: [PATCH 22/22] One file for hmac --- lib/executor/src/execution/hmac.rs | 111 +++++++++++++++++++++++++++++ lib/executor/src/execution/mod.rs | 1 + lib/executor/src/executors/http.rs | 100 +++----------------------- lib/executor/src/executors/map.rs | 20 ++---- 4 files changed, 129 insertions(+), 103 deletions(-) create mode 100644 lib/executor/src/execution/hmac.rs diff --git a/lib/executor/src/execution/hmac.rs b/lib/executor/src/execution/hmac.rs new file mode 100644 index 000000000..6e708f7e2 --- /dev/null +++ b/lib/executor/src/execution/hmac.rs @@ -0,0 +1,111 @@ +use std::collections::BTreeMap; + +use bytes::{BufMut, Bytes}; +use hive_router_config::hmac_signature::{BooleanOrExpression, HMACSignatureConfig}; +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; + +use crate::{ + execution::client_request_details::ClientRequestDetails, + executors::{error::SubgraphExecutorError, http::FIRST_EXTENSION_STR}, + utils::{ + consts::{CLOSE_BRACE, COLON, COMMA, QUOTE}, + expression::{compile_expression, execute_expression_with_value}, + }, +}; + +#[derive(Debug)] +pub enum BooleanOrProgram { + Boolean(bool), + Program(Box), +} + +pub fn compile_hmac_config( + config: &HMACSignatureConfig, +) -> Result { + match &config.enabled { + BooleanOrExpression::Boolean(b) => Ok(BooleanOrProgram::Boolean(*b)), + BooleanOrExpression::Expression { expression } => { + let program = compile_expression(expression, None) + .map_err(SubgraphExecutorError::HMACExpressionBuild)?; + Ok(BooleanOrProgram::Program(Box::new(program))) + } + } +} +type HmacSha256 = Hmac; + +pub fn sign_hmac( + hmac_program: &BooleanOrProgram, + hmac_config: &HMACSignatureConfig, + subgraph_name: &str, + client_request: &ClientRequestDetails, + first_extension: &mut bool, + body: &mut Vec, +) -> Result<(), SubgraphExecutorError> { + let should_sign_hmac = match &hmac_program { + BooleanOrProgram::Boolean(b) => *b, + BooleanOrProgram::Program(expr) => { + // .subgraph + let subgraph_value = VrlValue::Object(BTreeMap::from([( + "name".into(), + VrlValue::Bytes(Bytes::from(subgraph_name.to_owned())), + )])); + // .request + let request_value: VrlValue = client_request.into(); + let target_value = VrlValue::Object(BTreeMap::from([ + ("subgraph".into(), subgraph_value), + ("request".into(), request_value), + ])); + let result = execute_expression_with_value(expr, target_value); + match result { + Ok(VrlValue::Boolean(b)) => b, + Ok(_) => { + return Err(SubgraphExecutorError::HMACSignatureError( + "HMAC signature expression did not evaluate to a boolean".to_string(), + )); + } + Err(e) => { + return Err(SubgraphExecutorError::HMACSignatureError(format!( + "HMAC signature expression evaluation error: {}", + e + ))); + } + } + } + }; + + if should_sign_hmac { + if hmac_config.secret.is_empty() { + return Err(SubgraphExecutorError::HMACSignatureError( + "HMAC signature secret is empty".to_string(), + )); + } + let mut mac = HmacSha256::new_from_slice(hmac_config.secret.as_bytes()).map_err(|e| { + SubgraphExecutorError::HMACSignatureError(format!( + "Failed to create HMAC instance: {}", + e + )) + })?; + let mut body_without_extensions = body.clone(); + body_without_extensions.put(CLOSE_BRACE); + mac.update(&body_without_extensions); + let result = mac.finalize(); + let result_bytes = result.into_bytes(); + if *first_extension { + body.put(FIRST_EXTENSION_STR); + *first_extension = false; + } else { + body.put(COMMA); + } + body.put(QUOTE); + body.put(hmac_config.extension_name.as_bytes()); + body.put(QUOTE); + body.put(COLON); + let hmac_hex = hex::encode(result_bytes); + body.put(QUOTE); + body.put(hmac_hex.as_bytes()); + body.put(QUOTE); + } + Ok(()) +} diff --git a/lib/executor/src/execution/mod.rs b/lib/executor/src/execution/mod.rs index 52dc59506..e66ab00d6 100644 --- a/lib/executor/src/execution/mod.rs +++ b/lib/executor/src/execution/mod.rs @@ -1,5 +1,6 @@ pub mod client_request_details; pub mod error; +pub mod hmac; pub mod jwt_forward; pub mod plan; pub mod rewrites; diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index c4d738ced..79bf186d9 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -1,9 +1,8 @@ -use std::collections::BTreeMap; use std::sync::Arc; +use crate::execution::hmac::{sign_hmac, BooleanOrProgram}; use crate::executors::common::HttpExecutionResponse; use crate::executors::dedupe::{request_fingerprint, ABuildHasher, SharedResponse}; -use crate::utils::expression::execute_expression_with_value; use dashmap::DashMap; use hive_router_config::HiveRouterConfig; use tokio::sync::OnceCell; @@ -11,7 +10,6 @@ use tokio::sync::OnceCell; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; -use hmac::{Hmac, Mac}; use http::HeaderMap; use http::HeaderValue; use http_body_util::BodyExt; @@ -19,10 +17,8 @@ use http_body_util::Full; use hyper::Version; use hyper_tls::HttpsConnector; use hyper_util::client::legacy::{connect::HttpConnector, Client}; -use sha2::Sha256; use tokio::sync::Semaphore; use tracing::debug; -use vrl::compiler::Program as VrlProgram; use crate::executors::common::HttpExecutionRequest; use crate::executors::error::SubgraphExecutorError; @@ -32,7 +28,6 @@ use crate::utils::consts::COLON; use crate::utils::consts::COMMA; use crate::utils::consts::QUOTE; use crate::{executors::common::SubgraphExecutor, json_writer::write_and_escape_string}; -use vrl::core::Value as VrlValue; #[derive(Debug)] pub struct HTTPSubgraphExecutor { @@ -48,18 +43,10 @@ pub struct HTTPSubgraphExecutor { const FIRST_VARIABLE_STR: &[u8] = b",\"variables\":{"; const FIRST_QUOTE_STR: &[u8] = b"{\"query\":"; -const FIRST_EXTENSION_STR: &[u8] = b",\"extensions\":{"; +pub const FIRST_EXTENSION_STR: &[u8] = b",\"extensions\":{"; pub type HttpClient = Client, Full>; -type HmacSha256 = Hmac; - -#[derive(Debug)] -pub enum BooleanOrProgram { - Boolean(bool), - Program(Box), -} - impl HTTPSubgraphExecutor { pub fn new( subgraph_name: String, @@ -92,80 +79,6 @@ impl HTTPSubgraphExecutor { } } - fn sign_hmac( - &self, - execution_request: &HttpExecutionRequest, - body: &mut Vec, - first_extension: &mut bool, - ) -> Result<(), SubgraphExecutorError> { - let should_sign_hmac = match &self.should_sign_hmac.as_ref() { - BooleanOrProgram::Boolean(b) => *b, - BooleanOrProgram::Program(expr) => { - // .subgraph - let subgraph_value = VrlValue::Object(BTreeMap::from([( - "name".into(), - VrlValue::Bytes(Bytes::from(self.subgraph_name.to_owned())), - )])); - // .request - let request_value: VrlValue = execution_request.client_request.into(); - let target_value = VrlValue::Object(BTreeMap::from([ - ("subgraph".into(), subgraph_value), - ("request".into(), request_value), - ])); - let result = execute_expression_with_value(expr, target_value); - match result { - Ok(VrlValue::Boolean(b)) => b, - Ok(_) => { - return Err(SubgraphExecutorError::HMACSignatureError( - "HMAC signature expression did not evaluate to a boolean".to_string(), - )); - } - Err(e) => { - return Err(SubgraphExecutorError::HMACSignatureError(format!( - "HMAC signature expression evaluation error: {}", - e - ))); - } - } - } - }; - - if should_sign_hmac { - if self.config.hmac_signature.secret.is_empty() { - return Err(SubgraphExecutorError::HMACSignatureError( - "HMAC signature secret is empty".to_string(), - )); - } - let mut mac = HmacSha256::new_from_slice(self.config.hmac_signature.secret.as_bytes()) - .map_err(|e| { - SubgraphExecutorError::HMACSignatureError(format!( - "Failed to create HMAC instance: {}", - e - )) - })?; - let mut body_without_extensions = body.clone(); - body_without_extensions.put(CLOSE_BRACE); - mac.update(&body_without_extensions); - let result = mac.finalize(); - let result_bytes = result.into_bytes(); - if *first_extension { - body.put(FIRST_EXTENSION_STR); - *first_extension = false; - } else { - body.put(COMMA); - } - body.put(QUOTE); - body.put(self.config.hmac_signature.extension_name.as_bytes()); - body.put(QUOTE); - body.put(COLON); - let hmac_hex = hex::encode(result_bytes); - body.put(QUOTE); - body.put(hmac_hex.as_bytes()); - body.put(QUOTE); - } - Ok(()) - } - fn build_request_body<'exec, 'req>( &self, execution_request: &HttpExecutionRequest<'exec, 'req>, @@ -213,7 +126,14 @@ impl HTTPSubgraphExecutor { let mut first_extension = true; if !self.config.hmac_signature.is_disabled() { - self.sign_hmac(execution_request, &mut body, &mut first_extension)?; + sign_hmac( + &self.should_sign_hmac, + &self.config.hmac_signature, + &self.subgraph_name, + execution_request.client_request, + &mut first_extension, + &mut body, + )?; } if let Some(extensions) = &execution_request.extensions { diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 992520d8a..16bcf46a6 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -5,9 +5,7 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{ - hmac_signature::BooleanOrExpression, override_subgraph_urls::UrlOrExpression, HiveRouterConfig, -}; +use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -19,14 +17,17 @@ use tracing::error; use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; use crate::{ - execution::client_request_details::ClientRequestDetails, + execution::{ + client_request_details::ClientRequestDetails, + hmac::{compile_hmac_config, BooleanOrProgram}, + }, executors::{ common::{ HttpExecutionRequest, HttpExecutionResponse, SubgraphExecutor, SubgraphExecutorBoxedArc, }, dedupe::{ABuildHasher, SharedResponse}, error::SubgraphExecutorError, - http::{BooleanOrProgram, HTTPSubgraphExecutor, HttpClient}, + http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, utils::expression::{compile_expression, execute_expression_with_value}, @@ -65,14 +66,7 @@ impl SubgraphExecutorMap { let max_connections_per_host = config.traffic_shaping.max_connections_per_host; - let should_sign_hmac = match &config.hmac_signature.enabled { - BooleanOrExpression::Boolean(b) => BooleanOrProgram::Boolean(*b), - BooleanOrExpression::Expression { expression } => { - let program = compile_expression(expression, None) - .map_err(SubgraphExecutorError::HMACExpressionBuild)?; - BooleanOrProgram::Program(Box::new(program)) - } - }; + let should_sign_hmac = compile_hmac_config(&config.hmac_signature)?; Ok(SubgraphExecutorMap { executors_by_subgraph: Default::default(),