Skip to content

Commit bbba9d6

Browse files
committed
New Expression type to handle VRL in one place
1 parent 00084d2 commit bbba9d6

File tree

14 files changed

+147
-167
lines changed

14 files changed

+147
-167
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/executor/src/executors/map.rs

Lines changed: 13 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::{
66

77
use bytes::{BufMut, Bytes, BytesMut};
88
use dashmap::DashMap;
9-
use hive_router_config::{override_subgraph_urls::UrlOrExpression, HiveRouterConfig};
9+
use hive_router_config::{
10+
override_subgraph_urls::UrlOrExpression, primitives::expression::Expression, HiveRouterConfig,
11+
};
1012
use http::Uri;
1113
use hyper_tls::HttpsConnector;
1214
use hyper_util::{
@@ -15,16 +17,7 @@ use hyper_util::{
1517
};
1618
use tokio::sync::{OnceCell, Semaphore};
1719
use tracing::error;
18-
use vrl::{
19-
compiler::compile as vrl_compile,
20-
compiler::Program as VrlProgram,
21-
compiler::TargetValue as VrlTargetValue,
22-
core::Value as VrlValue,
23-
prelude::Function as VrlFunction,
24-
prelude::{state::RuntimeState as VrlState, Context as VrlContext, TimeZone as VrlTimeZone},
25-
stdlib::all as vrl_build_functions,
26-
value::Secrets as VrlSecrets,
27-
};
20+
use vrl::core::Value as VrlValue;
2821

2922
use crate::{
3023
execution::client_request_details::ClientRequestDetails,
@@ -44,7 +37,7 @@ type SubgraphEndpoint = String;
4437
type ExecutorsBySubgraphMap =
4538
DashMap<SubgraphName, DashMap<SubgraphEndpoint, SubgraphExecutorBoxedArc>>;
4639
type EndpointsBySubgraphMap = DashMap<SubgraphName, SubgraphEndpoint>;
47-
type ExpressionsBySubgraphMap = HashMap<SubgraphName, VrlProgram>;
40+
type ExpressionsBySubgraphMap = HashMap<SubgraphName, Expression>;
4841

4942
pub struct SubgraphExecutorMap {
5043
executors_by_subgraph: ExecutorsBySubgraphMap,
@@ -54,8 +47,6 @@ pub struct SubgraphExecutorMap {
5447
/// Mapping from subgraph name to VRL expression program
5548
expressions_by_subgraph: ExpressionsBySubgraphMap,
5649
config: Arc<HiveRouterConfig>,
57-
/// Precompiled VRL functions to be used in endpoint expressions.
58-
vrl_functions: Vec<Box<dyn VrlFunction>>,
5950
client: Arc<HttpClient>,
6051
semaphores_by_origin: DashMap<String, Arc<Semaphore>>,
6152
max_connections_per_host: usize,
@@ -80,7 +71,6 @@ impl SubgraphExecutorMap {
8071
static_endpoints_by_subgraph: Default::default(),
8172
expressions_by_subgraph: Default::default(),
8273
config,
83-
vrl_functions: vrl_build_functions(),
8474
client: Arc::new(client),
8575
semaphores_by_origin: Default::default(),
8676
max_connections_per_host,
@@ -101,7 +91,7 @@ impl SubgraphExecutorMap {
10191

10292
let endpoint_str = match endpoint_str {
10393
Some(UrlOrExpression::Url(url)) => url,
104-
Some(UrlOrExpression::Expression { expression }) => {
94+
Some(UrlOrExpression::Expression(expression)) => {
10595
subgraph_executor_map.register_expression(&subgraph_name, expression)?;
10696
&original_endpoint_str
10797
}
@@ -194,21 +184,13 @@ impl SubgraphExecutorMap {
194184
SubgraphExecutorError::StaticEndpointNotFound(subgraph_name.to_string())
195185
})?,
196186
));
197-
let mut target = VrlTargetValue {
198-
value: VrlValue::Object(BTreeMap::from([
199-
("request".into(), client_request.into()),
200-
("original_url".into(), original_url_value),
201-
])),
202-
metadata: VrlValue::Object(BTreeMap::new()),
203-
secrets: VrlSecrets::default(),
204-
};
205-
206-
let mut state = VrlState::default();
207-
let timezone = VrlTimeZone::default();
208-
let mut ctx = VrlContext::new(&mut target, &mut state, &timezone);
187+
let value = VrlValue::Object(BTreeMap::from([
188+
("request".into(), client_request.into()),
189+
("original_url".into(), original_url_value),
190+
]));
209191

210192
// Resolve the expression to get an endpoint URL.
211-
let endpoint_result = expression.resolve(&mut ctx).map_err(|err| {
193+
let endpoint_result = expression.execute(value).map_err(|err| {
212194
SubgraphExecutorError::new_endpoint_expression_resolution_failure(
213195
subgraph_name.to_string(),
214196
err,
@@ -267,14 +249,10 @@ impl SubgraphExecutorMap {
267249
fn register_expression(
268250
&mut self,
269251
subgraph_name: &str,
270-
expression: &str,
252+
expression: &Expression,
271253
) -> Result<(), SubgraphExecutorError> {
272-
let compilation_result = vrl_compile(expression, &self.vrl_functions).map_err(|e| {
273-
SubgraphExecutorError::new_endpoint_expression_build(subgraph_name.to_string(), e)
274-
})?;
275-
276254
self.expressions_by_subgraph
277-
.insert(subgraph_name.to_string(), compilation_result.program);
255+
.insert(subgraph_name.to_string(), expression.clone());
278256

279257
Ok(())
280258
}

lib/executor/src/headers/compile.rs

Lines changed: 19 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,13 @@ use crate::headers::{
1212
use hive_router_config::headers as config;
1313
use http::HeaderName;
1414
use regex_automata::{meta, util::syntax::Config as SyntaxConfig};
15-
use vrl::{
16-
compiler::compile as vrl_compile, prelude::Function as VrlFunction,
17-
stdlib::all as vrl_build_functions,
18-
};
19-
20-
pub struct HeaderRuleCompilerContext {
21-
vrl_functions: Vec<Box<dyn VrlFunction>>,
22-
}
23-
24-
impl Default for HeaderRuleCompilerContext {
25-
fn default() -> Self {
26-
Self::new()
27-
}
28-
}
29-
30-
impl HeaderRuleCompilerContext {
31-
pub fn new() -> Self {
32-
Self {
33-
vrl_functions: vrl_build_functions(),
34-
}
35-
}
36-
}
3715

3816
pub trait HeaderRuleCompiler<A> {
39-
fn compile(
40-
&self,
41-
ctx: &HeaderRuleCompilerContext,
42-
actions: &mut A,
43-
) -> Result<(), HeaderRuleCompileError>;
17+
fn compile(&self, actions: &mut A) -> Result<(), HeaderRuleCompileError>;
4418
}
4519

4620
impl HeaderRuleCompiler<Vec<RequestHeaderRule>> for config::RequestHeaderRule {
47-
fn compile(
48-
&self,
49-
ctx: &HeaderRuleCompilerContext,
50-
actions: &mut Vec<RequestHeaderRule>,
51-
) -> Result<(), HeaderRuleCompileError> {
21+
fn compile(&self, actions: &mut Vec<RequestHeaderRule>) -> Result<(), HeaderRuleCompileError> {
5222
match self {
5323
config::RequestHeaderRule::Propagate(rule) => {
5424
let spec = materialize_match_spec(
@@ -78,16 +48,11 @@ impl HeaderRuleCompiler<Vec<RequestHeaderRule>> for config::RequestHeaderRule {
7848
value: build_header_value(&rule.name, value)?,
7949
}));
8050
}
81-
config::InsertSource::Expression { expression } => {
82-
let compilation_result =
83-
vrl_compile(expression, &ctx.vrl_functions).map_err(|e| {
84-
HeaderRuleCompileError::new_expression_build(rule.name.clone(), e)
85-
})?;
86-
51+
config::InsertSource::Expression(expression) => {
8752
actions.push(RequestHeaderRule::InsertExpression(
8853
RequestInsertExpression {
8954
name: build_header_name(&rule.name)?,
90-
expression: Box::new(compilation_result.program),
55+
expression: Box::new(expression.clone()),
9156
},
9257
));
9358
}
@@ -112,11 +77,7 @@ impl HeaderRuleCompiler<Vec<RequestHeaderRule>> for config::RequestHeaderRule {
11277
}
11378

11479
impl HeaderRuleCompiler<Vec<ResponseHeaderRule>> for config::ResponseHeaderRule {
115-
fn compile(
116-
&self,
117-
ctx: &HeaderRuleCompilerContext,
118-
actions: &mut Vec<ResponseHeaderRule>,
119-
) -> Result<(), HeaderRuleCompileError> {
80+
fn compile(&self, actions: &mut Vec<ResponseHeaderRule>) -> Result<(), HeaderRuleCompileError> {
12081
match self {
12182
config::ResponseHeaderRule::Propagate(rule) => {
12283
let aggregation_strategy = rule.algorithm.into();
@@ -153,21 +114,16 @@ impl HeaderRuleCompiler<Vec<ResponseHeaderRule>> for config::ResponseHeaderRule
153114
strategy: aggregation_strategy,
154115
}));
155116
}
156-
config::InsertSource::Expression { expression } => {
117+
config::InsertSource::Expression(expression) => {
157118
// NOTE: In case we ever need to improve performance and not pass the whole context
158119
// to VRL expressions, we can use:
159120
// - compilation_result.program.info().target_assignments
160121
// - compilation_result.program.info().target_queries
161122
// to determine what parts of the context are actually needed by the expression
162-
let compilation_result = vrl_compile(expression, &ctx.vrl_functions)
163-
.map_err(|e| {
164-
HeaderRuleCompileError::new_expression_build(rule.name.clone(), e)
165-
})?;
166-
167123
actions.push(ResponseHeaderRule::InsertExpression(
168124
ResponseInsertExpression {
169125
name: build_header_name(&rule.name)?,
170-
expression: Box::new(compilation_result.program),
126+
expression: Box::new(expression.clone()),
171127
strategy: aggregation_strategy,
172128
},
173129
));
@@ -196,19 +152,18 @@ impl HeaderRuleCompiler<Vec<ResponseHeaderRule>> for config::ResponseHeaderRule
196152
pub fn compile_headers_plan(
197153
cfg: &config::HeadersConfig,
198154
) -> Result<HeaderRulesPlan, HeaderRuleCompileError> {
199-
let ctx = HeaderRuleCompilerContext::new();
200155
let mut request_plan = RequestHeaderRules::default();
201156
let mut response_plan = ResponseHeaderRules::default();
202157

203158
if let Some(global_rules) = &cfg.all {
204-
request_plan.global = compile_request_header_rules(&ctx, global_rules)?;
205-
response_plan.global = compile_response_header_rules(&ctx, global_rules)?;
159+
request_plan.global = compile_request_header_rules(global_rules)?;
160+
response_plan.global = compile_response_header_rules(global_rules)?;
206161
}
207162

208163
if let Some(subgraph_rules_map) = &cfg.subgraphs {
209164
for (subgraph_name, subgraph_rules) in subgraph_rules_map {
210-
let request_actions = compile_request_header_rules(&ctx, subgraph_rules)?;
211-
let response_actions = compile_response_header_rules(&ctx, subgraph_rules)?;
165+
let request_actions = compile_request_header_rules(subgraph_rules)?;
166+
let response_actions = compile_response_header_rules(subgraph_rules)?;
212167
request_plan
213168
.by_subgraph
214169
.insert(subgraph_name.clone(), request_actions);
@@ -225,26 +180,24 @@ pub fn compile_headers_plan(
225180
}
226181

227182
fn compile_request_header_rules(
228-
ctx: &HeaderRuleCompilerContext,
229183
header_rules: &config::HeaderRules,
230184
) -> Result<Vec<RequestHeaderRule>, HeaderRuleCompileError> {
231185
let mut request_actions = Vec::new();
232186
if let Some(request_rule_entries) = &header_rules.request {
233187
for request_rule in request_rule_entries {
234-
request_rule.compile(ctx, &mut request_actions)?;
188+
request_rule.compile(&mut request_actions)?;
235189
}
236190
}
237191
Ok(request_actions)
238192
}
239193

240194
fn compile_response_header_rules(
241-
ctx: &HeaderRuleCompilerContext,
242195
header_rules: &config::HeaderRules,
243196
) -> Result<Vec<ResponseHeaderRule>, HeaderRuleCompileError> {
244197
let mut response_actions = Vec::new();
245198
if let Some(response_rule_entries) = &header_rules.response {
246199
for response_rule in response_rule_entries {
247-
response_rule.compile(ctx, &mut response_actions)?;
200+
response_rule.compile(&mut response_actions)?;
248201
}
249202
}
250203
Ok(response_actions)
@@ -358,7 +311,7 @@ mod tests {
358311
use http::HeaderName;
359312

360313
use crate::headers::{
361-
compile::{build_header_value, HeaderRuleCompiler, HeaderRuleCompilerContext},
314+
compile::{build_header_value, HeaderRuleCompiler},
362315
errors::HeaderRuleCompileError,
363316
plan::{HeaderAggregationStrategy, RequestHeaderRule, ResponseHeaderRule},
364317
};
@@ -378,9 +331,8 @@ mod tests {
378331
rename: None,
379332
default: None,
380333
});
381-
let ctx = HeaderRuleCompilerContext::new();
382334
let mut actions = Vec::new();
383-
rule.compile(&ctx, &mut actions).unwrap();
335+
rule.compile(&mut actions).unwrap();
384336
assert_eq!(actions.len(), 1);
385337
match &actions[0] {
386338
RequestHeaderRule::PropagateNamed(data) => {
@@ -401,8 +353,7 @@ mod tests {
401353
},
402354
});
403355
let mut actions = Vec::new();
404-
let ctx = HeaderRuleCompilerContext::new();
405-
rule.compile(&ctx, &mut actions).unwrap();
356+
rule.compile(&mut actions).unwrap();
406357
assert_eq!(actions.len(), 1);
407358
match &actions[0] {
408359
RequestHeaderRule::InsertStatic(data) => {
@@ -423,8 +374,7 @@ mod tests {
423374
},
424375
});
425376
let mut actions = Vec::new();
426-
let ctx = HeaderRuleCompilerContext::new();
427-
rule.compile(&ctx, &mut actions).unwrap();
377+
rule.compile(&mut actions).unwrap();
428378
assert_eq!(actions.len(), 1);
429379
match &actions[0] {
430380
RequestHeaderRule::RemoveNamed(data) => {
@@ -449,8 +399,7 @@ mod tests {
449399
default: Some("def".to_string()),
450400
});
451401
let mut actions = Vec::new();
452-
let ctx = HeaderRuleCompilerContext::new();
453-
let err = rule.compile(&ctx, &mut actions).unwrap_err();
402+
let err = rule.compile(&mut actions).unwrap_err();
454403
match err {
455404
HeaderRuleCompileError::InvalidDefault => {}
456405
_ => panic!("Expected InvalidDefault error"),
@@ -470,8 +419,7 @@ mod tests {
470419
algorithm: config::AggregationAlgo::First,
471420
});
472421
let mut actions = Vec::new();
473-
let ctx = HeaderRuleCompilerContext::new();
474-
rule.compile(&ctx, &mut actions).unwrap();
422+
rule.compile(&mut actions).unwrap();
475423
assert_eq!(actions.len(), 1);
476424
match &actions[0] {
477425
ResponseHeaderRule::PropagateNamed(data) => {

lib/executor/src/headers/errors.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use http::header::{InvalidHeaderName, InvalidHeaderValue};
22
use regex_automata::meta::BuildError;
3-
use vrl::{diagnostic::DiagnosticList, prelude::ExpressionError};
3+
use vrl::prelude::ExpressionError;
44

55
#[derive(thiserror::Error, Debug)]
66
pub enum HeaderRuleCompileError {
@@ -27,16 +27,8 @@ pub enum HeaderRuleRuntimeError {
2727
}
2828

2929
impl HeaderRuleCompileError {
30-
pub fn new_expression_build(header_name: String, diagnostics: DiagnosticList) -> Self {
31-
HeaderRuleCompileError::ExpressionBuild(
32-
header_name,
33-
diagnostics
34-
.errors()
35-
.into_iter()
36-
.map(|d| d.code.to_string() + ": " + &d.message)
37-
.collect::<Vec<_>>()
38-
.join(", "),
39-
)
30+
pub fn new_expression_build(header_name: String, err: String) -> Self {
31+
HeaderRuleCompileError::ExpressionBuild(header_name, err)
4032
}
4133
}
4234

lib/executor/src/headers/plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use ahash::HashMap;
2+
use hive_router_config::primitives::expression::Expression;
23
use http::{HeaderName, HeaderValue};
34
use regex_automata::meta::Regex;
4-
use vrl::compiler::Program as VrlProgram;
55

66
#[derive(Clone)]
77
pub struct HeaderRulesPlan {
@@ -62,13 +62,13 @@ pub struct ResponseInsertStatic {
6262
#[derive(Clone)]
6363
pub struct RequestInsertExpression {
6464
pub name: HeaderName,
65-
pub expression: Box<VrlProgram>,
65+
pub expression: Box<Expression>,
6666
}
6767

6868
#[derive(Clone)]
6969
pub struct ResponseInsertExpression {
7070
pub name: HeaderName,
71-
pub expression: Box<VrlProgram>,
71+
pub expression: Box<Expression>,
7272
pub strategy: HeaderAggregationStrategy,
7373
}
7474

0 commit comments

Comments
 (0)