Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/flagd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rpc = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:hype
# REST evaluation mode - uses HTTP/OFREP to connect to flagd service
rest = ["dep:reqwest"]
# In-process evaluation mode - local evaluation with gRPC sync or file-based configuration
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver", "dep:notify"]
in-process = ["dep:tonic", "dep:tonic-prost", "dep:prost", "dep:prost-types", "dep:datalogic-rs", "dep:murmurhash3", "dep:semver", "dep:notify", "dep:hyper-util", "dep:tower"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -39,9 +39,9 @@ tonic-prost-build = "0.14"
[dev-dependencies]
cucumber = "0.22"
futures-core = "0.3"
testcontainers = { version = "0.26.0", features = ["http_wait", "blocking"] }
testcontainers = { version = "0.26.3", features = ["http_wait", "blocking"] }
wiremock = "0.6.5"
tempfile = "3.23.0"
tempfile = "3.24.0"
serial_test = "3.2"
test-log = { version = "0.2", features = ["trace"] }

Expand All @@ -58,7 +58,7 @@ tracing = "0.1"
thiserror = "2.0"

# RPC and In-Process shared dependencies (gRPC)
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen"], optional = true }
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen", "tls-ring", "tls-webpki-roots"], optional = true }
tonic-prost = { version = "0.14", optional = true }
prost = { version = "0.14", optional = true }
prost-types = { version = "0.14", optional = true }
Expand All @@ -68,10 +68,10 @@ hyper-util = { version = "0.1", features = ["tokio"], optional = true }
tower = { version = "0.5", default-features = false, features = ["util"], optional = true }

# REST-specific dependencies
reqwest = { version = "0.12", default-features = false, features = ["json", "stream", "rustls-tls"], optional = true }
reqwest = { version = "0.13.1", default-features = false, features = ["json", "stream", "rustls"], optional = true }

# In-Process-specific dependencies (targeting engine)
datalogic-rs = { version = "4.0.4", optional = true }
murmurhash3 = { version = "0.0.5", optional = true }
semver = { version = "1.0.27", optional = true }
notify = { version = "8.0", optional = true }
notify = { version = "8.2", optional = true }
12 changes: 9 additions & 3 deletions crates/flagd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,15 @@ pub struct FlagdOptions {
pub target_uri: Option<String>,
/// Type of resolver to use
pub resolver_type: ResolverType,
/// Whether to use TLS
/// Whether to use TLS for connections (uses HTTPS/gRPCS)
/// When enabled, connections will use TLS with system/webpki root certificates by default.
/// For self-signed or custom CA certificates, also set `cert_path`.
pub tls: bool,
/// Path to TLS certificate
/// Path to a PEM-encoded CA certificate file for TLS connections.
/// Use this to connect to flagd servers using self-signed or custom CA certificates.
/// When provided, this certificate is used as the trusted CA instead of system roots.
/// Can also be set via the `FLAGD_SERVER_CERT_PATH` environment variable.
/// Example: "/path/to/ca-cert.pem"
pub cert_path: Option<String>,
/// Request timeout in milliseconds
pub deadline_ms: u32,
Expand Down Expand Up @@ -469,7 +475,7 @@ impl FlagdProvider {
#[cfg(feature = "rest")]
ResolverType::Rest => {
debug!("Using REST resolver");
Arc::new(RestResolver::new(&options))
Arc::new(RestResolver::new(&options)?)
}
#[cfg(feature = "in-process")]
ResolverType::InProcess => {
Expand Down
193 changes: 184 additions & 9 deletions crates/flagd/src/resolver/common/upstream.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
use crate::error::FlagdError;
use std::str::FromStr;
use tonic::transport::{Certificate, ClientTlsConfig};
use tonic::transport::{Endpoint, Uri};
use tracing::debug;

#[derive(Debug)]
pub struct UpstreamConfig {
endpoint: Endpoint,
authority: Option<String>, // Only set for custom name resolution (envoy://)
}

impl UpstreamConfig {
pub fn new(target: String, is_in_process: bool) -> Result<Self, FlagdError> {
debug!("Creating upstream config for target: {}", target);
/// Creates a new upstream configuration for connecting to flagd.
///
/// # Arguments
/// * `target` - The target address (host:port, URL, or envoy:// URI)
/// * `is_in_process` - Whether this is for in-process resolver (affects default port)
/// * `tls` - Whether to use TLS for the connection
/// * `cert_path` - Optional path to a PEM-encoded CA certificate for custom/self-signed certs
///
/// # TLS Behavior
/// - If `cert_path` is provided, the certificate is loaded and used as the trusted CA
/// - If `cert_path` is None and TLS is enabled, system/webpki roots are used
/// - Self-signed certificates require providing the CA cert via `cert_path`
pub fn new(
target: String,
is_in_process: bool,
tls: bool,
cert_path: Option<&str>,
) -> Result<Self, FlagdError> {
debug!(
"Creating upstream config for target: {}, tls: {}, cert_path: {:?}",
target, tls, cert_path
);

if target.starts_with("http://") {
debug!("Target is already an HTTP endpoint");
let endpoint = Endpoint::from_shared(target)
let scheme = if tls { "https" } else { "http" };

if target.starts_with("http://") || target.starts_with("https://") {
debug!("Target is already an HTTP(S) endpoint");
let mut endpoint = Endpoint::from_shared(target.clone())
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;

// Apply TLS config for https URLs
if target.starts_with("https://") {
let tls_config = Self::build_tls_config(cert_path)?;
endpoint = endpoint
.tls_config(tls_config)
.map_err(|e| FlagdError::Config(format!("TLS config error: {}", e)))?;
}

return Ok(Self {
endpoint,
authority: None, // Standard HTTP doesn't need custom authority
authority: None, // Standard HTTP(S) doesn't need custom authority
});
}

Expand All @@ -37,7 +70,7 @@ impl UpstreamConfig {
let port = uri.port_u16().unwrap_or(9211); // Use Envoy port directly

(
format!("http://{}:{}", host, port),
format!("{}://{}:{}", scheme, host, port),
Some(authority.to_string()),
)
} else {
Expand All @@ -49,18 +82,50 @@ impl UpstreamConfig {
.unwrap_or(if is_in_process { 8015 } else { 8013 });

debug!("Using standard resolution with {}:{}", host, port);
(format!("http://{}:{}", host, port), None) // Standard resolution doesn't need custom authority
(format!("{}://{}:{}", scheme, host, port), None)
};

let endpoint = Endpoint::from_shared(endpoint_str)
let mut endpoint = Endpoint::from_shared(endpoint_str)
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;

// Apply TLS config when tls is enabled
if tls {
let tls_config = Self::build_tls_config(cert_path)?;
endpoint = endpoint
.tls_config(tls_config)
.map_err(|e| FlagdError::Config(format!("TLS config error: {}", e)))?;
}

Ok(Self {
endpoint,
authority,
})
}

/// Builds a TLS configuration, optionally loading a custom CA certificate.
///
/// # Arguments
/// * `cert_path` - Optional path to a PEM-encoded CA certificate file
///
/// # Returns
/// A configured `ClientTlsConfig` with either custom CA or system roots
fn build_tls_config(cert_path: Option<&str>) -> Result<ClientTlsConfig, FlagdError> {
let mut tls_config = ClientTlsConfig::new();

if let Some(path) = cert_path {
debug!("Loading custom CA certificate from: {}", path);
let cert_pem = std::fs::read(path).map_err(|e| {
FlagdError::Config(format!("Failed to read certificate file '{}': {}", path, e))
})?;
let ca_cert = Certificate::from_pem(cert_pem);
tls_config = tls_config.ca_certificate(ca_cert);
} else {
tls_config = tls_config.with_enabled_roots();
}

Ok(tls_config)
}

pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}
Expand All @@ -69,3 +134,113 @@ impl UpstreamConfig {
self.authority.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_tls_disabled_uses_http_scheme() {
let config = UpstreamConfig::new("localhost:8013".to_string(), false, false, None).unwrap();
assert!(config.endpoint().uri().to_string().starts_with("http://"));
assert_eq!(
config.endpoint().uri().to_string(),
"http://localhost:8013/"
);
}

#[test]
fn test_tls_enabled_uses_https_scheme() {
let config = UpstreamConfig::new("localhost:8013".to_string(), false, true, None).unwrap();
assert!(config.endpoint().uri().to_string().starts_with("https://"));
assert_eq!(
config.endpoint().uri().to_string(),
"https://localhost:8013/"
);
}

#[test]
fn test_in_process_default_port_with_tls() {
let config = UpstreamConfig::new("localhost".to_string(), true, true, None).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"https://localhost:8015/"
);
}

#[test]
fn test_rpc_default_port_with_tls() {
let config = UpstreamConfig::new("localhost".to_string(), false, true, None).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"https://localhost:8013/"
);
}

#[test]
fn test_explicit_http_url_preserved() {
let config =
UpstreamConfig::new("http://example.com:9000".to_string(), false, true, None).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"http://example.com:9000/"
);
}

#[test]
fn test_explicit_https_url_preserved() {
let config =
UpstreamConfig::new("https://example.com:9000".to_string(), false, false, None)
.unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"https://example.com:9000/"
);
}

#[test]
fn test_envoy_target_with_tls() {
let config = UpstreamConfig::new(
"envoy://localhost:9211/my-service".to_string(),
false,
true,
None,
)
.unwrap();
assert!(config.endpoint().uri().to_string().starts_with("https://"));
assert_eq!(config.authority(), Some("my-service".to_string()));
}

#[test]
fn test_envoy_target_without_tls() {
let config = UpstreamConfig::new(
"envoy://localhost:9211/my-service".to_string(),
false,
false,
None,
)
.unwrap();
assert!(config.endpoint().uri().to_string().starts_with("http://"));
assert_eq!(config.authority(), Some("my-service".to_string()));
}

#[test]
fn test_cert_path_file_not_found() {
let result = UpstreamConfig::new(
"localhost:8013".to_string(),
false,
true,
Some("/nonexistent/path/to/cert.pem"),
);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("Failed to read certificate file"));
}

#[test]
fn test_tls_with_no_cert_path_uses_system_roots() {
// This test verifies that TLS works without a custom cert (uses system roots)
let config = UpstreamConfig::new("localhost:8013".to_string(), false, true, None).unwrap();
assert!(config.endpoint().uri().to_string().starts_with("https://"));
}
}
3 changes: 2 additions & 1 deletion crates/flagd/src/resolver/in_process/resolver/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ impl InProcessResolver {
.target_uri
.clone()
.unwrap_or_else(|| format!("{}:{}", options.host, options.port));
let upstream_config = UpstreamConfig::new(target, true)?;
let upstream_config =
UpstreamConfig::new(target, true, options.tls, options.cert_path.as_deref())?;
let connector = GrpcStreamConnector::new(
upstream_config.endpoint().uri().to_string(),
options.selector.clone(),
Expand Down
Loading