Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ accessible by making an HTTP request to either "/stats/prometheus" or "/metrics"
- Proxies started (`istio_proxies_started_total`)
- Proxies stopped (`istio_proxies_stopped_total`)

#### Certificate metrics

- Cert expiration seconds (`istio_cert_expiration_seconds`): Gauge of seconds until the leaf
certificate expires (negative if expired), labeled by `identity` (SPIFFE URI).

#### XDS metrics

- XDS Connection terminations (`istio_xds_connection_terminations_total`)
Expand Down
18 changes: 14 additions & 4 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,12 @@ fn hbone_connections(c: &mut Criterion) {

// Global setup: spin up an echo server and ztunnel instance
let (echo_addr, ta) = rt.block_on(async move {
let cert_manager = identity::mock::new_secret_manager(Duration::from_secs(10));
let registry = Registry::default();
let identity_metrics = Arc::new(identity::metrics::Metrics::new());
let cert_manager = identity::mock::new_secret_manager_with_metrics(
Duration::from_secs(10),
identity_metrics.clone(),
);
let port = 80;
let config_source = Some(hbone_connection_config());
let config = test_helpers::test_config_with_port_xds_addr_and_root_cert(
Expand All @@ -517,9 +522,14 @@ fn hbone_connections(c: &mut Criterion) {
None,
config_source,
);
let app = app::build_with_cert(Arc::new(config), cert_manager.clone())
.await
.unwrap();
let app = app::build_with_cert_and_registry(
Arc::new(config),
cert_manager.clone(),
identity_metrics,
registry,
)
.await
.unwrap();
let ta = TestApp::from((&app, cert_manager));
ta.ready().await;

Expand Down
23 changes: 15 additions & 8 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ use crate::state::ProxyStateManager;
use crate::{admin, config, metrics, proxy, readiness, signal};
use crate::{dns, xds};

pub async fn build_with_cert(
pub async fn build_with_cert_and_registry(
config: Arc<config::Config>,
cert_manager: Arc<SecretManager>,
identity_metrics: Arc<crate::identity::metrics::Metrics>,
mut registry: Registry,
) -> anyhow::Result<Bound> {
// Start the data plane worker pool.
let data_plane_pool = new_data_plane_pool(config.num_worker_threads);
Expand Down Expand Up @@ -76,10 +78,10 @@ pub async fn build_with_cert(
})?;

// Register metrics.
let mut registry = Registry::default();
register_process_metrics(&mut registry);
let istio_registry = metrics::sub_registry(&mut registry);
let _ = metrics::meta::Metrics::new(istio_registry);
identity_metrics.register(istio_registry);
let xds_metrics = xds::Metrics::new(istio_registry);
let proxy_metrics = Arc::new(proxy::Metrics::new(istio_registry));
let dns_metrics = if config.dns_proxy {
Expand Down Expand Up @@ -313,21 +315,26 @@ fn new_data_plane_pool(num_worker_threads: usize) -> mpsc::Sender<DataPlaneTask>
}

pub async fn build(config: Arc<config::Config>) -> anyhow::Result<Bound> {
let registry = Registry::default();
let identity_metrics = Arc::new(crate::identity::metrics::Metrics::new());
let cert_manager = if config.fake_ca {
mock_secret_manager()
mock_secret_manager(identity_metrics.clone())
} else {
Arc::new(SecretManager::new(config.clone()).await?)
Arc::new(SecretManager::new(config.clone(), identity_metrics.clone()).await?)
};
build_with_cert(config, cert_manager).await
build_with_cert_and_registry(config, cert_manager, identity_metrics, registry).await
}

#[cfg(feature = "testing")]
fn mock_secret_manager() -> Arc<SecretManager> {
crate::identity::mock::new_secret_manager(std::time::Duration::from_secs(86400))
fn mock_secret_manager(metrics: Arc<crate::identity::metrics::Metrics>) -> Arc<SecretManager> {
crate::identity::mock::new_secret_manager_with_metrics(
std::time::Duration::from_secs(86400),
metrics,
)
}

#[cfg(not(feature = "testing"))]
fn mock_secret_manager() -> Arc<SecretManager> {
fn mock_secret_manager(_metrics: Arc<crate::identity::metrics::Metrics>) -> Arc<SecretManager> {
unimplemented!("fake_ca requires --features testing")
}

Expand Down
3 changes: 3 additions & 0 deletions src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::sync::Arc;
mod caclient;
pub use caclient::*;

pub mod metrics;

pub mod manager;
pub use manager::*;

Expand All @@ -31,6 +33,7 @@ pub mod mock {
pub use super::caclient::mock::CaClient;
pub use super::manager::mock::{
Config as SecretManagerConfig, new_secret_manager, new_secret_manager_cfg,
new_secret_manager_cfg_with_metrics, new_secret_manager_with_metrics,
};
}

Expand Down
123 changes: 114 additions & 9 deletions src/identity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use prometheus_client::encoding::{EncodeLabelValue, LabelValueEncoder};
use tokio::sync::{Mutex, mpsc, watch};
use tokio::time::{Duration, Instant, sleep_until};

use crate::{strng, tls};
use crate::{identity::metrics, strng, tls};

use super::CaClient;
use super::Error::{self, Spiffe};
Expand Down Expand Up @@ -219,6 +219,7 @@ struct Worker {
certs: Mutex<HashMap<Identity, CertChannel>>,
// How many concurrent fetch_certificate calls can be pending at a time.
concurrency: u16,
metrics: Arc<metrics::Metrics>,
}

impl Worker {
Expand All @@ -235,6 +236,7 @@ impl Worker {
time_conv: cfg.time_conv,
concurrency: cfg.concurrency,
certs: Default::default(),
metrics: cfg.metrics,
});

// Process requests in the background. The task will terminate on its own when the
Expand Down Expand Up @@ -446,19 +448,58 @@ impl Worker {
while fetches.next().await.is_some() {}
}

async fn update_cert_expiration_metric(
&self,
id: &Identity,
not_after: Option<std::time::SystemTime>,
) {
let labels = metrics::CertExpirationLabels {
identity: id.clone(),
};

let Some(not_after) = not_after else {
self.metrics.cert_expiration_seconds.remove(&labels);
return;
};
let Some(not_after) = self.time_conv.system_time_to_instant(not_after) else {
self.metrics.cert_expiration_seconds.remove(&labels);
return;
};
let now = std::time::Instant::now();
let seconds = if not_after >= now {
i64::try_from(not_after.duration_since(now).as_secs()).unwrap_or(i64::MAX)
} else {
-i64::try_from(now.duration_since(not_after).as_secs()).unwrap_or(i64::MAX)
};
self.metrics
.cert_expiration_seconds
.get_or_create(&labels)
.set(seconds);
}

// Returns whether the Identity is still managed.
async fn update_certs(&self, id: &Identity, certs: CertState) -> bool {
// Both errors (lack of entry in the `certs` map and a send error) are handled the same way
// (by returning false): either (a) there was no entry in the `certs` map due to a
// forget_certificate call some time ago or (b) a forget_certificate call was made and
// finished just after the lock was released (but before certs was sent)
match self.certs.lock().await.get(id) {
Some(state) => {
state.tx.send(certs).expect("state.rx cannot be gone");
true
let not_after = match &certs {
CertState::Available(cert) => Some(cert.cert.expiration().not_after),
_ => None,
};
let updated = {
match self.certs.lock().await.get(id) {
Some(state) => {
state.tx.send(certs).expect("state.rx cannot be gone");
true
}
None => false,
}
None => false,
};
if updated {
self.update_cert_expiration_metric(id, not_after).await;
}
updated
}

/// Returns existing valid certificate and its expiry time, or None if unavailable/expired
Expand Down Expand Up @@ -516,6 +557,7 @@ pub enum Request {
pub struct SecretManagerConfig {
time_conv: crate::time::Converter,
concurrency: u16,
metrics: Arc<metrics::Metrics>,
}

// push_increase pushes an item onto the queue if its not present, otherwise updates the priority to the
Expand Down Expand Up @@ -547,7 +589,10 @@ impl fmt::Debug for SecretManager {
}

impl SecretManager {
pub async fn new(cfg: Arc<crate::config::Config>) -> Result<Self, Error> {
pub async fn new(
cfg: Arc<crate::config::Config>,
metrics: Arc<metrics::Metrics>,
) -> Result<Self, Error> {
let caclient = CaClient::new(
cfg.ca_address
.clone()
Expand All @@ -562,15 +607,19 @@ impl SecretManager {
cfg.ca_headers.vec.clone(),
)
.await?;
Ok(Self::new_with_client(caclient))
Ok(Self::new_with_client_and_metrics(caclient, metrics))
}

pub fn new_with_client<C: 'static + CaClientTrait>(client: C) -> Self {
pub fn new_with_client_and_metrics<C: 'static + CaClientTrait>(
client: C,
metrics: Arc<metrics::Metrics>,
) -> Self {
Self::new_internal(
Box::new(client),
SecretManagerConfig {
time_conv: crate::time::Converter::new(),
concurrency: 8,
metrics,
},
)
.0
Expand Down Expand Up @@ -676,6 +725,7 @@ impl SecretManager {
// We would ideally drop any pending or new requests to rotate.
if self.worker.certs.lock().await.remove(id).is_some() {
self.post(Request::Forget(id.clone())).await;
self.worker.update_cert_expiration_metric(id, None).await;
}
}

Expand Down Expand Up @@ -707,6 +757,7 @@ pub mod mock {
};

use crate::identity::caclient::mock::{self, CaClient as MockCaClient};
use crate::identity::metrics;

use super::SecretManager;

Expand All @@ -724,9 +775,34 @@ pub mod mock {
})
}

fn default_metrics() -> Arc<metrics::Metrics> {
Arc::new(metrics::Metrics::new())
}

// There is no need to return Arc, but most callers want one so it simplifies the code - and we
// don't care about the extra overhead in tests.
pub fn new_secret_manager_cfg(cfg: Config) -> Arc<SecretManager> {
new_secret_manager_cfg_with_metrics(cfg, default_metrics())
}

pub fn new_secret_manager_with_metrics(
cert_lifetime: Duration,
metrics: Arc<metrics::Metrics>,
) -> Arc<SecretManager> {
new_secret_manager_cfg_with_metrics(
Config {
cert_lifetime,
fetch_latency: Duration::ZERO,
epoch: None,
},
metrics,
)
}

pub fn new_secret_manager_cfg_with_metrics(
cfg: Config,
metrics: Arc<metrics::Metrics>,
) -> Arc<SecretManager> {
let time_conv = crate::time::Converter::new_at(cfg.epoch.unwrap_or_else(SystemTime::now));
let client = MockCaClient::new(mock::ClientConfig {
cert_lifetime: cfg.cert_lifetime,
Expand All @@ -739,6 +815,7 @@ pub mod mock {
super::SecretManagerConfig {
time_conv,
concurrency: 2,
metrics,
},
)
.0,
Expand Down Expand Up @@ -855,6 +932,32 @@ mod tests {
.unwrap();
}

#[tokio::test]
async fn test_cert_expiration_metric() {
let cert_lifetime = Duration::from_secs(60);
let identity_metrics = Arc::new(crate::identity::metrics::Metrics::new());
let secret_manager = mock::new_secret_manager_cfg_with_metrics(
mock::Config {
cert_lifetime,
fetch_latency: Duration::ZERO,
epoch: Some(time::SystemTime::UNIX_EPOCH),
},
identity_metrics.clone(),
);
let id: Identity = Default::default();
secret_manager.fetch_certificate(&id).await.unwrap();

let labels = crate::identity::metrics::CertExpirationLabels { identity: id };
let gauge = identity_metrics
.cert_expiration_seconds
.get(&labels)
.expect("metric missing");
let value = gauge.get();
let max = i64::try_from(cert_lifetime.as_secs()).unwrap();
assert!(value <= max);
assert!(value > 0);
}

fn collect_strings<T: IntoIterator>(xs: T) -> Vec<String>
where
T::Item: ToString,
Expand Down Expand Up @@ -899,6 +1002,7 @@ mod tests {
// nearest millisecond. That means eg. sleep(1ms) will advance the timer by 2ms while
// sleep(600us) will advance the timer by only 1ms.
let time_conv = crate::time::Converter::new();
let identity_metrics = Arc::new(crate::identity::metrics::Metrics::new());
let caclient = MockCaClient::new(caclient::mock::ClientConfig {
time_conv: time_conv.clone(),
fetch_latency: SEC,
Expand All @@ -909,6 +1013,7 @@ mod tests {
SecretManagerConfig {
time_conv,
concurrency,
metrics: identity_metrics,
},
);
Test {
Expand Down
Loading