Skip to content

Commit b91ee92

Browse files
bwoebi authored and estringana committed
Fix cloning too much for telemetry workers
Turns out telemetry workers were cloning their whole contents, so when the HashSets and HashMaps were updated, the changes weren't persisted.

Signed-off-by: Bob Weinand <[email protected]>
1 parent 71f43e1 commit b91ee92

File tree

2 files changed

+63
-55
lines changed

2 files changed

+63
-55
lines changed

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 16 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -297,12 +297,12 @@ impl SidecarServer {
297297

298298
let futures = clients
299299
.values()
300-
.filter_map(|client| client.client.stats().ok())
300+
.filter_map(|client| client.client.lock_or_panic().worker.stats().ok())
301301
.collect::<Vec<_>>();
302302

303303
let metric_counts = clients
304304
.values()
305-
.map(|client| client.telemetry_metrics.lock_or_panic().len() as u32)
305+
.map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32)
306306
.collect::<Vec<_>>();
307307

308308
(futures, metric_counts)
@@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer {
403403
let env = entry.get().env.as_deref().unwrap_or("none");
404404

405405
// Lock telemetry client
406-
let mut telemetry = self.telemetry_clients.get_or_create(
406+
let telemetry_mutex = self.telemetry_clients.get_or_create(
407407
service,
408408
env,
409409
&instance_id,
@@ -420,6 +420,7 @@ impl SidecarInterface for SidecarServer {
420420
})
421421
},
422422
);
423+
let mut telemetry = telemetry_mutex.lock_or_panic();
423424

424425
let mut actions_to_process: Vec<SidecarAction> = vec![];
425426
let mut composer_paths_to_process = vec![];
@@ -467,32 +468,33 @@ impl SidecarInterface for SidecarServer {
467468
}
468469

469470
if !actions_to_process.is_empty() {
470-
let client_clone = telemetry.clone();
471-
let mut handle = telemetry.handle.lock_or_panic();
472-
let last_handle = handle.take();
473-
*handle = Some(tokio::spawn(async move {
471+
let telemetry_mutex_clone = telemetry_mutex.clone();
472+
let worker = telemetry.worker.clone();
473+
let last_handle = telemetry.handle.take();
474+
telemetry.handle = Some(tokio::spawn(async move {
474475
if let Some(last_handle) = last_handle {
475476
last_handle.await.ok();
476477
};
477-
let processed = client_clone.process_actions(actions_to_process);
478+
let processed = telemetry_mutex_clone
479+
.lock_or_panic()
480+
.process_actions(actions_to_process);
478481
debug!("Sending Processed Actions :{processed:?}");
479-
client_clone.client.send_msgs(processed).await.ok();
482+
worker.send_msgs(processed).await.ok();
480483
}));
481484
}
482485

483486
if !composer_paths_to_process.is_empty() {
484-
let client_clone = telemetry.clone();
485-
let mut handle = telemetry.handle.lock_or_panic();
486-
let last_handle = handle.take();
487-
*handle = Some(tokio::spawn(async move {
487+
let worker = telemetry.worker.clone();
488+
let last_handle = telemetry.handle.take();
489+
telemetry.handle = Some(tokio::spawn(async move {
488490
if let Some(last_handle) = last_handle {
489491
last_handle.await.ok();
490492
};
491493
let composer_actions =
492494
TelemetryCachedClient::process_composer_paths(composer_paths_to_process)
493495
.await;
494496
debug!("Sending Composer Paths :{composer_actions:?}");
495-
client_clone.client.send_msgs(composer_actions).await.ok();
497+
worker.send_msgs(composer_actions).await.ok();
496498
}));
497499
}
498500

datadog-sidecar/src/service/telemetry.rs

Lines changed: 47 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -54,21 +54,24 @@ struct ComposerPackages {
5454
packages: Vec<data::Dependency>,
5555
}
5656

57-
#[derive(Clone)]
57+
pub struct TelemetryCachedEntry {
58+
last_used: Instant,
59+
pub client: Arc<Mutex<TelemetryCachedClient>>,
60+
}
61+
5862
pub struct TelemetryCachedClient {
59-
pub client: TelemetryWorkerHandle,
60-
pub shm_writer: Arc<OneWayShmWriter<NamedShmHandle>>,
61-
pub last_used: Instant,
63+
pub worker: TelemetryWorkerHandle,
64+
pub shm_writer: OneWayShmWriter<NamedShmHandle>,
6265
pub config_sent: bool,
6366
pub buffered_integrations: HashSet<Integration>,
6467
pub buffered_composer_paths: HashSet<PathBuf>,
6568
pub last_endpoints_push: SystemTime,
66-
pub telemetry_metrics: Arc<Mutex<HashMap<String, ContextKey>>>,
67-
pub handle: Arc<Mutex<Option<JoinHandle<()>>>>,
69+
pub telemetry_metrics: HashMap<String, ContextKey>,
70+
pub handle: Option<JoinHandle<()>>,
6871
}
6972

7073
impl TelemetryCachedClient {
71-
pub fn new(
74+
fn new(
7275
service: &str,
7376
env: &str,
7477
instance_id: &InstanceId,
@@ -91,18 +94,17 @@ impl TelemetryCachedClient {
9194

9295
info!("spawning telemetry worker {config:?}");
9396
Self {
94-
client: handle.clone(),
95-
shm_writer: Arc::new(
97+
worker: handle.clone(),
98+
shm_writer: {
9699
#[allow(clippy::unwrap_used)]
97-
OneWayShmWriter::<NamedShmHandle>::new(path_for_telemetry(service, env)).unwrap(),
98-
),
99-
last_used: Instant::now(),
100+
OneWayShmWriter::<NamedShmHandle>::new(path_for_telemetry(service, env)).unwrap()
101+
},
100102
config_sent: false,
101103
buffered_integrations: HashSet::new(),
102104
buffered_composer_paths: HashSet::new(),
103105
last_endpoints_push: SystemTime::UNIX_EPOCH,
104106
telemetry_metrics: Default::default(),
105-
handle: Arc::new(Mutex::new(None)),
107+
handle: None,
106108
}
107109
}
108110

@@ -119,12 +121,11 @@ impl TelemetryCachedClient {
119121
}
120122
}
121123

122-
pub fn register_metric(&self, metric: MetricContext) {
123-
let mut metrics = self.telemetry_metrics.lock_or_panic();
124-
if !metrics.contains_key(&metric.name) {
125-
metrics.insert(
124+
pub fn register_metric(&mut self, metric: MetricContext) {
125+
if !self.telemetry_metrics.contains_key(&metric.name) {
126+
self.telemetry_metrics.insert(
126127
metric.name.clone(),
127-
self.client.register_metric_context(
128+
self.worker.register_metric_context(
128129
metric.name,
129130
metric.tags,
130131
metric.metric_type,
@@ -140,14 +141,13 @@ impl TelemetryCachedClient {
140141
(name, val, tags): (String, f64, Vec<Tag>),
141142
) -> TelemetryActions {
142143
#[allow(clippy::unwrap_used)]
143-
TelemetryActions::AddPoint((
144-
val,
145-
*self.telemetry_metrics.lock_or_panic().get(&name).unwrap(),
146-
tags,
147-
))
144+
TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags))
148145
}
149146

150-
pub fn process_actions(&self, sidecar_actions: Vec<SidecarAction>) -> Vec<TelemetryActions> {
147+
pub fn process_actions(
148+
&mut self,
149+
sidecar_actions: Vec<SidecarAction>,
150+
) -> Vec<TelemetryActions> {
151151
let mut actions = vec![];
152152
for action in sidecar_actions {
153153
match action {
@@ -236,13 +236,13 @@ type EnvString = String;
236236
type TelemetryCachedClientKey = (ServiceString, EnvString);
237237

238238
pub struct TelemetryCachedClientSet {
239-
pub inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedClient>>>,
239+
pub inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedEntry>>>,
240240
cleanup_handle: Option<tokio::task::JoinHandle<()>>,
241241
}
242242

243243
impl Default for TelemetryCachedClientSet {
244244
fn default() -> Self {
245-
let inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedClient>>> =
245+
let inner: Arc<Mutex<HashMap<TelemetryCachedClientKey, TelemetryCachedEntry>>> =
246246
Arc::new(Default::default());
247247
let clients = inner.clone();
248248

@@ -286,7 +286,7 @@ impl TelemetryCachedClientSet {
286286
instance_id: &InstanceId,
287287
runtime_meta: &RuntimeMetadata,
288288
get_config: F,
289-
) -> TelemetryCachedClient
289+
) -> Arc<Mutex<TelemetryCachedClient>>
290290
where
291291
F: FnOnce() -> ddtelemetry::config::Config,
292292
{
@@ -296,32 +296,37 @@ impl TelemetryCachedClientSet {
296296

297297
if let Some(existing) = map.get_mut(&key) {
298298
existing.last_used = Instant::now();
299-
let client = existing.clone();
300299
tokio::spawn({
301-
let telemetry = client.clone();
300+
let worker = existing.client.lock_or_panic().worker.clone();
302301
async move {
303-
telemetry
304-
.client
302+
worker
305303
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
306304
.await
307305
.ok();
308306
}
309307
});
310308

311309
info!("Reusing existing telemetry client for {key:?}");
312-
return client;
310+
return existing.client.clone();
313311
}
314312

315-
let client =
316-
TelemetryCachedClient::new(service, env, instance_id, runtime_meta, get_config);
313+
let entry = TelemetryCachedEntry {
314+
last_used: Instant::now(),
315+
client: Arc::new(Mutex::new(TelemetryCachedClient::new(
316+
service,
317+
env,
318+
instance_id,
319+
runtime_meta,
320+
get_config,
321+
))),
322+
};
317323

318-
map.insert(key.clone(), client.clone());
324+
let entry = map.entry(key.clone()).or_insert(entry);
319325

320326
tokio::spawn({
321-
let telemetry = client.clone();
327+
let worker = entry.client.lock_or_panic().worker.clone();
322328
async move {
323-
telemetry
324-
.client
329+
worker
325330
.send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start))
326331
.await
327332
.ok();
@@ -330,7 +335,7 @@ impl TelemetryCachedClientSet {
330335

331336
info!("Created new telemetry client for {key:?}");
332337

333-
client
338+
entry.client.clone()
334339
}
335340

336341
pub fn remove_telemetry_client(&self, service: &str, env: &str) {
@@ -388,10 +393,11 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) {
388393
&actions.service_name,
389394
&actions.env_name,
390395
);
396+
let client = telemetry_client.lock_or_panic().worker.clone();
391397

392398
for action in actions.actions {
393399
let action_str = format!("{action:?}");
394-
match telemetry_client.client.send_msg(action).await {
400+
match client.send_msg(action).await {
395401
Ok(_) => {
396402
debug!("Sent telemetry action to TelemetryWorker: {action_str}");
397403
}
@@ -409,7 +415,7 @@ fn get_telemetry_client(
409415
instance_id: &InstanceId,
410416
service_name: &str,
411417
env_name: &str,
412-
) -> TelemetryCachedClient {
418+
) -> Arc<Mutex<TelemetryCachedClient>> {
413419
let session = sidecar.get_session(&instance_id.session_id);
414420
let trace_config = session.get_trace_config();
415421
let runtime_meta = RuntimeMetadata::new(

0 commit comments

Comments
 (0)