Skip to content

Commit 9421157

Browse files
committed
fix: resolve semver variables and invalidate cache on flag updates
Signed-off-by: Eren Atas <[email protected]>
1 parent ad5ab8f commit 9421157

File tree

4 files changed

+161
-48
lines changed

4 files changed

+161
-48
lines changed

crates/flagd/src/cache/service.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,13 @@ where
245245
cache.add(cache_key, entry)
246246
}
247247

248+
pub async fn purge(&self) {
249+
if self.enabled {
250+
let mut cache = self.cache.write().await;
251+
cache.purge();
252+
}
253+
}
254+
248255
pub fn disable(&mut self) {
249256
if self.enabled {
250257
self.enabled = false;

crates/flagd/src/resolver/in_process/resolver/file.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
use crate::resolver::in_process::model::value_converter::ValueConverter;
22
use crate::resolver::in_process::storage::connector::file::FileConnector;
3-
use crate::resolver::in_process::storage::{FlagStore, StorageState};
3+
use crate::resolver::in_process::storage::{FlagStore, StorageState, StorageStateChange};
44
use crate::resolver::in_process::targeting::Operator;
55
use crate::{CacheService, CacheSettings};
66
use anyhow::Result;
77
use async_trait::async_trait;
88
use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetails};
99
use open_feature::{EvaluationContext, EvaluationError, EvaluationErrorCode, StructValue, Value};
1010
use std::sync::Arc;
11+
use tokio::sync::Mutex;
1112
use tracing::debug;
1213

1314
pub struct FileResolver {
1415
store: Arc<FlagStore>,
1516
operator: Operator,
1617
metadata: ProviderMetadata,
1718
cache: Option<Arc<CacheService<Value>>>,
19+
state_receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<StorageStateChange>>>,
1820
}
1921
impl FileResolver {
2022
pub async fn new(source_path: String, cache_settings: Option<CacheSettings>) -> Result<Self> {
@@ -35,21 +37,51 @@ impl FileResolver {
3537
return Err(anyhow::anyhow!("Timeout waiting for initial flag state"));
3638
}
3739

40+
let cache = cache_settings.map(|settings| Arc::new(CacheService::new(settings)));
41+
3842
Ok(Self {
3943
store,
4044
operator: Operator::new(),
4145
metadata: ProviderMetadata::new("flagd"),
42-
cache: cache_settings.map(|settings| Arc::new(CacheService::new(settings))),
46+
cache,
47+
state_receiver: Arc::new(Mutex::new(state_receiver)),
4348
})
4449
}
4550

51+
/// Check for flag updates and clear cache if needed (non-blocking)
52+
async fn check_for_updates(&self) {
53+
if self.cache.is_none() {
54+
return;
55+
}
56+
57+
let mut receiver = self.state_receiver.lock().await;
58+
59+
// Drain all pending state changes (non-blocking)
60+
let mut should_clear = false;
61+
while let Ok(state_change) = receiver.try_recv() {
62+
if state_change.storage_state == StorageState::Ok {
63+
should_clear = true;
64+
}
65+
}
66+
67+
if should_clear {
68+
debug!("Flag store updated, clearing cache");
69+
if let Some(cache) = &self.cache {
70+
cache.purge().await;
71+
}
72+
}
73+
}
74+
4675
async fn resolve_value<T>(
4776
&self,
4877
flag_key: &str,
4978
context: &EvaluationContext,
5079
value_converter: impl Fn(&serde_json::Value) -> Option<T>,
5180
type_name: &str,
5281
) -> Result<ResolutionDetails<T>, EvaluationError> {
82+
// Check for flag updates and clear cache if needed
83+
self.check_for_updates().await;
84+
5385
if let Some(cache) = &self.cache
5486
&& let Some(cached_value) = cache.get(flag_key, context).await
5587
{

crates/flagd/src/resolver/in_process/resolver/grpc.rs

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,59 +8,83 @@ use open_feature::provider::{FeatureProvider, ProviderMetadata, ResolutionDetail
88
use open_feature::{EvaluationContext, EvaluationError, EvaluationErrorCode, StructValue, Value};
99
use serde_json::Value as JsonValue;
1010
use std::sync::Arc;
11+
use tokio::sync::Mutex;
12+
use tracing::debug;
1113

12-
use crate::resolver::in_process::storage::FlagStore;
1314
use crate::resolver::in_process::storage::connector::grpc::GrpcStreamConnector;
15+
use crate::resolver::in_process::storage::{FlagStore, StorageState, StorageStateChange};
1416

1517
pub struct InProcessResolver {
1618
store: Arc<FlagStore>,
1719
operator: Operator,
1820
metadata: ProviderMetadata,
1921
cache: Option<Arc<CacheService<Value>>>,
22+
state_receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<StorageStateChange>>>,
2023
}
2124

2225
impl InProcessResolver {
2326
pub async fn new(options: &FlagdOptions) -> Result<Self> {
24-
let store = match &options.socket_path {
27+
let (store, state_receiver) = match &options.socket_path {
2528
Some(_) => Self::create_unix_socket_store(options).await?,
2629
None => Self::create_tcp_store(options).await?,
2730
};
2831

32+
let cache = options
33+
.cache_settings
34+
.clone()
35+
.map(|settings| Arc::new(CacheService::new(settings)));
36+
2937
Ok(Self {
3038
store,
3139
operator: Operator::new(),
3240
metadata: ProviderMetadata::new("flagd"),
33-
cache: options
34-
.cache_settings
35-
.clone()
36-
.map(|settings| Arc::new(CacheService::new(settings))),
41+
cache,
42+
state_receiver: Arc::new(Mutex::new(state_receiver)),
3743
})
3844
}
3945

40-
async fn create_unix_socket_store(_options: &FlagdOptions) -> Result<Arc<FlagStore>> {
41-
// let socket_path = options.socket_path.as_ref().unwrap().clone();
42-
// let socket_path_for_connector = socket_path.clone();
46+
/// Check for flag updates and clear cache if needed (non-blocking)
47+
async fn check_for_updates(&self) {
48+
if self.cache.is_none() {
49+
return;
50+
}
51+
52+
let mut receiver = self.state_receiver.lock().await;
4353

44-
// let _channel = Endpoint::try_from("http://[::]:50051")?
45-
// .connect_with_connector(service_fn(move |_: Uri| {
46-
// let path = socket_path.clone();
47-
// async move {
48-
// let stream = UnixStream::connect(path).await?;
49-
// Ok::<_, std::io::Error>(TokioIo::new(stream))
50-
// }
51-
// }))
52-
// .await?;
54+
// Drain all pending state changes (non-blocking)
55+
let mut should_clear = false;
56+
while let Ok(state_change) = receiver.try_recv() {
57+
if state_change.storage_state == StorageState::Ok {
58+
should_clear = true;
59+
}
60+
}
5361

54-
// let connector =
55-
// GrpcStreamConnector::new(socket_path_for_connector, options.selector.clone(), options);
56-
// let (store, _state_receiver) = FlagStore::new(Arc::new(connector));
57-
// let store = Arc::new(store);
58-
// store.init().await?;
59-
// Ok(store)
60-
todo!("Unix socket store for in-process is not implemented")
62+
if should_clear {
63+
debug!("Flag store updated, clearing cache");
64+
if let Some(cache) = &self.cache {
65+
cache.purge().await;
66+
}
67+
}
6168
}
6269

63-
async fn create_tcp_store(options: &FlagdOptions) -> Result<Arc<FlagStore>> {
70+
async fn create_unix_socket_store(
71+
_options: &FlagdOptions,
72+
) -> Result<(
73+
Arc<FlagStore>,
74+
tokio::sync::mpsc::Receiver<crate::resolver::in_process::storage::StorageStateChange>,
75+
)> {
76+
// Unix socket store for in-process is not implemented
77+
Err(anyhow::anyhow!(
78+
"Unix socket store for in-process is not implemented"
79+
))
80+
}
81+
82+
async fn create_tcp_store(
83+
options: &FlagdOptions,
84+
) -> Result<(
85+
Arc<FlagStore>,
86+
tokio::sync::mpsc::Receiver<crate::resolver::in_process::storage::StorageStateChange>,
87+
)> {
6488
let target = options
6589
.target_uri
6690
.clone()
@@ -73,10 +97,10 @@ impl InProcessResolver {
7397
upstream_config.authority().to_string(),
7498
);
7599

76-
let (store, _state_receiver) = FlagStore::new(Arc::new(connector));
100+
let (store, state_receiver) = FlagStore::new(Arc::new(connector));
77101
let store = Arc::new(store);
78102
store.init().await?;
79-
Ok(store)
103+
Ok((store, state_receiver))
80104
}
81105

82106
async fn get_cached_value<T>(
@@ -100,6 +124,9 @@ impl InProcessResolver {
100124
value_converter: impl Fn(&JsonValue) -> Option<T>,
101125
type_name: &str,
102126
) -> Result<ResolutionDetails<T>, EvaluationError> {
127+
// Check for flag updates and clear cache if needed
128+
self.check_for_updates().await;
129+
103130
// Try cache first
104131
if let Some(cached_value) = self
105132
.get_cached_value(flag_key, context, |v| match v {

crates/flagd/src/resolver/in_process/targeting/semver.rs

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,66 @@ impl Operator for SemVerOperator {
99
fn evaluate(
1010
&self,
1111
args: &[Value],
12-
_context: &mut ContextStack,
12+
context: &mut ContextStack,
1313
_evaluator: &dyn Evaluator,
1414
) -> datalogic_rs::Result<Value> {
1515
if args.len() != 3 {
1616
debug!("SemVer requires exactly 3 arguments, got {}", args.len());
1717
return Ok(Value::Null);
1818
}
1919

20-
let version1 = match args[0].as_str() {
21-
Some(s) => match Version::parse(s) {
22-
Ok(v) => v,
23-
Err(e) => {
24-
debug!("Failed to parse first version: {:?}: {}", s, e);
25-
return Ok(Value::Null);
20+
// Helper function to resolve a value (either a string literal or a variable reference)
21+
let resolve_value = |arg: &Value| -> Option<String> {
22+
match arg {
23+
Value::String(s) => Some(s.clone()),
24+
Value::Object(obj) => {
25+
// Check if it's a variable reference: {"var": "variable_name"}
26+
if let Some(var_name) = obj.get("var").and_then(|v| v.as_str()) {
27+
// Get the current data from context
28+
let frame = context.root();
29+
let data = frame.data();
30+
// Try to resolve from context (check both direct key and nested in $flagd)
31+
data.get(var_name)
32+
.and_then(|v| v.as_str())
33+
.map(|s| s.to_string())
34+
.or_else(|| {
35+
// Also check in $flagd.flagKey format
36+
if let Some(parts) = var_name.split_once('.') {
37+
if parts.0 == "$flagd" {
38+
data.get("$flagd")
39+
.and_then(|v| v.get(parts.1))
40+
.and_then(|v| v.as_str())
41+
.map(|s| s.to_string())
42+
} else {
43+
None
44+
}
45+
} else {
46+
None
47+
}
48+
})
49+
} else {
50+
None
51+
}
2652
}
27-
},
53+
_ => None,
54+
}
55+
};
56+
57+
let version1_str = match resolve_value(&args[0]) {
58+
Some(s) => s,
2859
None => {
29-
debug!("First argument must be a string: {:?}", args[0]);
60+
debug!(
61+
"First argument must be a string or variable reference: {:?}",
62+
args[0]
63+
);
64+
return Ok(Value::Null);
65+
}
66+
};
67+
68+
let version1 = match Version::parse(&version1_str) {
69+
Ok(v) => v,
70+
Err(e) => {
71+
debug!("Failed to parse first version: {:?}: {}", version1_str, e);
3072
return Ok(Value::Null);
3173
}
3274
};
@@ -39,16 +81,21 @@ impl Operator for SemVerOperator {
3981
}
4082
};
4183

42-
let version2 = match args[2].as_str() {
43-
Some(s) => match Version::parse(s) {
44-
Ok(v) => v,
45-
Err(e) => {
46-
debug!("Failed to parse second version: {:?}: {}", s, e);
47-
return Ok(Value::Null);
48-
}
49-
},
84+
let version2_str = match resolve_value(&args[2]) {
85+
Some(s) => s,
5086
None => {
51-
debug!("Second argument must be a string: {:?}", args[2]);
87+
debug!(
88+
"Second argument must be a string or variable reference: {:?}",
89+
args[2]
90+
);
91+
return Ok(Value::Null);
92+
}
93+
};
94+
95+
let version2 = match Version::parse(&version2_str) {
96+
Ok(v) => v,
97+
Err(e) => {
98+
debug!("Failed to parse second version: {:?}: {}", version2_str, e);
5299
return Ok(Value::Null);
53100
}
54101
};

0 commit comments

Comments
 (0)