Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions crates/flagd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ pub struct FlagdOptions {
pub stream_deadline_ms: u32,
/// Offline polling interval in milliseconds
pub offline_poll_interval_ms: Option<u32>,
/// Provider ID for identifying this provider instance to flagd
/// Used in in-process resolver for sync requests
pub provider_id: Option<String>,
}
/// Type of resolver to use for flag evaluation
#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -336,9 +339,12 @@ impl Default for FlagdOptions {
.and_then(|s| s.parse().ok())
.unwrap_or(5000),
),
provider_id: std::env::var("FLAGD_PROVIDER_ID").ok(),
};

if options.source_configuration.is_some() && options.resolver_type != ResolverType::Rpc {
let resolver_env_set = std::env::var("FLAGD_RESOLVER").is_ok();
if options.source_configuration.is_some() && !resolver_env_set {
// Only override to File if FLAGD_RESOLVER wasn't explicitly set
options.resolver_type = ResolverType::File;
}

Expand All @@ -360,6 +366,13 @@ impl FlagdProvider {
pub async fn new(options: FlagdOptions) -> Result<Self, FlagdError> {
debug!("Initializing FlagdProvider with options: {:?}", options);

// Validate File resolver configuration
if options.resolver_type == ResolverType::File && options.source_configuration.is_none() {
return Err(FlagdError::Config(
"File resolver requires 'source_configuration' (FLAGD_OFFLINE_FLAG_SOURCE_PATH) to be set".to_string()
));
}

let provider: Arc<dyn FeatureProvider + Send + Sync> = match options.resolver_type {
ResolverType::Rpc => {
debug!("Using RPC resolver");
Expand All @@ -377,7 +390,9 @@ impl FlagdProvider {
debug!("Using file resolver");
Arc::new(
FileResolver::new(
options.source_configuration.unwrap(),
options
.source_configuration
.expect("source_configuration validated above"),
options.cache_settings.clone(),
)
.await?,
Expand Down
38 changes: 18 additions & 20 deletions crates/flagd/src/resolver/in_process/storage/connector/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ pub struct GrpcStreamConnector {
retry_backoff_max_ms: u32,
retry_grace_period: u32,
stream_deadline_ms: u32,
authority: String, // desired authority, e.g. "b-features-api.service"
authority: String, // desired authority, e.g. "b-features-api.service"
provider_id: String, // provider identifier for sync requests
}

impl GrpcStreamConnector {
Expand All @@ -52,6 +53,10 @@ impl GrpcStreamConnector {
retry_grace_period: options.retry_grace_period,
stream_deadline_ms: options.stream_deadline_ms,
authority,
provider_id: options
.provider_id
.clone()
.unwrap_or_else(|| "rust-flagd-provider".to_string()),
}
}

Expand Down Expand Up @@ -119,7 +124,7 @@ impl GrpcStreamConnector {
// Create the gRPC client with no interceptor because the endpoint already carries the desired authority.
let mut client = FlagSyncServiceClient::new(channel);
let request = tonic::Request::new(SyncFlagsRequest {
provider_id: "rust-flagd-provider".to_string(),
provider_id: self.provider_id.clone(),
selector: self.selector.clone().unwrap_or_default(),
});
debug!("Sending sync request with selector: {:?}", self.selector);
Expand Down Expand Up @@ -227,24 +232,17 @@ mod tests {
drop(listener);

// Create options configured for a failing connection.
let options = FlagdOptions {
host: addr.ip().to_string(),
resolver_type: crate::ResolverType::InProcess,
port: addr.port(),
target_uri: None,
deadline_ms: 100, // Short timeout for fast failures
retry_backoff_ms: 100,
retry_backoff_max_ms: 400,
retry_grace_period: 3,
stream_deadline_ms: 500,
tls: false,
cert_path: None,
selector: None,
socket_path: None,
cache_settings: None,
source_configuration: None,
offline_poll_interval_ms: None,
};
let mut options = FlagdOptions::default();
options.host = addr.ip().to_string();
options.resolver_type = crate::ResolverType::InProcess;
options.port = addr.port();
options.deadline_ms = 100; // Short timeout for fast failures
options.retry_backoff_ms = 100;
options.retry_backoff_max_ms = 400;
options.retry_grace_period = 3;
options.stream_deadline_ms = 500;
options.tls = false;
options.cache_settings = None;

let target = format!("{}:{}", addr.ip(), addr.port());
let connector =
Expand Down
71 changes: 66 additions & 5 deletions crates/flagd/src/resolver/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ fn convert_proto_metadata(metadata: prost_types::Struct) -> FlagMetadata {
FlagMetadata { values }
}

/// Maps gRPC status codes to OpenFeature error codes
///
/// This ensures consistent error handling across different resolver types
/// and proper conformance with the OpenFeature specification.
fn map_grpc_status_to_error_code(status: &tonic::Status) -> EvaluationErrorCode {
use tonic::Code;
match status.code() {
Code::NotFound => EvaluationErrorCode::FlagNotFound,
Code::InvalidArgument => EvaluationErrorCode::InvalidContext,
Code::Unauthenticated | Code::PermissionDenied => {
EvaluationErrorCode::General("authentication/authorization error".to_string())
}
Code::FailedPrecondition => EvaluationErrorCode::TypeMismatch,
Code::DeadlineExceeded | Code::Cancelled => {
EvaluationErrorCode::General("request timeout or cancelled".to_string())
}
Code::Unavailable => EvaluationErrorCode::General("service unavailable".to_string()),
_ => EvaluationErrorCode::General(format!("{:?}", status.code())),
}
}

pub struct RpcResolver {
client: ClientType,
metadata: OnceLock<ProviderMetadata>,
Expand Down Expand Up @@ -214,7 +235,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve boolean flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -247,7 +268,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve string flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -280,7 +301,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve float flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -313,7 +334,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve integer flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -346,7 +367,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve struct flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -765,4 +786,44 @@ mod tests {
// Clean shutdown
server_handle.abort();
}

#[test]
fn test_grpc_error_code_mapping() {
use tonic::Code;

// Test NOT_FOUND -> FlagNotFound
let status = tonic::Status::new(Code::NotFound, "Flag not found");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::FlagNotFound));

// Test INVALID_ARGUMENT -> InvalidContext
let status = tonic::Status::new(Code::InvalidArgument, "Invalid context");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::InvalidContext));

// Test UNAUTHENTICATED -> General
let status = tonic::Status::new(Code::Unauthenticated, "Not authenticated");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));

// Test PERMISSION_DENIED -> General
let status = tonic::Status::new(Code::PermissionDenied, "Access denied");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));

// Test FAILED_PRECONDITION -> TypeMismatch
let status = tonic::Status::new(Code::FailedPrecondition, "Type mismatch");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::TypeMismatch));

// Test DEADLINE_EXCEEDED -> General
let status = tonic::Status::new(Code::DeadlineExceeded, "Timeout");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));

// Test UNAVAILABLE -> General
let status = tonic::Status::new(Code::Unavailable, "Service unavailable");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));
}
}
83 changes: 64 additions & 19 deletions crates/flagd/tests/gherkin_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ impl ConfigWorld {

fn clear(&mut self) {
// SAFETY: Removing environment variables is safe here because:
// 1. We're only removing variables that were set during this specific test scenario
// 2. The test is protected by #[serial_test::serial]
// 3. This prevents test pollution between scenarios
// 4. All variables being removed are tracked in world.env_vars
// 1. The test is protected by #[serial_test::serial]
// 2. This prevents test pollution between scenarios
// 3. We clear env vars that were set in previous scenarios

// Clear env vars that were set in the previous scenario
for key in self.env_vars.keys() {
unsafe {
std::env::remove_var(key);
}
}
self.env_vars.clear();

// Also explicitly clear FLAGD_OFFLINE_FLAG_SOURCE_PATH because it can affect resolver type
// via FlagdOptions::default() logic, even if not tracked in world.env_vars
unsafe {
std::env::remove_var("FLAGD_OFFLINE_FLAG_SOURCE_PATH");
}

self.env_vars.clear();
self.options = FlagdOptions::default();
self.provider = None;
self.option_values.clear();
Expand Down Expand Up @@ -106,43 +113,60 @@ async fn env_with_value(world: &mut ConfigWorld, env: String, value: String) {

#[when(expr = "a config was initialized")]
async fn initialize_config(world: &mut ConfigWorld) {
// Start with defaults (which reads from environment variables)
let mut options = FlagdOptions::default();
let mut resolver_explicitly_set = false;

// Handle resolver type first
if let Some(resolver) = world.option_values.get("resolver") {
// Apply env vars from world.env_vars to ensure they take precedence
// This handles cases where env vars were set in test steps but timing issues
// prevent FlagdOptions::default() from reading them correctly
if let Some(resolver) = world.env_vars.get("FLAGD_RESOLVER") {
options.resolver_type = match resolver.to_uppercase().as_str() {
"RPC" => ResolverType::Rpc,
"REST" => ResolverType::Rest,
"IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
"FILE" | "OFFLINE" => ResolverType::File,
_ => ResolverType::Rpc,
};
} else if let Ok(resolver) = std::env::var("FLAGD_RESOLVER") {
resolver_explicitly_set = true;
// Update port based on resolver type when set via env var
options.port = match options.resolver_type {
ResolverType::Rpc => 8013,
ResolverType::InProcess => 8015,
_ => options.port,
};
}

// Handle explicit options - these override env vars
if let Some(resolver) = world.option_values.get("resolver") {
options.resolver_type = match resolver.to_uppercase().as_str() {
"RPC" => ResolverType::Rpc,
"REST" => ResolverType::Rest,
"IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
"FILE" | "OFFLINE" => ResolverType::File,
_ => ResolverType::Rpc,
};
resolver_explicitly_set = true;
// Update port based on resolver type when explicitly set
options.port = match options.resolver_type {
ResolverType::Rpc => 8013,
ResolverType::InProcess => 8015,
_ => options.port,
};
}

// Set default port based on resolver type
options.port = match options.resolver_type {
ResolverType::Rpc => 8013,
ResolverType::InProcess => 8015,
_ => options.port,
};

// Handle source configuration after resolver type
// Handle source configuration - may override resolver type for backwards compatibility
// BUT only if resolver wasn't explicitly set to "rpc"
if let Some(source) = world.option_values.get("offlineFlagSourcePath") {
options.source_configuration = Some(source.clone());
if options.resolver_type != ResolverType::Rpc {
// For backwards compatibility: if offline path is set, switch to File resolver
// UNLESS resolver was explicitly set to "rpc" (in which case keep it as "rpc")
if !resolver_explicitly_set || options.resolver_type != ResolverType::Rpc {
options.resolver_type = ResolverType::File;
}
}
Comment on lines 160 to 167

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic here for determining the resolver type seems to contradict the stated goal of the pull request and the implementation in FlagdOptions::default().

The PR description states: "FLAGD_RESOLVER takes precedence. If set, it is respected and FLAGD_OFFLINE_FLAG_SOURCE_PATH does not override the resolver type to File."

The logic in FlagdOptions::default() correctly implements this by only overriding to File if FLAGD_RESOLVER is not set. However, the logic in this test is if !resolver_explicitly_set || options.resolver_type != ResolverType::Rpc, which is equivalent to if !(resolver_explicitly_set && options.resolver_type == ResolverType::Rpc). This means that setting offlineFlagSourcePath will force the resolver to File unless the resolver was explicitly set to Rpc. For example, if the resolver was explicitly set to InProcess, this logic would override it to File, which seems incorrect according to the PR's goal.

If the goal is to give any explicitly set resolver precedence, the condition should be simplified. If the current logic is required to pass the behavioral tests, it might be worth adding a more detailed comment explaining this specific backward-compatibility rule.

Suggested change
if let Some(source) = world.option_values.get("offlineFlagSourcePath") {
options.source_configuration = Some(source.clone());
if options.resolver_type != ResolverType::Rpc {
// For backwards compatibility: if offline path is set, switch to File resolver
// UNLESS resolver was explicitly set to "rpc" (in which case keep it as "rpc")
if !resolver_explicitly_set || options.resolver_type != ResolverType::Rpc {
options.resolver_type = ResolverType::File;
}
}
if let Some(source) = world.option_values.get("offlineFlagSourcePath") {
options.source_configuration = Some(source.clone());
// For backwards compatibility: if offline path is set, switch to File resolver
// UNLESS resolver was explicitly set.
if !resolver_explicitly_set {
options.resolver_type = ResolverType::File;
}
}


// Handle remaining explicit options
// Handle remaining explicit options (these override env vars)
if let Some(host) = world.option_values.get("host") {
options.host = host.clone();
}
Expand Down Expand Up @@ -217,6 +241,9 @@ async fn initialize_config(world: &mut ConfigWorld) {
if let Some(selector) = world.option_values.get("selector") {
options.selector = Some(selector.clone());
}
if let Some(provider_id) = world.option_values.get("providerId") {
options.provider_id = Some(provider_id.clone());
}
if let Some(max_size) = world
.option_values
.get("maxCacheSize")
Expand Down Expand Up @@ -269,12 +296,30 @@ async fn check_option_value(
"retryBackoffMaxMs" => Some(world.options.retry_backoff_max_ms.to_string()),
"retryGracePeriod" => Some(world.options.retry_grace_period.to_string()),
"selector" => world.options.selector.clone(),
"providerId" => world.options.provider_id.clone(),
"socketPath" => world.options.socket_path.clone(),
"streamDeadlineMs" => Some(world.options.stream_deadline_ms.to_string()),
_ => None,
};
let expected = convert_type(&option_type, &value);
assert_eq!(actual, expected, "Option '{}' value mismatch", option);

// For resolver type, do case-insensitive comparison since enum normalizes to lowercase
let actual_normalized = if option == "resolver" {
actual.as_ref().map(|v| v.to_lowercase())
} else {
actual.clone()
};
let expected_normalized = if option == "resolver" {
expected.as_ref().map(|v| v.to_lowercase())
} else {
expected.clone()
};

assert_eq!(
actual_normalized, expected_normalized,
"Option '{}' value mismatch",
option
);
}

#[test(tokio::test)]
Expand Down
Loading