Skip to content

Commit 6625e3a

Browse files
committed
Response Cache Plugin
1 parent 4c405b7 commit 6625e3a

File tree

6 files changed

+227
-0
lines changed

6 files changed

+227
-0
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/executor/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ ahash = "0.8.12"
3333
regex-automata = "0.4.10"
3434
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }
3535

36+
ntex = { version = "2", features = ["tokio"] }
3637
ntex-http = "0.1.15"
3738
hyper-tls = { version = "0.6.0", features = ["vendored"] }
3839
hyper-util = { version = "0.1.16", features = [
@@ -47,6 +48,7 @@ itoa = "1.0.15"
4748
ryu = "1.0.20"
4849
indexmap = "2.10.0"
4950
bumpalo = "3.19.0"
51+
redis = "0.32.7"
5052

5153
[dev-dependencies]
5254
subgraphs = { path = "../../bench/subgraphs" }

lib/executor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod projection;
88
pub mod response;
99
pub mod utils;
1010
pub mod variables;
11+
pub mod plugins;
1112

1213
pub use execution::plan::execute_query_plan;
1314
pub use executors::map::SubgraphExecutorMap;

lib/executor/src/plugins/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
mod traits;
2+
mod response_cache;
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use bytes::Bytes;
2+
use dashmap::DashMap;
3+
use ntex::web::HttpResponse;
4+
use redis::Commands;
5+
use sonic_rs::json;
6+
7+
use crate::{
8+
plugins::traits::{
9+
ControlFlow, OnExecuteEnd, OnExecuteEndPayload, OnExecuteStart, OnExecuteStartPayload,
10+
OnSchemaReload, OnSchemaReloadPayload,
11+
}, response::value::Value, utils::consts::TYPENAME_FIELD_NAME
12+
};
13+
14+
pub struct ResponseCachePlugin {
15+
redis_client: redis::Client,
16+
ttl_per_type: DashMap<String, u64>,
17+
}
18+
19+
impl ResponseCachePlugin {
20+
pub fn try_new(redis_url: &str) -> Result<Self, redis::RedisError> {
21+
let redis_client = redis::Client::open(redis_url)?;
22+
Ok(Self {
23+
redis_client,
24+
ttl_per_type: DashMap::new(),
25+
})
26+
}
27+
}
28+
29+
struct ResponseCacheContext {
30+
key: String,
31+
}
32+
33+
impl OnExecuteStart for ResponseCachePlugin {
34+
fn on_execute_start(&self, mut payload: OnExecuteStartPayload) -> ControlFlow {
35+
let key = format!(
36+
"response_cache:{}:{:?}",
37+
payload.query_plan, payload.variable_values
38+
);
39+
payload
40+
.router_http_request
41+
.extensions_mut()
42+
.insert(ResponseCacheContext { key: key.clone() });
43+
if let Some(mut conn) = self.redis_client.get_connection().ok() {
44+
let cached_response: Option<Vec<u8>> = conn.get(&key).ok();
45+
if let Some(cached_response) = cached_response {
46+
return ControlFlow::Break(
47+
HttpResponse::Ok()
48+
.header("Content-Type", "application/json")
49+
.body(cached_response),
50+
);
51+
}
52+
}
53+
ControlFlow::Continue
54+
}
55+
}
56+
57+
impl OnExecuteEnd for ResponseCachePlugin {
58+
fn on_execute_end(&self, mut payload: OnExecuteEndPayload) -> ControlFlow {
59+
// Do not cache if there are errors
60+
if payload.errors.len() > 0 {
61+
return ControlFlow::Continue;
62+
}
63+
if let Some(key) = payload
64+
.router_http_request
65+
.extensions()
66+
.get::<ResponseCacheContext>()
67+
.map(|ctx| &ctx.key)
68+
{
69+
if let Some(mut conn) = self.redis_client.get_connection().ok() {
70+
if let Some(serialized) = sonic_rs::to_vec(&payload.data).ok() {
71+
// Decide on the ttl somehow
72+
// Get the type names
73+
let mut max_ttl = 0;
74+
75+
// Imagine this code is traversing the response data to find type names
76+
if let Some(obj) = payload.data.as_object() {
77+
if let Some(typename) = obj
78+
.iter()
79+
.position(|(k, _)| k == &TYPENAME_FIELD_NAME)
80+
.and_then(|idx| obj[idx].1.as_str())
81+
{
82+
if let Some(ttl) = self.ttl_per_type.get(typename).map(|v| *v) {
83+
max_ttl = max_ttl.max(ttl);
84+
}
85+
}
86+
}
87+
88+
// If no ttl found, default to 60 seconds
89+
if max_ttl == 0 {
90+
max_ttl = 60;
91+
}
92+
93+
// Insert the ttl into extensions for client awareness
94+
payload.extensions
95+
.insert("response_cache_ttl".to_string(), json!(max_ttl));
96+
97+
// Set the cache with the decided ttl
98+
let _: () = conn.set_ex(key, serialized, max_ttl).unwrap_or(());
99+
}
100+
}
101+
}
102+
ControlFlow::Continue
103+
}
104+
}
105+
106+
impl OnSchemaReload for ResponseCachePlugin {
107+
fn on_schema_reload(&self, payload: OnSchemaReloadPayload) {
108+
// Visit the schema and update ttl_per_type based on some directive
109+
payload
110+
.new_schema
111+
.document
112+
.definitions
113+
.iter()
114+
.for_each(|def| {
115+
if let graphql_parser::schema::Definition::TypeDefinition(type_def) = def {
116+
if let graphql_parser::schema::TypeDefinition::Object(obj_type) = type_def {
117+
for directive in &obj_type.directives {
118+
if directive.name == "cacheControl" {
119+
for arg in &directive.arguments {
120+
if arg.0 == "maxAge" {
121+
if let graphql_parser::query::Value::Int(max_age) = &arg.1 {
122+
if let Some(max_age) = max_age.as_i64() {
123+
self.ttl_per_type
124+
.insert(obj_type.name.clone(), max_age as u64);
125+
}
126+
}
127+
}
128+
}
129+
}
130+
}
131+
}
132+
}
133+
});
134+
}
135+
}

lib/executor/src/plugins/traits.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use async_trait::async_trait;
4+
use hive_router_query_planner::consumer_schema::ConsumerSchema;
5+
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
6+
use ntex::web::HttpRequest;
7+
use ntex::web::HttpResponse;
8+
9+
use crate::response::graphql_error::GraphQLError;
10+
use crate::response::value::Value;
11+
12+
pub enum ControlFlow {
13+
Continue,
14+
Break(HttpResponse)
15+
}
16+
17+
pub struct ExecutionResult<'exec> {
18+
pub data: &'exec mut Value<'exec>,
19+
pub errors: &'exec mut Vec<GraphQLError>,
20+
pub extensions: &'exec mut Option<HashMap<String, Value<'exec>>>,
21+
}
22+
23+
pub struct OnExecuteStartPayload<'exec> {
24+
pub router_http_request: &'exec HttpRequest,
25+
pub query_plan: Arc<QueryPlan>,
26+
27+
pub data: &'exec mut Value<'exec>,
28+
pub errors: &'exec mut Vec<GraphQLError>,
29+
pub extensions: Option<&'exec mut sonic_rs::Value>,
30+
31+
pub skip_execution: bool,
32+
33+
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
34+
}
35+
36+
pub trait OnExecuteStart {
37+
fn on_execute_start(&self, payload: OnExecuteStartPayload) -> ControlFlow;
38+
}
39+
40+
pub struct OnExecuteEndPayload<'exec> {
41+
pub router_http_request: &'exec HttpRequest,
42+
pub query_plan: Arc<QueryPlan>,
43+
44+
pub data: &'exec Value<'exec>,
45+
pub errors: &'exec Vec<GraphQLError>,
46+
pub extensions: &'exec mut HashMap<String, sonic_rs::Value>,
47+
48+
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
49+
}
50+
51+
pub trait OnExecuteEnd {
52+
fn on_execute_end(&self, payload: OnExecuteEndPayload) -> ControlFlow;
53+
}
54+
55+
pub struct OnSchemaReloadPayload {
56+
pub old_schema: &'static ConsumerSchema,
57+
pub new_schema: &'static mut ConsumerSchema,
58+
}
59+
60+
pub trait OnSchemaReload {
61+
fn on_schema_reload(&self, payload: OnSchemaReloadPayload);
62+
}
63+

0 commit comments

Comments
 (0)