Skip to content

Commit c62885f

Browse files
authored
feat(flagd): improve in-process evaluation mode, optimize dependencies (#91)
Signed-off-by: Eren Atas <[email protected]>
1 parent 6ba9d48 commit c62885f

File tree

17 files changed

+1054
-169
lines changed

17 files changed

+1054
-169
lines changed

crates/flagd/Cargo.toml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ rpc = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:hype
2929
# REST evaluation mode - uses HTTP/OFREP to connect to flagd service
3030
rest = ["dep:reqwest"]
3131
# In-process evaluation mode - local evaluation with gRPC sync or file-based configuration
32-
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver"]
32+
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver", "dep:notify"]
3333

3434
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
3535

@@ -38,7 +38,6 @@ tonic-prost-build = "0.14"
3838

3939
[dev-dependencies]
4040
cucumber = "0.22"
41-
tokio-stream = "0.1"
4241
futures-core = "0.3"
4342
testcontainers = { version = "0.26.0", features = ["http_wait", "blocking"] }
4443
wiremock = "0.6.5"
@@ -50,24 +49,23 @@ test-log = { version = "0.2", features = ["trace"] }
5049
[dependencies]
5150
open-feature = "0.2"
5251
async-trait = "0.1"
53-
tokio = { version = "1.48", features = ["full"] }
52+
tokio = { version = "1.48", features = ["sync", "time", "fs", "rt", "rt-multi-thread", "macros", "net"] }
5453
serde_json = "1.0"
5554
serde = { version = "1.0", features = ["derive"] }
5655
lru = "0.16"
57-
futures = "0.3"
56+
tokio-stream = "0.1"
5857
tracing = "0.1"
59-
anyhow = "1.0.100"
6058
thiserror = "2.0"
6159

6260
# RPC and In-Process shared dependencies (gRPC)
63-
tonic = { version = "0.14", optional = true }
61+
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen"], optional = true }
6462
tonic-prost = { version = "0.14", optional = true }
6563
prost = { version = "0.14", optional = true }
6664
prost-types = { version = "0.14", optional = true }
6765

6866
# RPC-specific dependencies
6967
hyper-util = { version = "0.1", features = ["tokio"], optional = true }
70-
tower = { version = "0.5", optional = true }
68+
tower = { version = "0.5", default-features = false, features = ["util"], optional = true }
7169

7270
# REST-specific dependencies
7371
reqwest = { version = "0.12", default-features = false, features = ["json", "stream", "rustls-tls"], optional = true }
@@ -76,3 +74,4 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "str
7674
datalogic-rs = { version = "4.0.4", optional = true }
7775
murmurhash3 = { version = "0.0.5", optional = true }
7876
semver = { version = "1.0.27", optional = true }
77+
notify = { version = "8.0", optional = true }

crates/flagd/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ Configurations can be provided as constructor options or via environment variabl
199199
| TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
200200
| Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
201201
| Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
202-
| Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
202+
| Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | In-Process: disabled, others: lru | RPC, In-Process, File |
203203
| Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
204204
| Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
205205
| Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//! Common utilities for flagd examples
2+
3+
use std::time::Duration;
4+
use testcontainers::{
5+
ContainerAsync, GenericImage, ImageExt,
6+
core::{ContainerPort, Mount, WaitFor, logs::LogSource, wait::LogWaitStrategy},
7+
runners::AsyncRunner,
8+
};
9+
10+
pub const FLAGD_SYNC_PORT: u16 = 8015;
11+
12+
/// Start a flagd container configured for in-process sync (port 8015)
13+
pub async fn start_flagd_sync(
14+
flags_path: &str,
15+
flags_file: &str,
16+
) -> Result<(ContainerAsync<GenericImage>, u16), Box<dyn std::error::Error>> {
17+
// Use fsnotify provider for faster file change detection
18+
let sources_config = format!(
19+
r#"[{{"uri":"/flags/{}","provider":"fsnotify"}}]"#,
20+
flags_file
21+
);
22+
23+
let container = GenericImage::new("ghcr.io/open-feature/flagd", "latest")
24+
.with_exposed_port(ContainerPort::Tcp(FLAGD_SYNC_PORT))
25+
.with_wait_for(WaitFor::Log(LogWaitStrategy::new(
26+
LogSource::StdErr,
27+
"Flag IResolver listening at",
28+
)))
29+
.with_mount(Mount::bind_mount(flags_path.to_string(), "/flags"))
30+
.with_cmd(["start", "--sources", &sources_config])
31+
.start()
32+
.await?;
33+
34+
let sync_port = container
35+
.get_host_port_ipv4(ContainerPort::Tcp(FLAGD_SYNC_PORT))
36+
.await?;
37+
38+
// Give flagd a moment to fully initialize
39+
tokio::time::sleep(Duration::from_millis(500)).await;
40+
41+
Ok((container, sync_port))
42+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"$schema": "https://flagd.dev/schema/v0/flags.json",
3+
"flags": {
4+
"basic-boolean": {
5+
"state": "ENABLED",
6+
"defaultVariant": "false",
7+
"variants": {
8+
"true": true,
9+
"false": false
10+
},
11+
"targeting": {}
12+
}
13+
}
14+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
//! In-process evaluation example using testcontainers with flagd
2+
//!
3+
//! This example demonstrates in-process flag evaluation by periodically
4+
//! evaluating a boolean flag. Edit `examples/flags/basic-flags.json` while
5+
//! running to see live flag updates.
6+
//!
7+
//! Run with: cargo run --example in_process --all-features
8+
//!
9+
//! Then edit basic-flags.json and change "defaultVariant": "false" to "true"
10+
//! to see the flag value change in real-time.
11+
12+
mod common;
13+
14+
use common::start_flagd_sync;
15+
use open_feature::EvaluationContext;
16+
use open_feature::provider::FeatureProvider;
17+
use open_feature_flagd::{FlagdOptions, FlagdProvider, ResolverType};
18+
use std::time::Duration;
19+
20+
#[tokio::main]
21+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
22+
let manifest_dir = env!("CARGO_MANIFEST_DIR");
23+
let flags_path = format!("{}/examples/flags", manifest_dir);
24+
25+
println!("Starting flagd container...");
26+
let (_container, sync_port) = start_flagd_sync(&flags_path, "basic-flags.json").await?;
27+
println!("flagd sync service available on port {}", sync_port);
28+
29+
// Configure the flagd provider for in-process evaluation
30+
let provider = FlagdProvider::new(FlagdOptions {
31+
host: "localhost".to_string(),
32+
port: sync_port,
33+
resolver_type: ResolverType::InProcess,
34+
..Default::default()
35+
})
36+
.await
37+
.expect("Failed to create provider");
38+
39+
let ctx = EvaluationContext::default();
40+
41+
println!("\nEvaluating 'basic-boolean' flag every 2 seconds...");
42+
println!("Edit examples/flags/basic-flags.json to change the flag value.");
43+
println!("Press Ctrl+C to stop.\n");
44+
45+
loop {
46+
let result = provider
47+
.resolve_bool_value("basic-boolean", &ctx)
48+
.await
49+
.expect("Failed to resolve flag");
50+
51+
println!("basic-boolean = {}", result.value);
52+
53+
tokio::time::sleep(Duration::from_secs(2)).await;
54+
}
55+
}

crates/flagd/src/error.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use thiserror::Error;
22

3+
/// Error type for flagd operations
34
#[derive(Error, Debug)]
45
pub enum FlagdError {
56
#[error("Provider error: {0}")]
@@ -8,9 +9,20 @@ pub enum FlagdError {
89
Connection(String),
910
#[error("Invalid configuration: {0}")]
1011
Config(String),
12+
#[error("Sync error: {0}")]
13+
Sync(String),
14+
#[error("Parse error: {0}")]
15+
Parse(String),
16+
#[error("Timeout: {0}")]
17+
Timeout(String),
18+
#[error("IO error: {0}")]
19+
Io(#[from] std::io::Error),
20+
#[error("JSON error: {0}")]
21+
Json(#[from] serde_json::Error),
22+
#[error("Channel send error: {0}")]
23+
Channel(String),
1124
}
1225

13-
// Add implementations for error conversion
1426
impl From<Box<dyn std::error::Error>> for FlagdError {
1527
fn from(error: Box<dyn std::error::Error>) -> Self {
1628
FlagdError::Provider(error.to_string())
@@ -23,8 +35,14 @@ impl From<Box<dyn std::error::Error + Send + Sync>> for FlagdError {
2335
}
2436
}
2537

26-
impl From<anyhow::Error> for FlagdError {
27-
fn from(error: anyhow::Error) -> Self {
28-
FlagdError::Provider(error.to_string())
38+
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for FlagdError {
39+
fn from(error: tokio::sync::mpsc::error::SendError<T>) -> Self {
40+
FlagdError::Channel(error.to_string())
41+
}
42+
}
43+
44+
impl From<tokio::time::error::Elapsed> for FlagdError {
45+
fn from(error: tokio::time::error::Elapsed) -> Self {
46+
FlagdError::Timeout(error.to_string())
2947
}
3048
}

crates/flagd/src/lib.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@
199199
//! | TLS | FLAGD_TLS | boolean | false | RPC, In-Process |
200200
//! | Socket Path | FLAGD_SOCKET_PATH | string | "" | RPC |
201201
//! | Certificate Path | FLAGD_SERVER_CERT_PATH | string | "" | RPC, In-Process |
202-
//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | lru | RPC, In-Process, File |
202+
//! | Cache Type (LRU / In-Memory / Disabled) | FLAGD_CACHE | string ("lru", "mem", "disabled") | In-Process: disabled, others: lru | RPC, In-Process, File |
203203
//! | Cache TTL (Seconds) | FLAGD_CACHE_TTL | number | 60 | RPC, In-Process, File |
204204
//! | Max Cache Size | FLAGD_MAX_CACHE_SIZE | number | 1000 | RPC, In-Process, File |
205205
//! | Offline File Path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | string | "" | File |
@@ -297,6 +297,9 @@ pub struct FlagdOptions {
297297
/// The deadline in milliseconds for event streaming operations. Set to 0 to disable.
298298
/// Recommended to prevent infrastructure from killing idle connections.
299299
pub stream_deadline_ms: u32,
300+
/// HTTP/2 keepalive time in milliseconds. Sends pings to keep connections alive during
301+
/// idle periods, allowing RPCs to start quickly without delay. Set to 0 to disable.
302+
pub keep_alive_time_ms: u64,
300303
/// Offline polling interval in milliseconds
301304
pub offline_poll_interval_ms: Option<u32>,
302305
/// Provider ID for identifying this provider instance to flagd
@@ -358,6 +361,10 @@ impl Default for FlagdOptions {
358361
.ok()
359362
.and_then(|v| v.parse().ok())
360363
.unwrap_or(600000),
364+
keep_alive_time_ms: std::env::var("FLAGD_KEEP_ALIVE_TIME_MS")
365+
.ok()
366+
.and_then(|v| v.parse().ok())
367+
.unwrap_or(0), // Disabled by default, per gherkin spec
361368
socket_path: std::env::var("FLAGD_SOCKET_PATH").ok(),
362369
selector: std::env::var("FLAGD_SOURCE_SELECTOR").ok(),
363370
cache_settings: Some(CacheSettings::default()),
@@ -378,6 +385,13 @@ impl Default for FlagdOptions {
378385
// Only override to File if FLAGD_RESOLVER wasn't explicitly set
379386
options.resolver_type = ResolverType::File;
380387
}
388+
// Disable caching for in-process/file modes per spec (caching is RPC-only)
389+
if matches!(
390+
options.resolver_type,
391+
ResolverType::InProcess | ResolverType::File
392+
) {
393+
options.cache_settings = None;
394+
}
381395
}
382396

383397
options
Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,45 @@
1-
use anyhow::{Context, Result};
1+
use crate::error::FlagdError;
22
use std::str::FromStr;
33
use tonic::transport::{Endpoint, Uri};
44
use tracing::debug;
55

66
pub struct UpstreamConfig {
77
endpoint: Endpoint,
8-
authority: Uri,
8+
authority: Option<String>, // Only set for custom name resolution (envoy://)
99
}
1010

1111
impl UpstreamConfig {
12-
pub fn new(target: String, is_in_process: bool) -> Result<Self> {
12+
pub fn new(target: String, is_in_process: bool) -> Result<Self, FlagdError> {
1313
debug!("Creating upstream config for target: {}", target);
1414

1515
if target.starts_with("http://") {
1616
debug!("Target is already an HTTP endpoint");
17-
let uri = Uri::from_str(&target)?;
18-
let endpoint = Endpoint::from_shared(target)?;
17+
let endpoint = Endpoint::from_shared(target)
18+
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;
1919
return Ok(Self {
2020
endpoint,
21-
authority: uri
22-
.authority()
23-
.map(|a| a.as_str())
24-
.unwrap_or_default()
25-
.parse()?,
21+
authority: None, // Standard HTTP doesn't need custom authority
2622
});
2723
}
2824

2925
let (endpoint_str, authority) = if target.starts_with("envoy://") {
30-
let uri = Uri::from_str(&target).context("Failed to parse target URI")?;
26+
let uri = Uri::from_str(&target)
27+
.map_err(|e| FlagdError::Config(format!("Failed to parse target URI: {}", e)))?;
3128
let authority = uri.path().trim_start_matches('/');
3229

3330
if authority.is_empty() {
34-
return Err(anyhow::anyhow!("Service name (authority) cannot be empty"));
31+
return Err(FlagdError::Config(
32+
"Service name (authority) cannot be empty".to_string(),
33+
));
3534
}
3635

3736
let host = uri.host().unwrap_or("localhost");
3837
let port = uri.port_u16().unwrap_or(9211); // Use Envoy port directly
3938

40-
(format!("http://{}:{}", host, port), authority.to_string())
39+
(
40+
format!("http://{}:{}", host, port),
41+
Some(authority.to_string()),
42+
)
4143
} else {
4244
let parts: Vec<&str> = target.split(':').collect();
4345
let host = parts.first().unwrap_or(&"localhost").to_string();
@@ -47,24 +49,23 @@ impl UpstreamConfig {
4749
.unwrap_or(if is_in_process { 8015 } else { 8013 });
4850

4951
debug!("Using standard resolution with {}:{}", host, port);
50-
(format!("http://{}:{}", host, port), host)
52+
(format!("http://{}:{}", host, port), None) // Standard resolution doesn't need custom authority
5153
};
5254

53-
let endpoint = Endpoint::from_shared(endpoint_str)?;
54-
let authority_uri =
55-
Uri::from_str(authority.as_str()).context("Failed to parse authority")?;
55+
let endpoint = Endpoint::from_shared(endpoint_str)
56+
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;
5657

5758
Ok(Self {
5859
endpoint,
59-
authority: authority_uri,
60+
authority,
6061
})
6162
}
6263

6364
pub fn endpoint(&self) -> &Endpoint {
6465
&self.endpoint
6566
}
6667

67-
pub fn authority(&self) -> &Uri {
68-
&self.authority
68+
pub fn authority(&self) -> Option<String> {
69+
self.authority.clone()
6970
}
7071
}

crates/flagd/src/resolver/in_process/model/flag_parser.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
use super::feature_flag::FeatureFlag;
22
use super::feature_flag::ParsingResult;
3-
use anyhow::Result;
3+
use crate::error::FlagdError;
44
use serde_json::{Map, Value};
55
use std::collections::HashMap;
66

77
pub struct FlagParser;
88

99
impl FlagParser {
10-
pub fn parse_string(configuration: &str) -> Result<ParsingResult> {
10+
pub fn parse_string(configuration: &str) -> Result<ParsingResult, FlagdError> {
1111
let value: Value = serde_json::from_str(configuration)?;
1212
let obj = value
1313
.as_object()
14-
.ok_or_else(|| anyhow::anyhow!("Invalid JSON structure"))?;
14+
.ok_or_else(|| FlagdError::Parse("Invalid JSON structure".to_string()))?;
1515

1616
let flags = obj
1717
.get("flags")
1818
.and_then(|v| v.as_object())
19-
.ok_or_else(|| anyhow::anyhow!("No flag configurations found in the payload"))?;
19+
.ok_or_else(|| {
20+
FlagdError::Parse("No flag configurations found in the payload".to_string())
21+
})?;
2022

2123
let flag_set_metadata = obj
2224
.get("metadata")

0 commit comments

Comments
 (0)