diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 2ceaa9b7a..c95b1cf42 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -297,12 +297,12 @@ impl SidecarServer { let futures = clients .values() - .filter_map(|client| client.client.stats().ok()) + .filter_map(|client| client.client.lock_or_panic().worker.stats().ok()) .collect::>(); let metric_counts = clients .values() - .map(|client| client.telemetry_metrics.lock_or_panic().len() as u32) + .map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32) .collect::>(); (futures, metric_counts) @@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer { let env = entry.get().env.as_deref().unwrap_or("none"); // Lock telemetry client - let mut telemetry = self.telemetry_clients.get_or_create( + let telemetry_mutex = self.telemetry_clients.get_or_create( service, env, &instance_id, @@ -420,6 +420,7 @@ impl SidecarInterface for SidecarServer { }) }, ); + let mut telemetry = telemetry_mutex.lock_or_panic(); let mut actions_to_process = vec![]; let mut composer_paths_to_process = vec![]; @@ -462,24 +463,25 @@ impl SidecarInterface for SidecarServer { } if !actions_to_process.is_empty() { - let client_clone = telemetry.clone(); - let mut handle = telemetry.handle.lock_or_panic(); - let last_handle = handle.take(); - *handle = Some(tokio::spawn(async move { + let telemetry_mutex_clone = telemetry_mutex.clone(); + let worker = telemetry.worker.clone(); + let last_handle = telemetry.handle.take(); + telemetry.handle = Some(tokio::spawn(async move { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; - let processed = client_clone.process_actions(actions_to_process); + let processed = telemetry_mutex_clone + .lock_or_panic() + .process_actions(actions_to_process); debug!("Sending Processed Actions :{processed:?}"); - client_clone.client.send_msgs(processed).await.ok(); + worker.send_msgs(processed).await.ok(); })); } if !composer_paths_to_process.is_empty() { - let client_clone = telemetry.clone(); - let mut handle = telemetry.handle.lock_or_panic(); - let last_handle = handle.take(); - *handle = Some(tokio::spawn(async move { + let worker = telemetry.worker.clone(); + let last_handle = telemetry.handle.take(); + telemetry.handle = Some(tokio::spawn(async move { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; @@ -487,7 +489,7 @@ impl SidecarInterface for SidecarServer { TelemetryCachedClient::process_composer_paths(composer_paths_to_process) .await; debug!("Sending Composer Paths :{composer_actions:?}"); - client_clone.client.send_msgs(composer_actions).await.ok(); + worker.send_msgs(composer_actions).await.ok(); })); } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index e9fb422c1..a0525bea0 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -54,20 +54,23 @@ struct ComposerPackages { packages: Vec, } -#[derive(Clone)] +pub struct TelemetryCachedEntry { + last_used: Instant, + pub client: Arc>, +} + pub struct TelemetryCachedClient { - pub client: TelemetryWorkerHandle, - pub shm_writer: Arc>, - pub last_used: Instant, + pub worker: TelemetryWorkerHandle, + pub shm_writer: OneWayShmWriter, pub config_sent: bool, pub buffered_integrations: HashSet, pub buffered_composer_paths: HashSet, - pub telemetry_metrics: Arc>>, - pub handle: Arc>>>, + pub telemetry_metrics: HashMap, + pub handle: Option>, } impl TelemetryCachedClient { - pub fn new( + fn new( service: &str, env: &str, instance_id: &InstanceId, @@ -90,17 +93,16 @@ impl TelemetryCachedClient { info!("spawning telemetry worker {config:?}"); Self { - client: handle.clone(), - shm_writer: Arc::new( + worker: handle.clone(), + shm_writer: { #[allow(clippy::unwrap_used)] - OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap(), - ), - last_used: Instant::now(), + OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap() + }, config_sent: false, buffered_integrations: HashSet::new(), buffered_composer_paths: HashSet::new(), telemetry_metrics: Default::default(), - handle: Arc::new(Mutex::new(None)), + handle: None, } } @@ -116,12 +118,11 @@ impl TelemetryCachedClient { } } - pub fn register_metric(&self, metric: MetricContext) { - let mut metrics = self.telemetry_metrics.lock_or_panic(); - if !metrics.contains_key(&metric.name) { - metrics.insert( + pub fn register_metric(&mut self, metric: MetricContext) { + if !self.telemetry_metrics.contains_key(&metric.name) { + self.telemetry_metrics.insert( metric.name.clone(), - self.client.register_metric_context( + self.worker.register_metric_context( metric.name, metric.tags, metric.metric_type, @@ -137,14 +138,13 @@ impl TelemetryCachedClient { (name, val, tags): (String, f64, Vec), ) -> TelemetryActions { #[allow(clippy::unwrap_used)] - TelemetryActions::AddPoint(( - val, - *self.telemetry_metrics.lock_or_panic().get(&name).unwrap(), - tags, - )) + TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags)) } - pub fn process_actions(&self, sidecar_actions: Vec) -> Vec { + pub fn process_actions( + &mut self, + sidecar_actions: Vec, + ) -> Vec { let mut actions = vec![]; for action in sidecar_actions { match action { @@ -239,13 +239,13 @@ type EnvString = String; type TelemetryCachedClientKey = (ServiceString, EnvString); pub struct TelemetryCachedClientSet { - pub inner: Arc>>, + pub inner: Arc>>, cleanup_handle: Option>, } impl Default for TelemetryCachedClientSet { fn default() -> Self { - let inner: Arc>> = + let inner: Arc>> = Arc::new(Default::default()); let clients = inner.clone(); @@ -289,7 +289,7 @@ impl TelemetryCachedClientSet { instance_id: &InstanceId, runtime_meta: &RuntimeMetadata, get_config: F, - ) -> TelemetryCachedClient + ) -> Arc> where F: FnOnce() -> ddtelemetry::config::Config, { @@ -299,12 +299,10 @@ impl TelemetryCachedClientSet { if let Some(existing) = map.get_mut(&key) { existing.last_used = Instant::now(); - let client = existing.clone(); tokio::spawn({ - let telemetry = client.clone(); + let worker = existing.client.lock_or_panic().worker.clone(); async move { - telemetry - .client + worker .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await .ok(); @@ -312,19 +310,26 @@ impl TelemetryCachedClientSet { }); info!("Reusing existing telemetry client for {key:?}"); - return client; + return existing.client.clone(); } - let client = - TelemetryCachedClient::new(service, env, instance_id, runtime_meta, get_config); + let entry = TelemetryCachedEntry { + last_used: Instant::now(), + client: Arc::new(Mutex::new(TelemetryCachedClient::new( + service, + env, + instance_id, + runtime_meta, + get_config, + ))), + }; - map.insert(key.clone(), client.clone()); + let entry = map.entry(key.clone()).or_insert(entry); tokio::spawn({ - let telemetry = client.clone(); + let worker = entry.client.lock_or_panic().worker.clone(); async move { - telemetry - .client + worker .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await .ok(); @@ -333,7 +338,7 @@ impl TelemetryCachedClientSet { info!("Created new telemetry client for {key:?}"); - client + entry.client.clone() } pub fn remove_telemetry_client(&self, service: &str, env: &str) { @@ -391,10 +396,11 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) { &actions.service_name, &actions.env_name, ); + let client = telemetry_client.lock_or_panic().worker.clone(); for action in actions.actions { let action_str = format!("{action:?}"); - match telemetry_client.client.send_msg(action).await { + match client.send_msg(action).await { Ok(_) => { debug!("Sent telemetry action to TelemetryWorker: {action_str}"); } @@ -412,7 +418,7 @@ fn get_telemetry_client( instance_id: &InstanceId, service_name: &str, env_name: &str, -) -> TelemetryCachedClient { +) -> Arc> { let session = sidecar.get_session(&instance_id.session_id); let trace_config = session.get_trace_config(); let runtime_meta = RuntimeMetadata::new(