Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/flagd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ tracing = "0.1"
thiserror = "2.0"

# RPC and In-Process shared dependencies (gRPC)
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen"], optional = true }
tonic = { version = "0.14", default-features = false, features = ["transport", "codegen", "tls-ring", "tls-webpki-roots"], optional = true }
tonic-prost = { version = "0.14", optional = true }
prost = { version = "0.14", optional = true }
prost-types = { version = "0.14", optional = true }
Expand Down
103 changes: 96 additions & 7 deletions crates/flagd/src/resolver/common/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ pub struct UpstreamConfig {
}

impl UpstreamConfig {
pub fn new(target: String, is_in_process: bool) -> Result<Self, FlagdError> {
debug!("Creating upstream config for target: {}", target);
pub fn new(target: String, is_in_process: bool, tls: bool) -> Result<Self, FlagdError> {
debug!(
"Creating upstream config for target: {}, tls: {}",
target, tls
);

if target.starts_with("http://") {
debug!("Target is already an HTTP endpoint");
let scheme = if tls { "https" } else { "http" };

if target.starts_with("http://") || target.starts_with("https://") {
debug!("Target is already an HTTP(S) endpoint");
let endpoint = Endpoint::from_shared(target)
.map_err(|e| FlagdError::Config(format!("Invalid endpoint: {}", e)))?;
return Ok(Self {
endpoint,
authority: None, // Standard HTTP doesn't need custom authority
authority: None, // Standard HTTP(S) doesn't need custom authority
});
}

Expand All @@ -37,7 +42,7 @@ impl UpstreamConfig {
let port = uri.port_u16().unwrap_or(9211); // Use Envoy port directly

(
format!("http://{}:{}", host, port),
format!("{}://{}:{}", scheme, host, port),
Some(authority.to_string()),
)
} else {
Expand All @@ -49,7 +54,7 @@ impl UpstreamConfig {
.unwrap_or(if is_in_process { 8015 } else { 8013 });

debug!("Using standard resolution with {}:{}", host, port);
(format!("http://{}:{}", host, port), None) // Standard resolution doesn't need custom authority
(format!("{}://{}:{}", scheme, host, port), None) // Standard resolution doesn't need custom authority
};

let endpoint = Endpoint::from_shared(endpoint_str)
Expand All @@ -69,3 +74,87 @@ impl UpstreamConfig {
self.authority.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_tls_disabled_uses_http_scheme() {
let config = UpstreamConfig::new("localhost:8013".to_string(), false, false).unwrap();
assert!(config.endpoint().uri().to_string().starts_with("http://"));
assert_eq!(
config.endpoint().uri().to_string(),
"http://localhost:8013/"
);
}

#[test]
fn test_tls_enabled_uses_https_scheme() {
let config = UpstreamConfig::new("localhost:8013".to_string(), false, true).unwrap();
assert!(config.endpoint().uri().to_string().starts_with("https://"));
assert_eq!(
config.endpoint().uri().to_string(),
"https://localhost:8013/"
);
}

#[test]
fn test_in_process_default_port_with_tls() {
let config = UpstreamConfig::new("localhost".to_string(), true, true).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"https://localhost:8015/"
);
}

#[test]
fn test_rpc_default_port_with_tls() {
let config = UpstreamConfig::new("localhost".to_string(), false, true).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"https://localhost:8013/"
);
}

#[test]
fn test_explicit_http_url_preserved() {
let config =
UpstreamConfig::new("http://example.com:9000".to_string(), false, true).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"http://example.com:9000/"
);
}

#[test]
fn test_explicit_https_url_preserved() {
let config =
UpstreamConfig::new("https://example.com:9000".to_string(), false, false).unwrap();
assert_eq!(
config.endpoint().uri().to_string(),
"https://example.com:9000/"
);
}

#[test]
fn test_envoy_target_with_tls() {
let config =
UpstreamConfig::new("envoy://localhost:9211/my-service".to_string(), false, true)
.unwrap();
assert!(config.endpoint().uri().to_string().starts_with("https://"));
assert_eq!(config.authority(), Some("my-service".to_string()));
}

#[test]
fn test_envoy_target_without_tls() {
let config = UpstreamConfig::new(
"envoy://localhost:9211/my-service".to_string(),
false,
false,
)
.unwrap();
assert!(config.endpoint().uri().to_string().starts_with("http://"));
assert_eq!(config.authority(), Some("my-service".to_string()));
}
}
2 changes: 1 addition & 1 deletion crates/flagd/src/resolver/in_process/resolver/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl InProcessResolver {
.target_uri
.clone()
.unwrap_or_else(|| format!("{}:{}", options.host, options.port));
let upstream_config = UpstreamConfig::new(target, true)?;
let upstream_config = UpstreamConfig::new(target, true, options.tls)?;
let connector = GrpcStreamConnector::new(
upstream_config.endpoint().uri().to_string(),
options.selector.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct GrpcStreamConnector {
authority: Option<String>, // optional authority for custom name resolution (e.g. envoy://)
provider_id: String, // provider identifier for sync requests
channel: Arc<Mutex<Option<Channel>>>, // reusable channel for connection pooling
tls: bool, // whether to use TLS for connections
}

impl GrpcStreamConnector {
Expand Down Expand Up @@ -59,6 +60,7 @@ impl GrpcStreamConnector {
.clone()
.unwrap_or_else(|| "rust-flagd-provider".to_string()),
channel: Arc::new(Mutex::new(None)),
tls: options.tls,
}
}

Expand Down Expand Up @@ -90,6 +92,7 @@ impl GrpcStreamConnector {
.clone()
.unwrap_or_else(|| "rust-flagd-provider".to_string()),
channel: Arc::new(Mutex::new(None)),
tls: options.tls,
}
}

Expand Down Expand Up @@ -188,7 +191,7 @@ impl GrpcStreamConnector {
}

debug!("Creating new channel connection to {}", self.target);
let config = UpstreamConfig::new(self.target.clone(), true)?;
let config = UpstreamConfig::new(self.target.clone(), true, self.tls)?;
let channel = self.connect_with_timeout_using(&config).await?;
*channel_guard = Some(channel.clone());
Ok(channel)
Expand Down Expand Up @@ -335,7 +338,8 @@ mod tests {
let connector = GrpcStreamConnector::new(target.clone(), None, &options, None);

// Create an upstream configuration with the invalid target.
let config = UpstreamConfig::new(target, false).expect("failed to create upstream config");
let config =
UpstreamConfig::new(target, false, false).expect("failed to create upstream config");

let start = Instant::now();
let result = connector.connect_with_timeout_using(&config).await;
Expand Down
6 changes: 5 additions & 1 deletion crates/flagd/src/resolver/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ impl RpcResolver {
.target_uri
.clone()
.unwrap_or_else(|| format!("{}:{}", options.host, options.port));
let upstream_config = UpstreamConfig::new(target.replace("http://", ""), false)?;
let upstream_config = UpstreamConfig::new(
target.replace("http://", "").replace("https://", ""),
false,
options.tls,
)?;
let mut endpoint = upstream_config.endpoint().clone();

// Extend support for envoy names resolution
Expand Down