Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions crates/flagd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ pub struct FlagdOptions {
pub stream_deadline_ms: u32,
/// Offline polling interval in milliseconds
pub offline_poll_interval_ms: Option<u32>,
/// Provider ID for identifying this provider instance to flagd
/// Used in in-process resolver for sync requests
pub provider_id: Option<String>,
}
/// Type of resolver to use for flag evaluation
#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -336,9 +339,12 @@ impl Default for FlagdOptions {
.and_then(|s| s.parse().ok())
.unwrap_or(5000),
),
provider_id: std::env::var("FLAGD_PROVIDER_ID").ok(),
};

if options.source_configuration.is_some() && options.resolver_type != ResolverType::Rpc {
let resolver_env_set = std::env::var("FLAGD_RESOLVER").is_ok();
if options.source_configuration.is_some() && !resolver_env_set {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also add a check here that source_configuration needs to be set, if the ResolverType is file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much! I updated it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am not sure i can really provide a review - i have no clue about rust, i just saw that this behaviour was missing, but i can give it a shot

// Only override to File if FLAGD_RESOLVER wasn't explicitly set
options.resolver_type = ResolverType::File;
}

Expand All @@ -360,6 +366,13 @@ impl FlagdProvider {
pub async fn new(options: FlagdOptions) -> Result<Self, FlagdError> {
debug!("Initializing FlagdProvider with options: {:?}", options);

// Validate File resolver configuration
if options.resolver_type == ResolverType::File && options.source_configuration.is_none() {
return Err(FlagdError::Config(
"File resolver requires 'source_configuration' (FLAGD_OFFLINE_FLAG_SOURCE_PATH) to be set".to_string()
));
}

let provider: Arc<dyn FeatureProvider + Send + Sync> = match options.resolver_type {
ResolverType::Rpc => {
debug!("Using RPC resolver");
Expand All @@ -377,7 +390,9 @@ impl FlagdProvider {
debug!("Using file resolver");
Arc::new(
FileResolver::new(
options.source_configuration.unwrap(),
options
.source_configuration
.expect("source_configuration validated above"),
options.cache_settings.clone(),
)
.await?,
Expand Down
38 changes: 18 additions & 20 deletions crates/flagd/src/resolver/in_process/storage/connector/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ pub struct GrpcStreamConnector {
retry_backoff_max_ms: u32,
retry_grace_period: u32,
stream_deadline_ms: u32,
authority: String, // desired authority, e.g. "b-features-api.service"
authority: String, // desired authority, e.g. "b-features-api.service"
provider_id: String, // provider identifier for sync requests
}

impl GrpcStreamConnector {
Expand All @@ -52,6 +53,10 @@ impl GrpcStreamConnector {
retry_grace_period: options.retry_grace_period,
stream_deadline_ms: options.stream_deadline_ms,
authority,
provider_id: options
.provider_id
.clone()
.unwrap_or_else(|| "rust-flagd-provider".to_string()),
}
}

Expand Down Expand Up @@ -119,7 +124,7 @@ impl GrpcStreamConnector {
// Create the gRPC client with no interceptor because the endpoint already carries the desired authority.
let mut client = FlagSyncServiceClient::new(channel);
let request = tonic::Request::new(SyncFlagsRequest {
provider_id: "rust-flagd-provider".to_string(),
provider_id: self.provider_id.clone(),
selector: self.selector.clone().unwrap_or_default(),
});
debug!("Sending sync request with selector: {:?}", self.selector);
Expand Down Expand Up @@ -227,24 +232,17 @@ mod tests {
drop(listener);

// Create options configured for a failing connection.
let options = FlagdOptions {
host: addr.ip().to_string(),
resolver_type: crate::ResolverType::InProcess,
port: addr.port(),
target_uri: None,
deadline_ms: 100, // Short timeout for fast failures
retry_backoff_ms: 100,
retry_backoff_max_ms: 400,
retry_grace_period: 3,
stream_deadline_ms: 500,
tls: false,
cert_path: None,
selector: None,
socket_path: None,
cache_settings: None,
source_configuration: None,
offline_poll_interval_ms: None,
};
let mut options = FlagdOptions::default();
options.host = addr.ip().to_string();
options.resolver_type = crate::ResolverType::InProcess;
options.port = addr.port();
options.deadline_ms = 100; // Short timeout for fast failures
options.retry_backoff_ms = 100;
options.retry_backoff_max_ms = 400;
options.retry_grace_period = 3;
options.stream_deadline_ms = 500;
options.tls = false;
options.cache_settings = None;

let target = format!("{}:{}", addr.ip(), addr.port());
let connector =
Expand Down
71 changes: 66 additions & 5 deletions crates/flagd/src/resolver/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,27 @@ fn convert_proto_metadata(metadata: prost_types::Struct) -> FlagMetadata {
FlagMetadata { values }
}

/// Maps gRPC status codes to OpenFeature error codes
///
/// This ensures consistent error handling across different resolver types
/// and proper conformance with the OpenFeature specification.
fn map_grpc_status_to_error_code(status: &tonic::Status) -> EvaluationErrorCode {
use tonic::Code;
match status.code() {
Code::NotFound => EvaluationErrorCode::FlagNotFound,
Code::InvalidArgument => EvaluationErrorCode::InvalidContext,
Code::Unauthenticated | Code::PermissionDenied => {
EvaluationErrorCode::General("authentication/authorization error".to_string())
}
Code::FailedPrecondition => EvaluationErrorCode::TypeMismatch,
Code::DeadlineExceeded | Code::Cancelled => {
EvaluationErrorCode::General("request timeout or cancelled".to_string())
}
Code::Unavailable => EvaluationErrorCode::General("service unavailable".to_string()),
_ => EvaluationErrorCode::General(format!("{:?}", status.code())),
}
}

pub struct RpcResolver {
client: ClientType,
metadata: OnceLock<ProviderMetadata>,
Expand Down Expand Up @@ -214,7 +235,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve boolean flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -247,7 +268,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve string flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -280,7 +301,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve float flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -313,7 +334,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve integer flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -346,7 +367,7 @@ impl FeatureProvider for RpcResolver {
Err(status) => {
error!(flag_key, error = %status, "failed to resolve struct flag");
Err(EvaluationError {
code: EvaluationErrorCode::General(status.code().to_string()),
code: map_grpc_status_to_error_code(&status),
message: Some(status.message().to_string()),
})
}
Expand Down Expand Up @@ -765,4 +786,44 @@ mod tests {
// Clean shutdown
server_handle.abort();
}

#[test]
fn test_grpc_error_code_mapping() {
use tonic::Code;

// Test NOT_FOUND -> FlagNotFound
let status = tonic::Status::new(Code::NotFound, "Flag not found");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::FlagNotFound));

// Test INVALID_ARGUMENT -> InvalidContext
let status = tonic::Status::new(Code::InvalidArgument, "Invalid context");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::InvalidContext));

// Test UNAUTHENTICATED -> General
let status = tonic::Status::new(Code::Unauthenticated, "Not authenticated");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));

// Test PERMISSION_DENIED -> General
let status = tonic::Status::new(Code::PermissionDenied, "Access denied");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));

// Test FAILED_PRECONDITION -> TypeMismatch
let status = tonic::Status::new(Code::FailedPrecondition, "Type mismatch");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::TypeMismatch));

// Test DEADLINE_EXCEEDED -> General
let status = tonic::Status::new(Code::DeadlineExceeded, "Timeout");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));

// Test UNAVAILABLE -> General
let status = tonic::Status::new(Code::Unavailable, "Service unavailable");
let error_code = map_grpc_status_to_error_code(&status);
assert!(matches!(error_code, EvaluationErrorCode::General(_)));
}
}
83 changes: 64 additions & 19 deletions crates/flagd/tests/gherkin_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,24 @@ impl ConfigWorld {

fn clear(&mut self) {
// SAFETY: Removing environment variables is safe here because:
// 1. We're only removing variables that were set during this specific test scenario
// 2. The test is protected by #[serial_test::serial]
// 3. This prevents test pollution between scenarios
// 4. All variables being removed are tracked in world.env_vars
// 1. The test is protected by #[serial_test::serial]
// 2. This prevents test pollution between scenarios
// 3. We clear env vars that were set in previous scenarios

// Clear env vars that were set in the previous scenario
for key in self.env_vars.keys() {
unsafe {
std::env::remove_var(key);
}
}
self.env_vars.clear();

// Also explicitly clear FLAGD_OFFLINE_FLAG_SOURCE_PATH because it can affect resolver type
// via FlagdOptions::default() logic, even if not tracked in world.env_vars
unsafe {
std::env::remove_var("FLAGD_OFFLINE_FLAG_SOURCE_PATH");
}

self.env_vars.clear();
self.options = FlagdOptions::default();
self.provider = None;
self.option_values.clear();
Expand Down Expand Up @@ -106,43 +113,60 @@ async fn env_with_value(world: &mut ConfigWorld, env: String, value: String) {

#[when(expr = "a config was initialized")]
async fn initialize_config(world: &mut ConfigWorld) {
// Start with defaults (which reads from environment variables)
let mut options = FlagdOptions::default();
let mut resolver_explicitly_set = false;

// Handle resolver type first
if let Some(resolver) = world.option_values.get("resolver") {
// Apply env vars from world.env_vars to ensure they take precedence
// This handles cases where env vars were set in test steps but timing issues
// prevent FlagdOptions::default() from reading them correctly
if let Some(resolver) = world.env_vars.get("FLAGD_RESOLVER") {
options.resolver_type = match resolver.to_uppercase().as_str() {
"RPC" => ResolverType::Rpc,
"REST" => ResolverType::Rest,
"IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
"FILE" | "OFFLINE" => ResolverType::File,
_ => ResolverType::Rpc,
};
} else if let Ok(resolver) = std::env::var("FLAGD_RESOLVER") {
resolver_explicitly_set = true;
// Update port based on resolver type when set via env var
options.port = match options.resolver_type {
ResolverType::Rpc => 8013,
ResolverType::InProcess => 8015,
_ => options.port,
};
}

// Handle explicit options - these override env vars
if let Some(resolver) = world.option_values.get("resolver") {
options.resolver_type = match resolver.to_uppercase().as_str() {
"RPC" => ResolverType::Rpc,
"REST" => ResolverType::Rest,
"IN-PROCESS" | "INPROCESS" => ResolverType::InProcess,
"FILE" | "OFFLINE" => ResolverType::File,
_ => ResolverType::Rpc,
};
resolver_explicitly_set = true;
// Update port based on resolver type when explicitly set
options.port = match options.resolver_type {
ResolverType::Rpc => 8013,
ResolverType::InProcess => 8015,
_ => options.port,
};
}

// Set default port based on resolver type
options.port = match options.resolver_type {
ResolverType::Rpc => 8013,
ResolverType::InProcess => 8015,
_ => options.port,
};

// Handle source configuration after resolver type
// Handle source configuration - may override resolver type for backwards compatibility
// BUT only if resolver wasn't explicitly set to "rpc"
if let Some(source) = world.option_values.get("offlineFlagSourcePath") {
options.source_configuration = Some(source.clone());
if options.resolver_type != ResolverType::Rpc {
// For backwards compatibility: if offline path is set, switch to File resolver
// UNLESS resolver was explicitly set to "rpc" (in which case keep it as "rpc")
if !resolver_explicitly_set || options.resolver_type != ResolverType::Rpc {
options.resolver_type = ResolverType::File;
}
}

// Handle remaining explicit options
// Handle remaining explicit options (these override env vars)
if let Some(host) = world.option_values.get("host") {
options.host = host.clone();
}
Expand Down Expand Up @@ -217,6 +241,9 @@ async fn initialize_config(world: &mut ConfigWorld) {
if let Some(selector) = world.option_values.get("selector") {
options.selector = Some(selector.clone());
}
if let Some(provider_id) = world.option_values.get("providerId") {
options.provider_id = Some(provider_id.clone());
}
if let Some(max_size) = world
.option_values
.get("maxCacheSize")
Expand Down Expand Up @@ -269,12 +296,30 @@ async fn check_option_value(
"retryBackoffMaxMs" => Some(world.options.retry_backoff_max_ms.to_string()),
"retryGracePeriod" => Some(world.options.retry_grace_period.to_string()),
"selector" => world.options.selector.clone(),
"providerId" => world.options.provider_id.clone(),
"socketPath" => world.options.socket_path.clone(),
"streamDeadlineMs" => Some(world.options.stream_deadline_ms.to_string()),
_ => None,
};
let expected = convert_type(&option_type, &value);
assert_eq!(actual, expected, "Option '{}' value mismatch", option);

// For resolver type, do case-insensitive comparison since enum normalizes to lowercase
let actual_normalized = if option == "resolver" {
actual.as_ref().map(|v| v.to_lowercase())
} else {
actual.clone()
};
let expected_normalized = if option == "resolver" {
expected.as_ref().map(|v| v.to_lowercase())
} else {
expected.clone()
};

assert_eq!(
actual_normalized, expected_normalized,
"Option '{}' value mismatch",
option
);
}

#[test(tokio::test)]
Expand Down
Loading