Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions datadog-sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,12 @@ impl SidecarServer {

let futures = clients
.values()
.filter_map(|client| client.client.stats().ok())
.filter_map(|client| client.client.lock_or_panic().worker.stats().ok())
.collect::<Vec<_>>();

let metric_counts = clients
.values()
.map(|client| client.telemetry_metrics.lock_or_panic().len() as u32)
.map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32)
.collect::<Vec<_>>();

(futures, metric_counts)
Expand Down Expand Up @@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer {
let env = entry.get().env.as_deref().unwrap_or("none");

// Lock telemetry client
let mut telemetry = self.telemetry_clients.get_or_create(
let telemetry_mutex = self.telemetry_clients.get_or_create(
service,
env,
&instance_id,
Expand All @@ -420,6 +420,7 @@ impl SidecarInterface for SidecarServer {
})
},
);
let mut telemetry = telemetry_mutex.lock_or_panic();

let mut actions_to_process = vec![];
let mut composer_paths_to_process = vec![];
Expand Down Expand Up @@ -462,32 +463,33 @@ impl SidecarInterface for SidecarServer {
}

if !actions_to_process.is_empty() {
let client_clone = telemetry.clone();
let mut handle = telemetry.handle.lock_or_panic();
let last_handle = handle.take();
*handle = Some(tokio::spawn(async move {
let telemetry_mutex_clone = telemetry_mutex.clone();
let worker = telemetry.worker.clone();
let last_handle = telemetry.handle.take();
telemetry.handle = Some(tokio::spawn(async move {
if let Some(last_handle) = last_handle {
last_handle.await.ok();
};
let processed = client_clone.process_actions(actions_to_process);
let processed = telemetry_mutex_clone
.lock_or_panic()
.process_actions(actions_to_process);
debug!("Sending Processed Actions :{processed:?}");
client_clone.client.send_msgs(processed).await.ok();
worker.send_msgs(processed).await.ok();
}));
}

if !composer_paths_to_process.is_empty() {
let client_clone = telemetry.clone();
let mut handle = telemetry.handle.lock_or_panic();
let last_handle = handle.take();
*handle = Some(tokio::spawn(async move {
let worker = telemetry.worker.clone();
let last_handle = telemetry.handle.take();
telemetry.handle = Some(tokio::spawn(async move {
if let Some(last_handle) = last_handle {
last_handle.await.ok();
};
let composer_actions =
TelemetryCachedClient::process_composer_paths(composer_paths_to_process)
.await;
debug!("Sending Composer Paths :{composer_actions:?}");
client_clone.client.send_msgs(composer_actions).await.ok();
worker.send_msgs(composer_actions).await.ok();
}));
}

Expand Down
88 changes: 47 additions & 41 deletions datadog-sidecar/src/service/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,23 @@ struct ComposerPackages {
packages: Vec<data::Dependency>,
}

#[derive(Clone)]
pub struct TelemetryCachedEntry {
last_used: Instant,
pub client: Arc<Mutex<TelemetryCachedClient>>,
}

pub struct TelemetryCachedClient {
pub client: TelemetryWorkerHandle,
pub shm_writer: Arc<OneWayShmWriter<NamedShmHandle>>,
pub last_used: Instant,
pub worker: TelemetryWorkerHandle,
pub shm_writer: OneWayShmWriter<NamedShmHandle>,
pub config_sent: bool,
pub buffered_integrations: HashSet<Integration>,
pub buffered_composer_paths: HashSet<PathBuf>,
pub telemetry_metrics: Arc<Mutex<HashMap<String, ContextKey>>>,
pub handle: Arc<Mutex<Option<JoinHandle<()>>>>,
pub telemetry_metrics: HashMap<String, ContextKey>,
pub handle: Option<JoinHandle<()>>,
}

impl TelemetryCachedClient {
pub fn new(
fn new(
service: &str,
env: &str,
instance_id: &InstanceId,
Expand All @@ -90,17 +93,16 @@ impl TelemetryCachedClient {

info!("spawning telemetry worker {config:?}");
Self {
client: handle.clone(),
shm_writer: Arc::new(
worker: handle.clone(),
shm_writer: {
#[allow(clippy::unwrap_used)]
OneWayShmWriter::<NamedShmHandle>::new(path_for_telemetry(service, env)).unwrap(),
),
last_used: Instant::now(),
OneWayShmWriter::<NamedShmHandle>::new(path_for_telemetry(service, env)).unwrap()
},
config_sent: false,
buffered_integrations: HashSet::new(),
buffered_composer_paths: HashSet::new(),
telemetry_metrics: Default::default(),
handle: Arc::new(Mutex::new(None)),
handle: None,
}
}

Expand All @@ -116,12 +118,11 @@ impl TelemetryCachedClient {
}
}

pub fn register_metric(&self, metric: MetricContext) {
let mut metrics = self.telemetry_metrics.lock_or_panic();
if !metrics.contains_key(&metric.name) {
metrics.insert(
pub fn register_metric(&mut self, metric: MetricContext) {
if !self.telemetry_metrics.contains_key(&metric.name) {
self.telemetry_metrics.insert(
metric.name.clone(),
self.client.register_metric_context(
self.worker.register_metric_context(
metric.name,
metric.tags,
metric.metric_type,
Expand All @@ -137,14 +138,13 @@ impl TelemetryCachedClient {
(name, val, tags): (String, f64, Vec<Tag>),
) -> TelemetryActions {
#[allow(clippy::unwrap_used)]
TelemetryActions::AddPoint((
val,
*self.telemetry_metrics.lock_or_panic().get(&name).unwrap(),
tags,
))
TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags))
}

pub fn process_actions(&self, sidecar_actions: Vec<SidecarAction>) -> Vec<TelemetryActions> {
pub fn process_actions(
&mut self,
sidecar_actions: Vec<SidecarAction>,
) -> Vec<TelemetryActions> {
let mut actions = vec![];
for action in sidecar_actions {
match action {
Expand Down Expand Up @@ -239,13 +239,13 @@ type EnvString = String;
type TelemetryCachedClientKey = (ServiceString, EnvString);

pub struct TelemetryCachedClientSet {
pub inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedClient>>>,
pub inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedEntry>>>,
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

impl Default for TelemetryCachedClientSet {
fn default() -> Self {
let inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedClient>>> =
let inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedEntry>>> =
Arc::new(Default::default());
let clients = inner.clone();

Expand Down Expand Up @@ -289,7 +289,7 @@ impl TelemetryCachedClientSet {
instance_id: &InstanceId,
runtime_meta: &RuntimeMetadata,
get_config: F,
) -> TelemetryCachedClient
) -> Arc<Mutex<TelemetryCachedClient>>
where
F: FnOnce() -> ddtelemetry::config::Config,
{
Expand All @@ -299,32 +299,37 @@ impl TelemetryCachedClientSet {

if let Some(existing) = map.get_mut(&key) {
existing.last_used = Instant::now();
let client = existing.clone();
tokio::spawn({
let telemetry = client.clone();
let worker = existing.client.lock_or_panic().worker.clone();
async move {
telemetry
.client
worker
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await
.ok();
}
});

info!("Reusing existing telemetry client for {key:?}");
return client;
return existing.client.clone();
}

let client =
TelemetryCachedClient::new(service, env, instance_id, runtime_meta, get_config);
let entry = TelemetryCachedEntry {
last_used: Instant::now(),
client: Arc::new(Mutex::new(TelemetryCachedClient::new(
service,
env,
instance_id,
runtime_meta,
get_config,
))),
};

map.insert(key.clone(), client.clone());
let entry = map.entry(key.clone()).or_insert(entry);

tokio::spawn({
let telemetry = client.clone();
let worker = entry.client.lock_or_panic().worker.clone();
async move {
telemetry
.client
worker
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await
.ok();
Expand All @@ -333,7 +338,7 @@ impl TelemetryCachedClientSet {

info!("Created new telemetry client for {key:?}");

client
entry.client.clone()
}

pub fn remove_telemetry_client(&self, service: &str, env: &str) {
Expand Down Expand Up @@ -391,10 +396,11 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) {
&actions.service_name,
&actions.env_name,
);
let client = telemetry_client.lock_or_panic().worker.clone();

for action in actions.actions {
let action_str = format!("{action:?}");
match telemetry_client.client.send_msg(action).await {
match client.send_msg(action).await {
Ok(_) => {
debug!("Sent telemetry action to TelemetryWorker: {action_str}");
}
Expand All @@ -412,7 +418,7 @@ fn get_telemetry_client(
instance_id: &InstanceId,
service_name: &str,
env_name: &str,
) -> TelemetryCachedClient {
) -> Arc<Mutex<TelemetryCachedClient>> {
let session = sidecar.get_session(&instance_id.session_id);
let trace_config = session.get_trace_config();
let runtime_meta = RuntimeMetadata::new(
Expand Down
Loading