From 809ba486ce71fe46f4119b5fab8aba19c23cad1d Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 13:10:44 +0300 Subject: [PATCH 01/13] New `Expression` type to handle VRL in one place --- Cargo.lock | 1 + lib/executor/src/executors/map.rs | 48 +++------- lib/executor/src/headers/compile.rs | 90 ++++-------------- lib/executor/src/headers/errors.rs | 14 +-- lib/executor/src/headers/plan.rs | 6 +- lib/executor/src/headers/request.rs | 20 +--- lib/executor/src/headers/response.rs | 20 +--- lib/router-config/Cargo.toml | 1 + lib/router-config/src/headers.rs | 4 +- lib/router-config/src/lib.rs | 6 +- .../src/override_subgraph_urls.rs | 8 +- .../src/primitives/expression.rs | 91 +++++++++++++++++++ lib/router-config/src/primitives/mod.rs | 1 + lib/router-config/src/traffic_shaping.rs | 4 +- 14 files changed, 147 insertions(+), 167 deletions(-) create mode 100644 lib/router-config/src/primitives/expression.rs diff --git a/Cargo.lock b/Cargo.lock index f405ed749..d555e8839 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2104,6 +2104,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tracing", + "vrl", ] [[package]] diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index a3c297ad1..0c25544d3 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,7 +6,9 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; +use hive_router_config::{ + override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig, +}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -15,16 +17,7 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::{ - compiler::compile as vrl_compile, - compiler::Program as VrlProgram, - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::Function as VrlFunction, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; +use vrl::core::Value as VrlValue; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -44,7 +37,7 @@ type SubgraphEndpoint = String; type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; -type ExpressionsBySubgraphMap = HashMap; +type ExpressionsBySubgraphMap = HashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -54,8 +47,6 @@ pub struct SubgraphExecutorMap { /// Mapping from subgraph name to VRL expression program expressions_by_subgraph: ExpressionsBySubgraphMap, config: Arc, - /// Precompiled VRL functions to be used in endpoint expressions. - vrl_functions: Vec>, client: Arc, semaphores_by_origin: DashMap>, max_connections_per_host: usize, @@ -80,7 +71,6 @@ impl SubgraphExecutorMap { static_endpoints_by_subgraph: Default::default(), expressions_by_subgraph: Default::default(), config, - vrl_functions: vrl_build_functions(), client: Arc::new(client), semaphores_by_origin: Default::default(), max_connections_per_host, @@ -101,7 +91,7 @@ impl SubgraphExecutorMap { let endpoint_str = match endpoint_str { Some(UrlOrExpression::Url(url)) => url, - Some(UrlOrExpression::Expression { expression }) => { + Some(UrlOrExpression::Expression(expression)) => { subgraph_executor_map.register_expression(&subgraph_name, expression)?; &original_endpoint_str } @@ -194,21 +184,13 @@ impl SubgraphExecutorMap { SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) })?, )); - let mut target = VrlTargetValue { - value: VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.resolve(&mut ctx).map_err(|err| { + let endpoint_result = expression.execute(value).map_err(|err| { SubgraphExecutorError::new_endpoint_expression_resolution_failure( subgraph_name.to_string(), err, @@ -267,14 +249,10 @@ impl SubgraphExecutorMap { fn register_expression( &mut self, subgraph_name: &str, - expression: &str, + expression: &Expression, ) -> Result<(), SubgraphExecutorError> { - let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| { - SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e) - })?; - self.expressions_by_subgraph - .insert(subgraph_name.to_string(), compilation_result.program); + .insert(subgraph_name.to_string(), expression.clone()); Ok(()) } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 5b1b14a9f..1cf29c8c9 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -12,43 +12,13 @@ use crate::headers::{ use hive_router_config::headers as config; use http::HeaderName; use regex_automata::{meta, util::syntax::Config as SyntaxConfig}; -use vrl::{ - compiler::compile as vrl_compile, prelude::Function as VrlFunction, - stdlib::all as vrl_build_functions, -}; - -pub struct HeaderRuleCompilerContext { - vrl_functions: Vec>, -} - -impl Default for HeaderRuleCompilerContext { - fn default() -> Self { - Self::new() - } -} - -impl HeaderRuleCompilerContext { - pub fn new() -> Self { - Self { - vrl_functions: vrl_build_functions(), - } - } -} pub trait HeaderRuleCompiler { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut A, - ) -> Result<(), HeaderRuleCompileError>; + fn compile(&self, actions: &mut A) -> Result<(), HeaderRuleCompileError>; } impl HeaderRuleCompiler> for config::RequestHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::RequestHeaderRule::Propagate(rule) => { let spec = materialize_match_spec( @@ -78,16 +48,11 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { value: build_header_value(&rule.name, value)?, })); } - config::InsertSource::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &ctx.vrl_functions).map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - + config::InsertSource::Expression(expression) => { actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(expression.clone()), }, )); } @@ -112,11 +77,7 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { } impl HeaderRuleCompiler> for config::ResponseHeaderRule { - fn compile( - &self, - ctx: &HeaderRuleCompilerContext, - actions: &mut Vec, - ) -> Result<(), HeaderRuleCompileError> { + fn compile(&self, actions: &mut Vec) -> Result<(), HeaderRuleCompileError> { match self { config::ResponseHeaderRule::Propagate(rule) => { let aggregation_strategy = rule.algorithm.into(); @@ -153,21 +114,16 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule strategy: aggregation_strategy, })); } - config::InsertSource::Expression { expression } => { + config::InsertSource::Expression(expression) => { // NOTE: In case we ever need to improve performance and not pass the whole context // to VRL expressions, we can use: // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression - let compilation_result = vrl_compile(expression, &ctx.vrl_functions) - .map_err(|e| { - HeaderRuleCompileError::new_expression_build(rule.name.clone(), e) - })?; - actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(compilation_result.program), + expression: Box::new(expression.clone()), strategy: aggregation_strategy, }, )); @@ -196,19 +152,18 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule pub fn compile_headers_plan( cfg: &config::HeadersConfig, ) -> Result { - let ctx = HeaderRuleCompilerContext::new(); let mut request_plan = RequestHeaderRules::default(); let mut response_plan = ResponseHeaderRules::default(); if let Some(global_rules) = &cfg.all { - request_plan.global = compile_request_header_rules(&ctx, global_rules)?; - response_plan.global = compile_response_header_rules(&ctx, global_rules)?; + request_plan.global = compile_request_header_rules(global_rules)?; + response_plan.global = compile_response_header_rules(global_rules)?; } if let Some(subgraph_rules_map) = &cfg.subgraphs { for (subgraph_name, subgraph_rules) in subgraph_rules_map { - let request_actions = compile_request_header_rules(&ctx, subgraph_rules)?; - let response_actions = compile_response_header_rules(&ctx, subgraph_rules)?; + let request_actions = compile_request_header_rules(subgraph_rules)?; + let response_actions = compile_response_header_rules(subgraph_rules)?; request_plan .by_subgraph .insert(subgraph_name.clone(), request_actions); @@ -225,26 +180,24 @@ pub fn compile_headers_plan( } fn compile_request_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut request_actions = Vec::new(); if let Some(request_rule_entries) = &header_rules.request { for request_rule in request_rule_entries { - request_rule.compile(ctx, &mut request_actions)?; + request_rule.compile(&mut request_actions)?; } } Ok(request_actions) } fn compile_response_header_rules( - ctx: &HeaderRuleCompilerContext, header_rules: &config::HeaderRules, ) -> Result, HeaderRuleCompileError> { let mut response_actions = Vec::new(); if let Some(response_rule_entries) = &header_rules.response { for response_rule in response_rule_entries { - response_rule.compile(ctx, &mut response_actions)?; + response_rule.compile(&mut response_actions)?; } } Ok(response_actions) @@ -358,7 +311,7 @@ mod tests { use http::HeaderName; use crate::headers::{ - compile::{build_header_value, HeaderRuleCompiler, HeaderRuleCompilerContext}, + compile::{build_header_value, HeaderRuleCompiler}, errors::HeaderRuleCompileError, plan::{HeaderAggregationStrategy, RequestHeaderRule, ResponseHeaderRule}, }; @@ -378,9 +331,8 @@ mod tests { rename: None, default: None, }); - let ctx = HeaderRuleCompilerContext::new(); let mut actions = Vec::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::PropagateNamed(data) => { @@ -401,8 +353,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::InsertStatic(data) => { @@ -423,8 +374,7 @@ mod tests { }, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { RequestHeaderRule::RemoveNamed(data) => { @@ -449,8 +399,7 @@ mod tests { default: Some("def".to_string()), }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - let err = rule.compile(&ctx, &mut actions).unwrap_err(); + let err = rule.compile(&mut actions).unwrap_err(); match err { HeaderRuleCompileError::InvalidDefault => {} _ => panic!("Expected InvalidDefault error"), @@ -470,8 +419,7 @@ mod tests { algorithm: config::AggregationAlgo::First, }); let mut actions = Vec::new(); - let ctx = HeaderRuleCompilerContext::new(); - rule.compile(&ctx, &mut actions).unwrap(); + rule.compile(&mut actions).unwrap(); assert_eq!(actions.len(), 1); match &actions[0] { ResponseHeaderRule::PropagateNamed(data) => { diff --git a/lib/executor/src/headers/errors.rs b/lib/executor/src/headers/errors.rs index d53444877..6c2a806d2 100644 --- a/lib/executor/src/headers/errors.rs +++ b/lib/executor/src/headers/errors.rs @@ -1,6 +1,6 @@ use http::header::{InvalidHeaderName, InvalidHeaderValue}; use regex_automata::meta::BuildError; -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; #[derive(thiserror::Error, Debug)] pub enum HeaderRuleCompileError { @@ -27,16 +27,8 @@ pub enum HeaderRuleRuntimeError { } impl HeaderRuleCompileError { - pub fn new_expression_build(header_name: String, diagnostics: DiagnosticList) -> Self { - HeaderRuleCompileError::ExpressionBuild( - header_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) + pub fn new_expression_build(header_name: String, err: String) -> Self { + HeaderRuleCompileError::ExpressionBuild(header_name, err) } } diff --git a/lib/executor/src/headers/plan.rs b/lib/executor/src/headers/plan.rs index a1a3cf990..69e0edffd 100644 --- a/lib/executor/src/headers/plan.rs +++ b/lib/executor/src/headers/plan.rs @@ -1,7 +1,7 @@ use ahash::HashMap; +use hive_router_config::primitives::expression::Expression; use http::{HeaderName, HeaderValue}; use regex_automata::meta::Regex; -use vrl::compiler::Program as VrlProgram; #[derive(Clone)] pub struct HeaderRulesPlan { @@ -62,13 +62,13 @@ pub struct ResponseInsertStatic { #[derive(Clone)] pub struct RequestInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, } #[derive(Clone)] pub struct ResponseInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, pub strategy: HeaderAggregationStrategy, } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 637ab0d58..925789367 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -1,12 +1,4 @@ -use std::collections::BTreeMap; - use http::HeaderMap; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -174,17 +166,7 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = self.expression.execute(ctx.into()).map_err(|err| { HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 6a5c34444..5f7c895bd 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, iter::once}; +use std::iter::once; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -17,12 +17,6 @@ use crate::{ use super::sanitizer::is_never_join_header; use http::{header::InvalidHeaderValue, HeaderMap, HeaderName, HeaderValue}; -use vrl::{ - compiler::TargetValue as VrlTargetValue, - core::Value as VrlValue, - prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone}, - value::Secrets as VrlSecrets, -}; pub fn apply_subgraph_response_headers( header_rule_plan: &HeaderRulesPlan, @@ -194,17 +188,7 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - - let mut target = VrlTargetValue { - value: ctx.into(), - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); - let value = self.expression.resolve(&mut ctx).map_err(|err| { + let value = self.expression.execute(ctx.into()).map_err(|err| { HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) })?; diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 8fc448071..78589a094 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -23,6 +23,7 @@ http = { workspace = true } jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } +vrl = { workspace = true } schemars = "1.0.4" humantime-serde = "1.1.1" diff --git a/lib/router-config/src/headers.rs b/lib/router-config/src/headers.rs index bd0913f6f..8d30675b7 100644 --- a/lib/router-config/src/headers.rs +++ b/lib/router-config/src/headers.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::expression::Expression; + type HeaderName = String; type RegExp = String; @@ -248,7 +250,7 @@ pub enum InsertSource { /// name: x-auth-scheme /// expression: 'split(.request.headers.authorization, " ")[0] ?? "none"' /// ``` - Expression { expression: String }, + Expression(Expression), } /// Helper to allow `one` or `many` values for ergonomics (OR semantics). diff --git a/lib/router-config/src/lib.rs b/lib/router-config/src/lib.rs index 537244c9e..de74d754d 100644 --- a/lib/router-config/src/lib.rs +++ b/lib/router-config/src/lib.rs @@ -29,7 +29,7 @@ use crate::{ primitives::file_path::with_start_path, query_planner::QueryPlannerConfig, supergraph::SupergraphSource, - traffic_shaping::TrafficShapingExecutorConfig, + traffic_shaping::TrafficShapingConfig, }; #[derive(Debug, Deserialize, Serialize, JsonSchema)] @@ -62,9 +62,9 @@ pub struct HiveRouterConfig { #[serde(default)] pub http: HttpServerConfig, - /// Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. + /// Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. #[serde(default)] - pub traffic_shaping: TrafficShapingExecutorConfig, + pub traffic_shaping: TrafficShapingConfig, /// Configuration for the headers. #[serde(default)] diff --git a/lib/router-config/src/override_subgraph_urls.rs b/lib/router-config/src/override_subgraph_urls.rs index ddacfe044..dc32bb783 100644 --- a/lib/router-config/src/override_subgraph_urls.rs +++ b/lib/router-config/src/override_subgraph_urls.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::expression::Expression; + /// Configuration for how the Router should override subgraph URLs. /// This can be used to point to different subgraph endpoints based on environment, /// or to use dynamic expressions to determine the URL at runtime. @@ -52,7 +54,7 @@ pub enum UrlOrExpression { /// A static URL string. Url(String), /// A dynamic value computed by a VRL expression. - Expression { expression: String }, + Expression(Expression), } fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { @@ -75,9 +77,7 @@ fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { subgraphs.insert( "products".to_string(), PerSubgraphConfig { - url: UrlOrExpression::Expression { - expression: expression.to_string(), - }, + url: UrlOrExpression::Expression(expression.to_string().try_into().unwrap()), }, ); diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs new file mode 100644 index 000000000..2df64b768 --- /dev/null +++ b/lib/router-config/src/primitives/expression.rs @@ -0,0 +1,91 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use schemars::{JsonSchema, Schema, SchemaGenerator}; +use serde::{Deserialize, Serialize}; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +#[derive(Debug, Clone)] +pub struct Expression { + expression: String, + program: Box, +} + +impl Expression { + pub fn try_new(expression: String) -> Result { + let vrl_functions = vrl_build_functions(); + + let compilation_result = + vrl_compile(&expression, &vrl_functions).map_err(|diagnostics| { + diagnostics + .errors() + .into_iter() + .map(|d| d.code.to_string() + ": " + &d.message) + .collect::>() + .join(", ") + })?; + + Ok(Self { + expression, + program: Box::new(compilation_result.program), + }) + } + + pub fn execute(&self, value: VrlValue) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let timezone = VrlTimeZone::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + + self.program.resolve(&mut ctx) + } +} + +impl<'de> Deserialize<'de> for Expression { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let expression = String::deserialize(deserializer)?; + Expression::try_new(expression).map_err(serde::de::Error::custom) + } +} + +impl Serialize for Expression { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.expression) + } +} + +impl JsonSchema for Expression { + fn schema_name() -> Cow<'static, str> { + "Expression".into() + } + + fn json_schema(gen: &mut SchemaGenerator) -> Schema { + String::json_schema(gen) + } +} + +impl TryFrom for Expression { + type Error = String; + fn try_from(value: String) -> Result { + Expression::try_new(value) + } +} diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index 972582fc3..aa591928b 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -1,3 +1,4 @@ +pub mod expression; pub mod file_path; pub mod http_header; pub mod retry_policy; diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 02ed5ecdd..58dd7e049 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] -pub struct TrafficShapingExecutorConfig { +pub struct TrafficShapingConfig { /// Limits the concurrent amount of requests/connections per host/subgraph. #[serde(default = "default_max_connections_per_host")] pub max_connections_per_host: usize, @@ -20,7 +20,7 @@ pub struct TrafficShapingExecutorConfig { pub dedupe_enabled: bool, } -impl Default for TrafficShapingExecutorConfig { +impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), From 383f35da123ff7be38cda105249baa2bc280c047 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:24:59 +0300 Subject: [PATCH 02/13] Fix --- Cargo.lock | 1 + .../src/pipeline/progressive_override.rs | 29 +++----- lib/executor/src/executors/error.rs | 17 +---- lib/executor/src/executors/map.rs | 2 +- lib/executor/src/headers/request.rs | 12 +++- lib/executor/src/headers/response.rs | 9 ++- lib/router-config/Cargo.toml | 1 + lib/router-config/src/override_labels.rs | 7 +- .../src/primitives/expression.rs | 68 +++++++++++++++---- 9 files changed, 85 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d555e8839..22d4f57a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2098,6 +2098,7 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", + "once_cell", "retry-policies", "schemars 1.0.5", "serde", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index d0b09c183..eae790709 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; +use hive_router_config::{ + override_labels::{LabelOverrideValue, OverrideLabelsConfig}, + primitives::expression::Expression, +}; use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, @@ -8,13 +11,12 @@ use hive_router_query_planner::{ }; use rand::Rng; use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, TimeZone as VrlTimeZone, }, - stdlib::all as vrl_build_functions, value::Secrets as VrlSecrets, }; @@ -117,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -126,27 +128,14 @@ impl OverrideLabelsEvaluator { ) -> Result { let mut static_enabled_labels = HashSet::new(); let mut expressions = HashMap::new(); - let vrl_functions = vrl_build_functions(); for (label, value) in override_labels_config.iter() { match value { LabelOverrideValue::Boolean(true) => { static_enabled_labels.insert(label.clone()); } - LabelOverrideValue::Expression { expression } => { - let compilation_result = - vrl_compile(expression, &vrl_functions).map_err(|diagnostics| { - OverrideLabelsCompileError { - label: label.clone(), - error: diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - } - })?; - expressions.insert(label.clone(), compilation_result.program); + LabelOverrideValue::Expression(expression) => { + expressions.insert(label.clone(), expression.clone()); } _ => {} // Skip false booleans } @@ -179,7 +168,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.resolve(&mut ctx) { + match expression.execute_with_context(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 2234f524c..de69d47cb 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,4 +1,4 @@ -use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError}; +use vrl::prelude::ExpressionError; use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions}; @@ -34,21 +34,6 @@ impl From for GraphQLError { } impl SubgraphExecutorError { - pub fn new_endpoint_expression_build( - subgraph_name: String, - diagnostics: DiagnosticList, - ) -> Self { - SubgraphExecutorError::EndpointExpressionBuild( - subgraph_name, - diagnostics - .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) - .collect::>() - .join(", "), - ) - } - pub fn new_endpoint_expression_resolution_failure( subgraph_name: String, error: ExpressionError, diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 0c25544d3..e4e858d96 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -190,7 +190,7 @@ impl SubgraphExecutorMap { ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.execute(value).map_err(|err| { + let endpoint_result = expression.execute_with_value(value).map_err(|err| { SubgraphExecutorError::new_endpoint_expression_resolution_failure( subgraph_name.to_string(), err, diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index 925789367..aa3960c05 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -166,9 +166,15 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self.expression.execute(ctx.into()).map_err(|err| { - HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) - })?; + let value = self + .expression + .execute_with_value(ctx.into()) + .map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation( + self.name.to_string(), + Box::new(err), + ) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { if is_never_join_header(&self.name) { diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 5f7c895bd..3c2085644 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -188,9 +188,12 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self.expression.execute(ctx.into()).map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) - })?; + let value = self + .expression + .execute_with_value(ctx.into()) + .map_err(|err| { + HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 78589a094..8ade7d01c 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -25,6 +25,7 @@ retry-policies = { workspace = true} tracing = { workspace = true } vrl = { workspace = true } +once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index b3dc01a75..a7006f8b1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,6 +2,8 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use crate::primitives::expression::Expression; + /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -15,10 +17,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { - /// An expression that must evaluate to a boolean. If true, the label will be applied. - expression: String, - }, + Expression(Expression), } impl LabelOverrideValue { diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index 2df64b768..b2bd226ac 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,12 +1,12 @@ -use std::{borrow::Cow, collections::BTreeMap}; - +use once_cell::sync::Lazy; use schemars::{JsonSchema, Schema, SchemaGenerator}; use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, core::Value as VrlValue, prelude::{ - state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, TimeZone as VrlTimeZone, }, stdlib::all as vrl_build_functions, @@ -19,16 +19,17 @@ pub struct Expression { program: Box, } +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + impl Expression { pub fn try_new(expression: String) -> Result { - let vrl_functions = vrl_build_functions(); - let compilation_result = - vrl_compile(&expression, &vrl_functions).map_err(|diagnostics| { + vrl_compile(&expression, &VRL_FUNCTIONS).map_err(|diagnostics| { diagnostics .errors() - .into_iter() - .map(|d| d.code.to_string() + ": " + &d.message) + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) .collect::>() .join(", ") })?; @@ -39,7 +40,7 @@ impl Expression { }) } - pub fn execute(&self, value: VrlValue) -> Result { + pub fn execute_with_value(&self, value: VrlValue) -> Result { let mut target = VrlTargetValue { value, metadata: VrlValue::Object(BTreeMap::new()), @@ -47,10 +48,13 @@ impl Expression { }; let mut state = VrlState::default(); - let timezone = VrlTimeZone::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + self.execute_with_context(&mut ctx) + } - self.program.resolve(&mut ctx) + pub fn execute_with_context(&self, ctx: &mut VrlContext) -> Result { + self.program.resolve(ctx) } } @@ -59,8 +63,44 @@ impl<'de> Deserialize<'de> for Expression { where D: serde::Deserializer<'de>, { - let expression = String::deserialize(deserializer)?; - Expression::try_new(expression).map_err(serde::de::Error::custom) + struct ExpressionVisitor; + impl<'de> serde::de::Visitor<'de> for ExpressionVisitor { + type Value = Expression; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a map for Expression") + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut expression_str: Option = None; + + while let Some(key) = map.next_key::()? { + match key.as_str() { + "expression" => { + if expression_str.is_some() { + return Err(serde::de::Error::duplicate_field("expression")); + } + expression_str = Some(map.next_value()?); + } + other_key => { + return Err(serde::de::Error::unknown_field( + other_key, + &["expression"], + )); + } + } + } + + let expression_str = + expression_str.ok_or_else(|| serde::de::Error::missing_field("expression"))?; + + Expression::try_new(expression_str).map_err(serde::de::Error::custom) + } + } + deserializer.deserialize_map(ExpressionVisitor) } } From 21daa6c0d5181545b406a0481454ac444be8b60f Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:36:56 +0300 Subject: [PATCH 03/13] Readme --- docs/README.md | 12 +++++----- .../src/primitives/expression.rs | 23 +++++++++++++++---- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/docs/README.md b/docs/README.md index fb474b9d2..1b0c8e40e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| **Additional Properties:** not allowed **Example** @@ -641,7 +641,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes|
@@ -863,7 +863,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1116,7 +1116,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1338,7 +1338,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`||yes| +|**expression**|`string`|The VRL expression string.
|yes| @@ -1808,7 +1808,7 @@ Request timeout for the Hive Console CDN requests. ## traffic\_shaping: object -Configuration for the traffic-shaper executor. Use these configurations to control how requests are being executed to subgraphs. +Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs. **Properties** diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index b2bd226ac..44da62957 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use schemars::{JsonSchema, Schema, SchemaGenerator}; -use serde::{Deserialize, Serialize}; +use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema}; +use serde::{Deserialize, Serialize, ser::SerializeStruct}; use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, @@ -109,7 +109,9 @@ impl Serialize for Expression { where S: serde::Serializer, { - serializer.serialize_str(&self.expression) + let mut state = serializer.serialize_struct("Expression", 1)?; + state.serialize_field("expression", &self.expression)?; + state.end() } } @@ -118,8 +120,19 @@ impl JsonSchema for Expression { "Expression".into() } - fn json_schema(gen: &mut SchemaGenerator) -> Schema { - String::json_schema(gen) + fn json_schema(_gen: &mut SchemaGenerator) -> Schema { + json_schema!({ + "type": "object", + "description": "A VRL expression used for dynamic evaluations.", + "properties": { + "expression": { + "type": "string", + "description": "The VRL expression string." + } + }, + "required": ["expression"], + "additionalProperties": false + }) } } From 284bff1f8a07c6ef6edae75f0f5e8003799a97c7 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 14:38:14 +0300 Subject: [PATCH 04/13] Format --- lib/router-config/src/primitives/expression.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs index 44da62957..5b60113b7 100644 --- a/lib/router-config/src/primitives/expression.rs +++ b/lib/router-config/src/primitives/expression.rs @@ -1,6 +1,6 @@ use once_cell::sync::Lazy; -use schemars::{JsonSchema, Schema, SchemaGenerator, json_schema}; -use serde::{Deserialize, Serialize, ser::SerializeStruct}; +use schemars::{json_schema, JsonSchema, Schema, SchemaGenerator}; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; use std::{borrow::Cow, collections::BTreeMap}; use vrl::{ compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, From 13cd7b6d221ded1f2ae75205b142495996d74c57 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 31 Oct 2025 18:01:37 +0300 Subject: [PATCH 05/13] Revert --- Cargo.lock | 4 +- .../src/pipeline/progressive_override.rs | 22 ++- docs/README.md | 8 +- lib/executor/Cargo.toml | 2 + lib/executor/src/executors/map.rs | 31 ++-- lib/executor/src/headers/compile.rs | 34 +++-- lib/executor/src/headers/plan.rs | 6 +- lib/executor/src/headers/request.rs | 13 +- lib/executor/src/headers/response.rs | 11 +- lib/executor/src/utils/expression.rs | 49 ++++++ lib/executor/src/utils/mod.rs | 1 + lib/router-config/Cargo.toml | 2 - lib/router-config/src/headers.rs | 4 +- lib/router-config/src/override_labels.rs | 4 +- .../src/override_subgraph_urls.rs | 8 +- .../src/primitives/expression.rs | 144 ------------------ lib/router-config/src/primitives/mod.rs | 1 - 17 files changed, 128 insertions(+), 216 deletions(-) create mode 100644 lib/executor/src/utils/expression.rs delete mode 100644 lib/router-config/src/primitives/expression.rs diff --git a/Cargo.lock b/Cargo.lock index 22d4f57a6..5eecc18e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2098,14 +2098,12 @@ dependencies = [ "http", "humantime-serde", "jsonwebtoken", - "once_cell", "retry-policies", "schemars 1.0.5", "serde", "serde_json", "thiserror 2.0.17", "tracing", - "vrl", ] [[package]] @@ -2132,9 +2130,11 @@ dependencies = [ "insta", "itoa", "ntex-http", + "once_cell", "ordered-float", "regex-automata", "ryu", + "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/bin/router/src/pipeline/progressive_override.rs b/bin/router/src/pipeline/progressive_override.rs index eae790709..0c2ad9c15 100644 --- a/bin/router/src/pipeline/progressive_override.rs +++ b/bin/router/src/pipeline/progressive_override.rs @@ -1,16 +1,16 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use hive_router_config::{ - override_labels::{LabelOverrideValue, OverrideLabelsConfig}, - primitives::expression::Expression, +use hive_router_config::override_labels::{LabelOverrideValue, OverrideLabelsConfig}; +use hive_router_plan_executor::{ + execution::client_request_details::ClientRequestDetails, utils::expression::compile_expression, }; -use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails; use hive_router_query_planner::{ graph::{PlannerOverrideContext, PERCENTAGE_SCALE_FACTOR}, state::supergraph_state::SupergraphState, }; use rand::Rng; use vrl::{ + compiler::Program as VrlProgram, compiler::TargetValue as VrlTargetValue, core::Value as VrlValue, prelude::{ @@ -119,7 +119,7 @@ impl StableOverrideContext { /// It's intended to be used as a shared state in the router. pub struct OverrideLabelsEvaluator { static_enabled_labels: HashSet, - expressions: HashMap, + expressions: HashMap, } impl OverrideLabelsEvaluator { @@ -134,8 +134,14 @@ impl OverrideLabelsEvaluator { LabelOverrideValue::Boolean(true) => { static_enabled_labels.insert(label.clone()); } - LabelOverrideValue::Expression(expression) => { - expressions.insert(label.clone(), expression.clone()); + LabelOverrideValue::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + OverrideLabelsCompileError { + label: label.clone(), + error: err.to_string(), + } + })?; + expressions.insert(label.clone(), program); } _ => {} // Skip false booleans } @@ -168,7 +174,7 @@ impl OverrideLabelsEvaluator { let mut ctx = VrlContext::new(&mut target, &mut state, &timezone); for (label, expression) in &self.expressions { - match expression.execute_with_context(&mut ctx) { + match expression.resolve(&mut ctx) { Ok(evaluated_value) => match evaluated_value { VrlValue::Boolean(true) => { active_flags.insert(label.clone()); diff --git a/docs/README.md b/docs/README.md index 1b0c8e40e..a01d56293 100644 --- a/docs/README.md +++ b/docs/README.md @@ -641,7 +641,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -863,7 +863,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -1116,7 +1116,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| @@ -1338,7 +1338,7 @@ For more information on the available functions and syntax, see the |Name|Type|Description|Required| |----|----|-----------|--------| -|**expression**|`string`|The VRL expression string.
|yes| +|**expression**|`string`||yes| diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 27f7af1bf..a47a32188 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -49,6 +49,8 @@ itoa = "1.0.15" ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" +once_cell = "1.21.3" +schemars = "1.0.4" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index e4e858d96..93247ec80 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -6,9 +6,7 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; use dashmap::DashMap; -use hive_router_config::{ - override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig, -}; +use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig}; use http::Uri; use hyper_tls::HttpsConnector; use hyper_util::{ @@ -17,7 +15,7 @@ use hyper_util::{ }; use tokio::sync::{OnceCell, Semaphore}; use tracing::error; -use vrl::core::Value as VrlValue; +use vrl::{compiler::Program as VrlProgram, core::Value as VrlValue}; use crate::{ execution::client_request_details::ClientRequestDetails, @@ -30,6 +28,7 @@ use crate::{ http::{HTTPSubgraphExecutor, HttpClient}, }, response::graphql_error::GraphQLError, + utils::expression::{compile_expression, execute_expression_with_value}, }; type SubgraphName = String; @@ -37,7 +36,7 @@ type SubgraphEndpoint = String; type ExecutorsBySubgraphMap = DashMap>; type EndpointsBySubgraphMap = DashMap; -type ExpressionsBySubgraphMap = HashMap; +type ExpressionsBySubgraphMap = HashMap; pub struct SubgraphExecutorMap { executors_by_subgraph: ExecutorsBySubgraphMap, @@ -91,7 +90,7 @@ impl SubgraphExecutorMap { let endpoint_str = match endpoint_str { Some(UrlOrExpression::Url(url)) => url, - Some(UrlOrExpression::Expression(expression)) => { + Some(UrlOrExpression::Expression { expression }) => { subgraph_executor_map.register_expression(&subgraph_name, expression)?; &original_endpoint_str } @@ -190,12 +189,13 @@ impl SubgraphExecutorMap { ])); // Resolve the expression to get an endpoint URL. - let endpoint_result = expression.execute_with_value(value).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; + let endpoint_result = + execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; let endpoint_str = match endpoint_result.as_str() { Some(s) => s.to_string(), None => { @@ -249,10 +249,13 @@ impl SubgraphExecutorMap { fn register_expression( &mut self, subgraph_name: &str, - expression: &Expression, + expression: &str, ) -> Result<(), SubgraphExecutorError> { + let program = compile_expression(expression, None).map_err(|err| { + SubgraphExecutorError::EndpointExpressionBuild(subgraph_name.to_string(), err) + })?; self.expressions_by_subgraph - .insert(subgraph_name.to_string(), expression.clone()); + .insert(subgraph_name.to_string(), program); Ok(()) } diff --git a/lib/executor/src/headers/compile.rs b/lib/executor/src/headers/compile.rs index 1cf29c8c9..587cbee60 100644 --- a/lib/executor/src/headers/compile.rs +++ b/lib/executor/src/headers/compile.rs @@ -1,12 +1,16 @@ -use crate::headers::{ - errors::HeaderRuleCompileError, - plan::{ - HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, - RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, RequestPropagateRegex, - RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, ResponseHeaderRules, - ResponseInsertExpression, ResponseInsertStatic, ResponsePropagateNamed, - ResponsePropagateRegex, ResponseRemoveNamed, ResponseRemoveRegex, +use crate::{ + headers::{ + errors::HeaderRuleCompileError, + plan::{ + HeaderAggregationStrategy, HeaderRulesPlan, RequestHeaderRule, RequestHeaderRules, + RequestInsertExpression, RequestInsertStatic, RequestPropagateNamed, + RequestPropagateRegex, RequestRemoveNamed, RequestRemoveRegex, ResponseHeaderRule, + ResponseHeaderRules, ResponseInsertExpression, ResponseInsertStatic, + ResponsePropagateNamed, ResponsePropagateRegex, ResponseRemoveNamed, + ResponseRemoveRegex, + }, }, + utils::expression::compile_expression, }; use hive_router_config::headers as config; @@ -48,11 +52,14 @@ impl HeaderRuleCompiler> for config::RequestHeaderRule { value: build_header_value(&rule.name, value)?, })); } - config::InsertSource::Expression(expression) => { + config::InsertSource::Expression { expression } => { + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(RequestHeaderRule::InsertExpression( RequestInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(expression.clone()), + expression: Box::new(program), }, )); } @@ -114,16 +121,19 @@ impl HeaderRuleCompiler> for config::ResponseHeaderRule strategy: aggregation_strategy, })); } - config::InsertSource::Expression(expression) => { + config::InsertSource::Expression { expression } => { // NOTE: In case we ever need to improve performance and not pass the whole context // to VRL expressions, we can use: // - compilation_result.program.info().target_assignments // - compilation_result.program.info().target_queries // to determine what parts of the context are actually needed by the expression + let program = compile_expression(expression, None).map_err(|err| { + HeaderRuleCompileError::ExpressionBuild(rule.name.clone(), err) + })?; actions.push(ResponseHeaderRule::InsertExpression( ResponseInsertExpression { name: build_header_name(&rule.name)?, - expression: Box::new(expression.clone()), + expression: Box::new(program), strategy: aggregation_strategy, }, )); diff --git a/lib/executor/src/headers/plan.rs b/lib/executor/src/headers/plan.rs index 69e0edffd..a1a3cf990 100644 --- a/lib/executor/src/headers/plan.rs +++ b/lib/executor/src/headers/plan.rs @@ -1,7 +1,7 @@ use ahash::HashMap; -use hive_router_config::primitives::expression::Expression; use http::{HeaderName, HeaderValue}; use regex_automata::meta::Regex; +use vrl::compiler::Program as VrlProgram; #[derive(Clone)] pub struct HeaderRulesPlan { @@ -62,13 +62,13 @@ pub struct ResponseInsertStatic { #[derive(Clone)] pub struct RequestInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, } #[derive(Clone)] pub struct ResponseInsertExpression { pub name: HeaderName, - pub expression: Box, + pub expression: Box, pub strategy: HeaderAggregationStrategy, } diff --git a/lib/executor/src/headers/request.rs b/lib/executor/src/headers/request.rs index aa3960c05..ec22de509 100644 --- a/lib/executor/src/headers/request.rs +++ b/lib/executor/src/headers/request.rs @@ -11,6 +11,7 @@ use crate::{ }, sanitizer::{is_denied_header, is_never_join_header}, }, + utils::expression::execute_expression_with_value, }; pub fn modify_subgraph_request_headers( @@ -166,15 +167,9 @@ impl ApplyRequestHeader for RequestInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self - .expression - .execute_with_value(ctx.into()) - .map_err(|err| { - HeaderRuleRuntimeError::new_expression_evaluation( - self.name.to_string(), - Box::new(err), - ) - })?; + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { if is_never_join_header(&self.name) { diff --git a/lib/executor/src/headers/response.rs b/lib/executor/src/headers/response.rs index 3c2085644..9cf45a768 100644 --- a/lib/executor/src/headers/response.rs +++ b/lib/executor/src/headers/response.rs @@ -13,6 +13,7 @@ use crate::{ }, sanitizer::is_denied_header, }, + utils::expression::execute_expression_with_value, }; use super::sanitizer::is_never_join_header; @@ -188,13 +189,9 @@ impl ApplyResponseHeader for ResponseInsertExpression { if is_denied_header(&self.name) { return Ok(()); } - let value = self - .expression - .execute_with_value(ctx.into()) - .map_err(|err| { - HeaderRuleRuntimeError::ExpressionEvaluation(self.name.to_string(), Box::new(err)) - })?; - + let value = execute_expression_with_value(&self.expression, ctx.into()).map_err(|err| { + HeaderRuleRuntimeError::new_expression_evaluation(self.name.to_string(), Box::new(err)) + })?; if let Some(header_value) = vrl_value_to_header_value(value) { let strategy = if is_never_join_header(&self.name) { HeaderAggregationStrategy::Append diff --git a/lib/executor/src/utils/expression.rs b/lib/executor/src/utils/expression.rs new file mode 100644 index 000000000..faee2dc3b --- /dev/null +++ b/lib/executor/src/utils/expression.rs @@ -0,0 +1,49 @@ +use once_cell::sync::Lazy; +use std::collections::BTreeMap; +use vrl::{ + compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, + core::Value as VrlValue, + prelude::{ + state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, + TimeZone as VrlTimeZone, + }, + stdlib::all as vrl_build_functions, + value::Secrets as VrlSecrets, +}; + +static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); +static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); + +pub fn compile_expression( + expression: &str, + functions: Option<&[Box]>, +) -> Result { + let functions = functions.unwrap_or(&VRL_FUNCTIONS); + + let compilation_result = vrl_compile(expression, functions).map_err(|diagnostics| { + diagnostics + .errors() + .iter() + .map(|d| format!("{}: {}", d.code, d.message)) + .collect::>() + .join(", ") + })?; + + Ok(compilation_result.program) +} + +pub fn execute_expression_with_value( + program: &VrlProgram, + value: VrlValue, +) -> Result { + let mut target = VrlTargetValue { + value, + metadata: VrlValue::Object(BTreeMap::new()), + secrets: VrlSecrets::default(), + }; + + let mut state = VrlState::default(); + let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); + + program.resolve(&mut ctx) +} diff --git a/lib/executor/src/utils/mod.rs b/lib/executor/src/utils/mod.rs index fc4226984..0461bb8a8 100644 --- a/lib/executor/src/utils/mod.rs +++ b/lib/executor/src/utils/mod.rs @@ -1,3 +1,4 @@ pub mod consts; +pub mod expression; pub mod traverse; pub mod vrl; diff --git a/lib/router-config/Cargo.toml b/lib/router-config/Cargo.toml index 8ade7d01c..8fc448071 100644 --- a/lib/router-config/Cargo.toml +++ b/lib/router-config/Cargo.toml @@ -23,9 +23,7 @@ http = { workspace = true } jsonwebtoken = { workspace = true } retry-policies = { workspace = true} tracing = { workspace = true } -vrl = { workspace = true } -once_cell = "1.21.3" schemars = "1.0.4" humantime-serde = "1.1.1" config = { version = "0.15.14", features = ["yaml", "json", "json5"] } diff --git a/lib/router-config/src/headers.rs b/lib/router-config/src/headers.rs index 8d30675b7..bd0913f6f 100644 --- a/lib/router-config/src/headers.rs +++ b/lib/router-config/src/headers.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::primitives::expression::Expression; - type HeaderName = String; type RegExp = String; @@ -250,7 +248,7 @@ pub enum InsertSource { /// name: x-auth-scheme /// expression: 'split(.request.headers.authorization, " ")[0] ?? "none"' /// ``` - Expression(Expression), + Expression { expression: String }, } /// Helper to allow `one` or `many` values for ergonomics (OR semantics). diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index a7006f8b1..cbb7f28d1 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -2,8 +2,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::primitives::expression::Expression; - /// A map of label names to their override configuration. pub type OverrideLabelsConfig = HashMap; @@ -17,7 +15,7 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression(Expression), + Expression { expression: String }, } impl LabelOverrideValue { diff --git a/lib/router-config/src/override_subgraph_urls.rs b/lib/router-config/src/override_subgraph_urls.rs index dc32bb783..ddacfe044 100644 --- a/lib/router-config/src/override_subgraph_urls.rs +++ b/lib/router-config/src/override_subgraph_urls.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::primitives::expression::Expression; - /// Configuration for how the Router should override subgraph URLs. /// This can be used to point to different subgraph endpoints based on environment, /// or to use dynamic expressions to determine the URL at runtime. @@ -54,7 +52,7 @@ pub enum UrlOrExpression { /// A static URL string. Url(String), /// A dynamic value computed by a VRL expression. - Expression(Expression), + Expression { expression: String }, } fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { @@ -77,7 +75,9 @@ fn override_subgraph_urls_example_1() -> OverrideSubgraphUrlsConfig { subgraphs.insert( "products".to_string(), PerSubgraphConfig { - url: UrlOrExpression::Expression(expression.to_string().try_into().unwrap()), + url: UrlOrExpression::Expression { + expression: expression.to_string(), + }, }, ); diff --git a/lib/router-config/src/primitives/expression.rs b/lib/router-config/src/primitives/expression.rs deleted file mode 100644 index 5b60113b7..000000000 --- a/lib/router-config/src/primitives/expression.rs +++ /dev/null @@ -1,144 +0,0 @@ -use once_cell::sync::Lazy; -use schemars::{json_schema, JsonSchema, Schema, SchemaGenerator}; -use serde::{ser::SerializeStruct, Deserialize, Serialize}; -use std::{borrow::Cow, collections::BTreeMap}; -use vrl::{ - compiler::{compile as vrl_compile, Program as VrlProgram, TargetValue as VrlTargetValue}, - core::Value as VrlValue, - prelude::{ - state::RuntimeState as VrlState, Context as VrlContext, ExpressionError, Function, - TimeZone as VrlTimeZone, - }, - stdlib::all as vrl_build_functions, - value::Secrets as VrlSecrets, -}; - -#[derive(Debug, Clone)] -pub struct Expression { - expression: String, - program: Box, -} - -static VRL_FUNCTIONS: Lazy>> = Lazy::new(vrl_build_functions); -static VRL_TIMEZONE: Lazy = Lazy::new(VrlTimeZone::default); - -impl Expression { - pub fn try_new(expression: String) -> Result { - let compilation_result = - vrl_compile(&expression, &VRL_FUNCTIONS).map_err(|diagnostics| { - diagnostics - .errors() - .iter() - .map(|d| format!("{}: {}", d.code, d.message)) - .collect::>() - .join(", ") - })?; - - Ok(Self { - expression, - program: Box::new(compilation_result.program), - }) - } - - pub fn execute_with_value(&self, value: VrlValue) -> Result { - let mut target = VrlTargetValue { - value, - metadata: VrlValue::Object(BTreeMap::new()), - secrets: VrlSecrets::default(), - }; - - let mut state = VrlState::default(); - let mut ctx = VrlContext::new(&mut target, &mut state, &VRL_TIMEZONE); - - self.execute_with_context(&mut ctx) - } - - pub fn execute_with_context(&self, ctx: &mut VrlContext) -> Result { - self.program.resolve(ctx) - } -} - -impl<'de> Deserialize<'de> for Expression { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct ExpressionVisitor; - impl<'de> serde::de::Visitor<'de> for ExpressionVisitor { - type Value = Expression; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a map for Expression") - } - - fn visit_map(self, mut map: A) -> Result - where - A: serde::de::MapAccess<'de>, - { - let mut expression_str: Option = None; - - while let Some(key) = map.next_key::()? { - match key.as_str() { - "expression" => { - if expression_str.is_some() { - return Err(serde::de::Error::duplicate_field("expression")); - } - expression_str = Some(map.next_value()?); - } - other_key => { - return Err(serde::de::Error::unknown_field( - other_key, - &["expression"], - )); - } - } - } - - let expression_str = - expression_str.ok_or_else(|| serde::de::Error::missing_field("expression"))?; - - Expression::try_new(expression_str).map_err(serde::de::Error::custom) - } - } - deserializer.deserialize_map(ExpressionVisitor) - } -} - -impl Serialize for Expression { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let mut state = serializer.serialize_struct("Expression", 1)?; - state.serialize_field("expression", &self.expression)?; - state.end() - } -} - -impl JsonSchema for Expression { - fn schema_name() -> Cow<'static, str> { - "Expression".into() - } - - fn json_schema(_gen: &mut SchemaGenerator) -> Schema { - json_schema!({ - "type": "object", - "description": "A VRL expression used for dynamic evaluations.", - "properties": { - "expression": { - "type": "string", - "description": "The VRL expression string." - } - }, - "required": ["expression"], - "additionalProperties": false - }) - } -} - -impl TryFrom for Expression { - type Error = String; - fn try_from(value: String) -> Result { - Expression::try_new(value) - } -} diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index aa591928b..972582fc3 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -1,4 +1,3 @@ -pub mod expression; pub mod file_path; pub mod http_header; pub mod retry_policy; From 1ce6fae9a223ade1924d7d1eb36971fa6a9e05f5 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 13:34:20 +0300 Subject: [PATCH 06/13] Remove extra dep and apply comments --- Cargo.lock | 1 - lib/executor/Cargo.toml | 1 - lib/router-config/src/override_labels.rs | 5 ++++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5eecc18e6..51f64278e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2134,7 +2134,6 @@ dependencies = [ "ordered-float", "regex-automata", "ryu", - "schemars 1.0.4", "serde", "sonic-rs", "strum 0.27.2", diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index a47a32188..7d5652524 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -50,7 +50,6 @@ ryu = "1.0.20" indexmap = "2.10.0" bumpalo = "3.19.0" once_cell = "1.21.3" -schemars = "1.0.4" [dev-dependencies] subgraphs = { path = "../../bench/subgraphs" } diff --git a/lib/router-config/src/override_labels.rs b/lib/router-config/src/override_labels.rs index cbb7f28d1..b3dc01a75 100644 --- a/lib/router-config/src/override_labels.rs +++ b/lib/router-config/src/override_labels.rs @@ -15,7 +15,10 @@ pub enum LabelOverrideValue { /// A static boolean value to enable or disable the label. Boolean(bool), /// A dynamic value computed by an expression. - Expression { expression: String }, + Expression { + /// An expression that must evaluate to a boolean. If true, the label will be applied. + expression: String, + }, } impl LabelOverrideValue { From e0ad7c4de81d8e85b8ad2dba8529d3cbf20b928c Mon Sep 17 00:00:00 2001 From: "knope-bot[bot]" <152252888+knope-bot[bot]@users.noreply.github.com> Date: Mon, 3 Nov 2025 10:40:23 +0000 Subject: [PATCH 07/13] Auto generate changeset --- .changeset/shared_utilities_to_handle_vrl_expressions.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/shared_utilities_to_handle_vrl_expressions.md diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md new file mode 100644 index 000000000..f1e52b5c2 --- /dev/null +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -0,0 +1,9 @@ +--- +default: patch +--- + +# Shared utilities to handle VRL expressions + +#540 by @ardatan + + From 01f66d7f5c3bf99afa5b8afe5593c4387c1c2a32 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:12:46 +0300 Subject: [PATCH 08/13] Duration --- lib/executor/src/executors/map.rs | 5 +---- lib/router-config/src/traffic_shaping.rs | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 93247ec80..fc3d6905a 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,7 +1,6 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, - time::Duration, }; use bytes::{BufMut, Bytes, BytesMut}; @@ -57,9 +56,7 @@ impl SubgraphExecutorMap { let https = HttpsConnector::new(); let client: HttpClient = Client::builder(TokioExecutor::new()) .pool_timer(TokioTimer::new()) - .pool_idle_timeout(Duration::from_secs( - config.traffic_shaping.pool_idle_timeout_seconds, - )) + .pool_idle_timeout(config.traffic_shaping.pool_idle_timeout) .pool_max_idle_per_host(config.traffic_shaping.max_connections_per_host) .build(https); diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 58dd7e049..95d824910 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -9,8 +11,13 @@ pub struct TrafficShapingConfig { pub max_connections_per_host: usize, /// Timeout for idle sockets being kept-alive. - #[serde(default = "default_pool_idle_timeout_seconds")] - pub pool_idle_timeout_seconds: u64, + #[serde( + default = "default_pool_idle_timeout", + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub pool_idle_timeout: Duration, /// Enables/disables request deduplication to subgraphs. /// @@ -24,7 +31,7 @@ impl Default for TrafficShapingConfig { fn default() -> Self { Self { max_connections_per_host: default_max_connections_per_host(), - pool_idle_timeout_seconds: default_pool_idle_timeout_seconds(), + pool_idle_timeout: default_pool_idle_timeout(), dedupe_enabled: default_dedupe_enabled(), } } @@ -34,8 +41,8 @@ fn default_max_connections_per_host() -> usize { 100 } -fn default_pool_idle_timeout_seconds() -> u64 { - 50 +fn default_pool_idle_timeout() -> Duration { + Duration::from_secs(50) } fn default_dedupe_enabled() -> bool { From 16311402790403937f18c21b75fc58d75d6de33f Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:19:01 +0300 Subject: [PATCH 09/13] Fix config' --- docs/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/README.md b/docs/README.md index a01d56293..0a2fc6727 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,7 +15,7 @@ |[**override\_subgraph\_urls**](#override_subgraph_urls)|`object`|Configuration for overriding subgraph URLs.
Default: `{}`
|| |[**query\_planner**](#query_planner)|`object`|Query planning configuration.
Default: `{"allow_expose":false,"timeout":"10s"}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout_seconds":50}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"dedupe_enabled":true,"max_connections_per_host":100,"pool_idle_timeout":"50s"}`
|| **Additional Properties:** not allowed **Example** @@ -109,7 +109,7 @@ supergraph: {} traffic_shaping: dedupe_enabled: true max_connections_per_host: 100 - pool_idle_timeout_seconds: 50 + pool_idle_timeout: 50s ``` @@ -1817,7 +1817,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations |----|----|-----------|--------| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| -|**pool\_idle\_timeout\_seconds**|`integer`|Timeout for idle sockets being kept-alive.
Default: `50`
Format: `"uint64"`
Minimum: `0`
|| +|**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| **Additional Properties:** not allowed **Example** @@ -1825,7 +1825,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations ```yaml dedupe_enabled: true max_connections_per_host: 100 -pool_idle_timeout_seconds: 50 +pool_idle_timeout: 50s ``` From f1f60baf59c700eae0ab68974ab0c505cd1e1b78 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 3 Nov 2025 15:52:58 +0300 Subject: [PATCH 10/13] Update traffic shaping configuration for VRL expressions Removed `pool_idle_timeout_seconds` from `traffic_shaping` and replaced it with `pool_idle_timeout` using duration format. --- .../shared_utilities_to_handle_vrl_expressions.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.changeset/shared_utilities_to_handle_vrl_expressions.md b/.changeset/shared_utilities_to_handle_vrl_expressions.md index f1e52b5c2..97c07aa3a 100644 --- a/.changeset/shared_utilities_to_handle_vrl_expressions.md +++ b/.changeset/shared_utilities_to_handle_vrl_expressions.md @@ -1,9 +1,15 @@ --- -default: patch +default: minor --- -# Shared utilities to handle VRL expressions +# Breaking -#540 by @ardatan +Removed `pool_idle_timeout_seconds` from `traffic_shaping`, instead use `pool_idle_timeout` with duration format. +```diff +traffic_shaping: +- pool_idle_timeout_seconds: 30 ++ pool_idle_timeout: 30s +``` +#540 by @ardatan From f19621e081f98adddf624ae0b5d46491daa14a7c Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 17:26:39 +0300 Subject: [PATCH 11/13] Avoid cloning endpoint multiple times --- lib/executor/src/executors/map.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index fc3d6905a..6c6d3c0c5 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -194,19 +194,17 @@ impl SubgraphExecutorMap { ) })?; let endpoint_str = match endpoint_result.as_str() { - Some(s) => s.to_string(), - None => { - return Err(SubgraphExecutorError::EndpointExpressionWrongType( - subgraph_name.to_string(), - )); - } - }; + Some(s) => Ok(s), + None => Err(SubgraphExecutorError::EndpointExpressionWrongType( + subgraph_name.to_string(), + )), + }?; // Check if an executor for this endpoint already exists. let existing_executor = self .executors_by_subgraph .get(subgraph_name) - .and_then(|endpoints| endpoints.get(&endpoint_str).map(|e| e.clone())); + .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); if let Some(executor) = existing_executor { return Ok(Some(executor)); @@ -219,7 +217,7 @@ impl SubgraphExecutorMap { .executors_by_subgraph .get(subgraph_name) .expect("Executor was just registered, should be present"); - return Ok(endpoints.get(&endpoint_str).map(|e| e.clone())); + return Ok(endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); } Ok(None) From 854aa50412add2c5c6fa075b006764da8e1bbee2 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 17:50:31 +0300 Subject: [PATCH 12/13] Simplify --- lib/executor/src/executors/map.rs | 123 +++++++++++++----------------- 1 file changed, 53 insertions(+), 70 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 6c6d3c0c5..026fef092 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -108,7 +108,7 @@ impl SubgraphExecutorMap { client_request: &ClientRequestDetails<'a, 'req>, ) -> HttpExecutionResponse { match self.get_or_create_executor(subgraph_name, client_request) { - Ok(Some(executor)) => executor.execute(execution_request).await, + Ok(executor) => executor.execute(execution_request).await, Err(err) => { error!( "Subgraph executor error for subgraph '{}': {}", @@ -116,13 +116,6 @@ impl SubgraphExecutorMap { ); self.internal_server_error_response(err.into(), subgraph_name) } - Ok(None) => { - error!( - "Subgraph executor not found for subgraph '{}'", - subgraph_name - ); - self.internal_server_error_response("Internal server error".into(), subgraph_name) - } } } @@ -151,15 +144,17 @@ impl SubgraphExecutorMap { &self, subgraph_name: &str, client_request: &ClientRequestDetails<'_, '_>, - ) -> Result, SubgraphExecutorError> { - let from_expression = - self.get_or_create_executor_from_expression(subgraph_name, client_request)?; - - if from_expression.is_some() { - return Ok(from_expression); - } - - Ok(self.get_executor_from_static_endpoint(subgraph_name)) + ) -> Result { + self.expressions_by_subgraph + .get(subgraph_name) + .map(|expression| { + self.get_or_create_executor_from_expression( + subgraph_name, + expression, + client_request, + ) + }) + .unwrap_or_else(|| self.get_executor_from_static_endpoint(subgraph_name)) } /// Looks up a subgraph executor, @@ -169,65 +164,50 @@ impl SubgraphExecutorMap { fn get_or_create_executor_from_expression( &self, subgraph_name: &str, + expression: &VrlProgram, client_request: &ClientRequestDetails<'_, '_>, - ) -> Result, SubgraphExecutorError> { - if let Some(expression) = self.expressions_by_subgraph.get(subgraph_name) { - let original_url_value = VrlValue::Bytes(Bytes::from( - self.static_endpoints_by_subgraph - .get(subgraph_name) - .map(|endpoint| endpoint.value().clone()) - .ok_or_else(|| { - SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) - })?, - )); - let value = VrlValue::Object(BTreeMap::from([ - ("request".into(), client_request.into()), - ("original_url".into(), original_url_value), - ])); - - // Resolve the expression to get an endpoint URL. - let endpoint_result = - execute_expression_with_value(expression, value).map_err(|err| { - SubgraphExecutorError::new_endpoint_expression_resolution_failure( - subgraph_name.to_string(), - err, - ) - })?; - let endpoint_str = match endpoint_result.as_str() { - Some(s) => Ok(s), - None => Err(SubgraphExecutorError::EndpointExpressionWrongType( - subgraph_name.to_string(), - )), - }?; - - // Check if an executor for this endpoint already exists. - let existing_executor = self - .executors_by_subgraph + ) -> Result { + let original_url_value = VrlValue::Bytes(Bytes::from( + self.static_endpoints_by_subgraph .get(subgraph_name) - .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); - - if let Some(executor) = existing_executor { - return Ok(Some(executor)); - } - + .map(|endpoint| endpoint.value().clone()) + .ok_or_else(|| { + SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) + })?, + )); + let value = VrlValue::Object(BTreeMap::from([ + ("request".into(), client_request.into()), + ("original_url".into(), original_url_value), + ])); + + // Resolve the expression to get an endpoint URL. + let endpoint_result = execute_expression_with_value(expression, value).map_err(|err| { + SubgraphExecutorError::new_endpoint_expression_resolution_failure( + subgraph_name.to_string(), + err, + ) + })?; + let endpoint_str = match endpoint_result.as_str() { + Some(s) => Ok(s), + None => Err(SubgraphExecutorError::EndpointExpressionWrongType( + subgraph_name.to_string(), + )), + }?; + + // Check if an executor for this endpoint already exists. + self.executors_by_subgraph + .get(subgraph_name) + .and_then(|endpoints| endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())) + .map(Ok) // If not, create and register a new one. - self.register_executor(subgraph_name, &endpoint_str)?; - - let endpoints = self - .executors_by_subgraph - .get(subgraph_name) - .expect("Executor was just registered, should be present"); - return Ok(endpoints.get(endpoint_str.as_ref()).map(|e| e.clone())); - } - - Ok(None) + .unwrap_or_else(|| self.register_executor(subgraph_name, endpoint_str.as_ref())) } /// Looks up a subgraph executor based on a static endpoint URL. fn get_executor_from_static_endpoint( &self, subgraph_name: &str, - ) -> Option { + ) -> Result { self.static_endpoints_by_subgraph .get(subgraph_name) .and_then(|endpoint_ref| { @@ -236,6 +216,7 @@ impl SubgraphExecutorMap { .get(subgraph_name) .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone())) }) + .ok_or_else(|| SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())) } /// Registers a VRL expression for the given subgraph name. @@ -269,7 +250,7 @@ impl SubgraphExecutorMap { &self, subgraph_name: &str, endpoint_str: &str, - ) -> Result<(), SubgraphExecutorError> { + ) -> Result { let endpoint_uri = endpoint_str.parse::().map_err(|e| { SubgraphExecutorError::EndpointParseFailure(endpoint_str.to_string(), e.to_string()) })?; @@ -302,11 +283,13 @@ impl SubgraphExecutorMap { self.in_flight_requests.clone(), ); + let executor_arc = executor.to_boxed_arc(); + self.executors_by_subgraph .entry(subgraph_name.to_string()) .or_default() - .insert(endpoint_str.to_string(), executor.to_boxed_arc()); + .insert(endpoint_str.to_string(), executor_arc.clone()); - Ok(()) + Ok(executor_arc) } } From 86508093fb5550af9e852a10b8833222aab99a84 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 5 Nov 2025 17:55:52 +0300 Subject: [PATCH 13/13] Less diff --- lib/executor/src/executors/map.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 026fef092..49b936f21 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -154,7 +154,12 @@ impl SubgraphExecutorMap { client_request, ) }) - .unwrap_or_else(|| self.get_executor_from_static_endpoint(subgraph_name)) + .unwrap_or_else(|| { + self.get_executor_from_static_endpoint(subgraph_name) + .ok_or_else(|| { + SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string()) + }) + }) } /// Looks up a subgraph executor, @@ -207,7 +212,7 @@ impl SubgraphExecutorMap { fn get_executor_from_static_endpoint( &self, subgraph_name: &str, - ) -> Result { + ) -> Option { self.static_endpoints_by_subgraph .get(subgraph_name) .and_then(|endpoint_ref| { @@ -216,7 +221,6 @@ impl SubgraphExecutorMap { .get(subgraph_name) .and_then(|endpoints| endpoints.get(endpoint_str).map(|e| e.clone())) }) - .ok_or_else(|| SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())) } /// Registers a VRL expression for the given subgraph name.