
Commit ddd9633

Fix cloning too much for telemetry workers
It turns out the telemetry workers were cloning their entire contents, so when the HashSets and HashMaps were updated, the changes were not persisted.

Signed-off-by: Bob Weinand <[email protected]>
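
For context, the bug is a general Rust pitfall rather than anything telemetry-specific: when a cached value derives `Clone` and owns its `HashMap`/`HashSet` state directly, each caller mutates a private copy and the cached original never sees the update. A minimal sketch of the failure mode and of the shared-state fix (hypothetical names, not the sidecar's actual types):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Broken layout: `clone()` duplicates the owned map.
#[derive(Clone)]
struct OwnedState {
    metrics: HashMap<String, u32>,
}

// Fixed layout: clones share one map behind Arc<Mutex<..>>.
#[derive(Clone)]
struct SharedState {
    metrics: Arc<Mutex<HashMap<String, u32>>>,
}

fn main() {
    // Updates made through a clone of the owned layout are silently lost.
    let cached = OwnedState { metrics: HashMap::new() };
    let mut copy = cached.clone();
    copy.metrics.insert("requests".into(), 1);
    assert!(cached.metrics.is_empty()); // the cache never saw the update

    // Updates made through a clone of the shared layout persist.
    let cached = SharedState { metrics: Arc::new(Mutex::new(HashMap::new())) };
    let copy = cached.clone();
    copy.metrics.lock().unwrap().insert("requests".into(), 1);
    assert_eq!(cached.metrics.lock().unwrap().len(), 1); // visible everywhere
}
```

Accordingly, the commit drops `#[derive(Clone)]` from `TelemetryCachedClient` and moves the whole client behind `Arc<Mutex<...>>` in a new `TelemetryCachedEntry`.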
1 parent: 52bd9a9 · commit: ddd9633

2 files changed: +63, −55 lines

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 16 additions & 14 deletions
```diff
@@ -297,12 +297,12 @@ impl SidecarServer {
 
         let futures = clients
             .values()
-            .filter_map(|client| client.client.stats().ok())
+            .filter_map(|client| client.client.lock_or_panic().worker.stats().ok())
             .collect::<Vec<_>>();
 
         let metric_counts = clients
             .values()
-            .map(|client| client.telemetry_metrics.lock_or_panic().len() as u32)
+            .map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32)
             .collect::<Vec<_>>();
 
         (futures, metric_counts)
@@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer {
         let env = entry.get().env.as_deref().unwrap_or("none");
 
         // Lock telemetry client
-        let mut telemetry = self.telemetry_clients.get_or_create(
+        let telemetry_mutex = self.telemetry_clients.get_or_create(
             service,
             env,
             &instance_id,
@@ -420,6 +420,7 @@
                 })
             },
         );
+        let mut telemetry = telemetry_mutex.lock_or_panic();
 
         let mut actions_to_process = vec![];
         let mut composer_paths_to_process = vec![];
@@ -462,32 +463,33 @@
         }
 
         if !actions_to_process.is_empty() {
-            let client_clone = telemetry.clone();
-            let mut handle = telemetry.handle.lock_or_panic();
-            let last_handle = handle.take();
-            *handle = Some(tokio::spawn(async move {
+            let telemetry_mutex_clone = telemetry_mutex.clone();
+            let worker = telemetry.worker.clone();
+            let last_handle = telemetry.handle.take();
+            telemetry.handle = Some(tokio::spawn(async move {
                 if let Some(last_handle) = last_handle {
                     last_handle.await.ok();
                 };
-                let processed = client_clone.process_actions(actions_to_process);
+                let processed = telemetry_mutex_clone
+                    .lock_or_panic()
+                    .process_actions(actions_to_process);
                 debug!("Sending Processed Actions :{processed:?}");
-                client_clone.client.send_msgs(processed).await.ok();
+                worker.send_msgs(processed).await.ok();
             }));
         }
 
         if !composer_paths_to_process.is_empty() {
-            let client_clone = telemetry.clone();
-            let mut handle = telemetry.handle.lock_or_panic();
-            let last_handle = handle.take();
-            *handle = Some(tokio::spawn(async move {
+            let worker = telemetry.worker.clone();
+            let last_handle = telemetry.handle.take();
+            telemetry.handle = Some(tokio::spawn(async move {
                 if let Some(last_handle) = last_handle {
                     last_handle.await.ok();
                 };
                 let composer_actions =
                     TelemetryCachedClient::process_composer_paths(composer_paths_to_process)
                         .await;
                 debug!("Sending Composer Paths :{composer_actions:?}");
-                client_clone.client.send_msgs(composer_actions).await.ok();
+                worker.send_msgs(composer_actions).await.ok();
             }));
         }
 
```
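
A note on the two spawn blocks above: the pattern is to clone the `Arc` (a cheap pointer copy) and the worker handle before moving into the async task, then re-take the lock inside the task, so no mutex guard is held across an `.await`. A minimal sketch of that re-lock pattern, assuming a Tokio runtime and using plain `std::sync::Mutex` in place of the sidecar's `lock_or_panic` helper:

```rust
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct Client {
    processed: Vec<String>,
}

#[tokio::main]
async fn main() {
    let client = Arc::new(Mutex::new(Client::default()));

    // Clone the Arc before moving it into the task.
    let client_clone = client.clone();
    let handle = tokio::spawn(async move {
        // Re-lock inside the task; the guard drops at the end of this
        // statement, so it is never held across an await point.
        client_clone.lock().unwrap().processed.push("action".into());
    });

    handle.await.ok();
    assert_eq!(client.lock().unwrap().processed.len(), 1); // update persisted
}
```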

datadog-sidecar/src/service/telemetry.rs

Lines changed: 47 additions & 41 deletions
```diff
@@ -54,20 +54,23 @@ struct ComposerPackages {
     packages: Vec<data::Dependency>,
 }
 
-#[derive(Clone)]
+pub struct TelemetryCachedEntry {
+    last_used: Instant,
+    pub client: Arc<Mutex<TelemetryCachedClient>>,
+}
+
 pub struct TelemetryCachedClient {
-    pub client: TelemetryWorkerHandle,
-    pub shm_writer: Arc<OneWayShmWriter<NamedShmHandle>>,
-    pub last_used: Instant,
+    pub worker: TelemetryWorkerHandle,
+    pub shm_writer: OneWayShmWriter<NamedShmHandle>,
     pub config_sent: bool,
     pub buffered_integrations: HashSet<Integration>,
     pub buffered_composer_paths: HashSet<PathBuf>,
-    pub telemetry_metrics: Arc<Mutex<HashMap<String, ContextKey>>>,
-    pub handle: Arc<Mutex<Option<JoinHandle<()>>>>,
+    pub telemetry_metrics: HashMap<String, ContextKey>,
+    pub handle: Option<JoinHandle<()>>,
 }
 
 impl TelemetryCachedClient {
-    pub fn new(
+    fn new(
         service: &str,
         env: &str,
         instance_id: &InstanceId,
@@ -90,17 +93,16 @@ impl TelemetryCachedClient {
 
         info!("spawning telemetry worker {config:?}");
         Self {
-            client: handle.clone(),
-            shm_writer: Arc::new(
+            worker: handle.clone(),
+            shm_writer: {
                 #[allow(clippy::unwrap_used)]
-                OneWayShmWriter::<NamedShmHandle>::new(path_for_telemetry(service, env)).unwrap(),
-            ),
-            last_used: Instant::now(),
+                OneWayShmWriter::<NamedShmHandle>::new(path_for_telemetry(service, env)).unwrap()
+            },
             config_sent: false,
             buffered_integrations: HashSet::new(),
             buffered_composer_paths: HashSet::new(),
             telemetry_metrics: Default::default(),
-            handle: Arc::new(Mutex::new(None)),
+            handle: None,
         }
     }
 
@@ -116,12 +118,11 @@ impl TelemetryCachedClient {
         }
     }
 
-    pub fn register_metric(&self, metric: MetricContext) {
-        let mut metrics = self.telemetry_metrics.lock_or_panic();
-        if !metrics.contains_key(&metric.name) {
-            metrics.insert(
+    pub fn register_metric(&mut self, metric: MetricContext) {
+        if !self.telemetry_metrics.contains_key(&metric.name) {
+            self.telemetry_metrics.insert(
                 metric.name.clone(),
-                self.client.register_metric_context(
+                self.worker.register_metric_context(
                     metric.name,
                     metric.tags,
                     metric.metric_type,
@@ -137,14 +138,13 @@
         (name, val, tags): (String, f64, Vec<Tag>),
     ) -> TelemetryActions {
         #[allow(clippy::unwrap_used)]
-        TelemetryActions::AddPoint((
-            val,
-            *self.telemetry_metrics.lock_or_panic().get(&name).unwrap(),
-            tags,
-        ))
+        TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags))
     }
 
-    pub fn process_actions(&self, sidecar_actions: Vec<SidecarAction>) -> Vec<TelemetryActions> {
+    pub fn process_actions(
+        &mut self,
+        sidecar_actions: Vec<SidecarAction>,
+    ) -> Vec<TelemetryActions> {
         let mut actions = vec![];
         for action in sidecar_actions {
             match action {
@@ -239,13 +239,13 @@ type EnvString = String;
 type TelemetryCachedClientKey = (ServiceString, EnvString);
 
 pub struct TelemetryCachedClientSet {
-    pub inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedClient>>>,
+    pub inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedEntry>>>,
     cleanup_handle: Option<tokio::task::JoinHandle<()>>,
 }
 
 impl Default for TelemetryCachedClientSet {
     fn default() -> Self {
-        let inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedClient>>> =
+        let inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedEntry>>> =
             Arc::new(Default::default());
         let clients = inner.clone();
 
@@ -289,7 +289,7 @@ impl TelemetryCachedClientSet {
         instance_id: &InstanceId,
         runtime_meta: &RuntimeMetadata,
         get_config: F,
-    ) -> TelemetryCachedClient
+    ) -> Arc<Mutex<TelemetryCachedClient>>
     where
         F: FnOnce() -> ddtelemetry::config::Config,
     {
@@ -299,32 +299,37 @@
 
         if let Some(existing) = map.get_mut(&key) {
             existing.last_used = Instant::now();
-            let client = existing.clone();
             tokio::spawn({
-                let telemetry = client.clone();
+                let worker = existing.client.lock_or_panic().worker.clone();
                 async move {
-                    telemetry
-                        .client
+                    worker
                         .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
                         .await
                         .ok();
                 }
             });
 
             info!("Reusing existing telemetry client for {key:?}");
-            return client;
+            return existing.client.clone();
         }
 
-        let client =
-            TelemetryCachedClient::new(service, env, instance_id, runtime_meta, get_config);
+        let entry = TelemetryCachedEntry {
+            last_used: Instant::now(),
+            client: Arc::new(Mutex::new(TelemetryCachedClient::new(
+                service,
+                env,
+                instance_id,
+                runtime_meta,
+                get_config,
+            ))),
+        };
 
-        map.insert(key.clone(), client.clone());
+        let entry = map.entry(key.clone()).or_insert(entry);
 
         tokio::spawn({
-            let telemetry = client.clone();
+            let worker = entry.client.lock_or_panic().worker.clone();
             async move {
-                telemetry
-                    .client
+                worker
                     .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
                     .await
                     .ok();
@@ -333,7 +338,7 @@
 
         info!("Created new telemetry client for {key:?}");
 
-        client
+        entry.client.clone()
     }
 
     pub fn remove_telemetry_client(&self, service: &str, env: &str) {
@@ -391,10 +396,11 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) {
         &actions.service_name,
         &actions.env_name,
     );
+    let client = telemetry_client.lock_or_panic().worker.clone();
 
     for action in actions.actions {
         let action_str = format!("{action:?}");
-        match telemetry_client.client.send_msg(action).await {
+        match client.send_msg(action).await {
             Ok(_) => {
                 debug!("Sent telemetry action to TelemetryWorker: {action_str}");
             }
@@ -412,7 +418,7 @@ fn get_telemetry_client(
     instance_id: &InstanceId,
     service_name: &str,
     env_name: &str,
-) -> TelemetryCachedClient {
+) -> Arc<Mutex<TelemetryCachedClient>> {
     let session = sidecar.get_session(&instance_id.session_id);
     let trace_config = session.get_trace_config();
     let runtime_meta = RuntimeMetadata::new(
```
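
Caller-side, `get_or_create` and `get_telemetry_client` now hand out `Arc<Mutex<TelemetryCachedClient>>`: mutations of the buffered state go through the lock, and only the cheap `TelemetryWorkerHandle` is cloned out of the guard before any async send. A rough usage sketch with simplified stand-in types (not the real sidecar API):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the sidecar types.
#[derive(Clone)]
struct WorkerHandle; // cheap to clone, like TelemetryWorkerHandle

struct CachedClient {
    worker: WorkerHandle,
    telemetry_metrics: HashMap<String, u32>,
}

fn main() {
    // The cache hands out one shared, lockable client per (service, env).
    let client = Arc::new(Mutex::new(CachedClient {
        worker: WorkerHandle,
        telemetry_metrics: HashMap::new(),
    }));

    // Mutations go through the lock, so every holder sees them.
    client.lock().unwrap().telemetry_metrics.insert("metric".into(), 1);

    // Clone only the worker handle out of the guard before async work,
    // mirroring `lock_or_panic().worker.clone()` in the diff above.
    let worker = client.lock().unwrap().worker.clone();
    let _ = worker; // in the sidecar this moves into a tokio::spawn

    assert_eq!(client.lock().unwrap().telemetry_metrics.len(), 1);
}
```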
