Skip to content

Commit fdab508

Browse files
authored
Fix in-process submission of telemetry (#1172)
1 parent 58e24af commit fdab508

File tree

3 files changed

+74
-109
lines changed

3 files changed

+74
-109
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,8 @@ impl<'a> TryInto<SerializedTracerHeaderTags> for &'a TracerHeaderTags<'a> {
660660
pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_log(
661661
session_id_ffi: CharSlice,
662662
runtime_id_ffi: CharSlice,
663-
queue_id: u64,
663+
service_name_ffi: CharSlice,
664+
env_name_ffi: CharSlice,
664665
identifier_ffi: CharSlice,
665666
level: ddtelemetry::data::LogLevel,
666667
message_ffi: CharSlice,
@@ -671,7 +672,8 @@ pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_log(
671672
try_c!(ddog_sidecar_enqueue_telemetry_log_impl(
672673
session_id_ffi,
673674
runtime_id_ffi,
674-
queue_id,
675+
service_name_ffi,
676+
env_name_ffi,
675677
identifier_ffi,
676678
level,
677679
message_ffi,
@@ -695,7 +697,8 @@ fn char_slice_to_string(slice: CharSlice) -> Result<String, String> {
695697
fn ddog_sidecar_enqueue_telemetry_log_impl(
696698
session_id_ffi: CharSlice,
697699
runtime_id_ffi: CharSlice,
698-
queue_id: u64,
700+
service_name_ffi: CharSlice,
701+
env_name_ffi: CharSlice,
699702
identifier_ffi: CharSlice,
700703
level: ddtelemetry::data::LogLevel,
701704
message_ffi: CharSlice,
@@ -705,7 +708,8 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
705708
) -> Result<(), String> {
706709
if session_id_ffi.is_empty()
707710
|| runtime_id_ffi.is_empty()
708-
|| queue_id == 0
711+
|| service_name_ffi.is_empty()
712+
|| env_name_ffi.is_empty()
709713
|| identifier_ffi.is_empty()
710714
|| message_ffi.is_empty()
711715
{
@@ -723,7 +727,8 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
723727
char_slice_to_string(session_id_ffi)?,
724728
char_slice_to_string(runtime_id_ffi)?,
725729
);
726-
let queue_id: QueueId = queue_id.into();
730+
let service_name: String = char_slice_to_string(service_name_ffi)?;
731+
let env_name: String = char_slice_to_string(env_name_ffi)?;
727732
let identifier: String = char_slice_to_string(identifier_ffi)?;
728733
let message: String = char_slice_to_string(message_ffi)?;
729734

@@ -753,7 +758,8 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
753758

754759
let msg = InternalTelemetryActions {
755760
instance_id,
756-
queue_id,
761+
service_name,
762+
env_name,
757763
actions: vec![log_action],
758764
};
759765

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 1 addition & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub struct SidecarServer {
9999
/// A `Mutex` guarded `HashMap` that keeps a count of each session.
100100
session_counter: Arc<Mutex<HashMap<String, u32>>>,
101101
/// A `Mutex` guarded `HashMap` that stores the active telemetry clients.
102-
telemetry_clients: TelemetryCachedClientSet,
102+
pub(crate) telemetry_clients: TelemetryCachedClientSet,
103103
/// A `Mutex` guarded optional `ManualFutureCompleter` for telemetry configuration.
104104
pub self_telemetry_config:
105105
Arc<Mutex<Option<ManualFutureCompleter<ddtelemetry::config::Config>>>>,
@@ -359,98 +359,6 @@ impl SidecarServer {
359359
}
360360
}
361361

362-
pub async fn process_telemetry_action(
363-
&self,
364-
instance_id: &InstanceId,
365-
queue_id: &QueueId,
366-
actions: Vec<TelemetryActions>,
367-
) -> anyhow::Result<()> {
368-
tracing::debug!(
369-
"Processing telemetry action for target {:?}/{:?}: {:?}",
370-
instance_id,
371-
queue_id,
372-
actions
373-
);
374-
let session = self.get_session(&instance_id.session_id);
375-
let trace_config = session.get_trace_config();
376-
let runtime_metadata = RuntimeMetadata::new(
377-
trace_config.language.clone(),
378-
trace_config.language_version.clone(),
379-
trace_config.tracer_version.clone(),
380-
);
381-
382-
let rt_info = self.get_runtime(instance_id);
383-
let mut applications = rt_info.lock_applications();
384-
385-
match applications.entry(*queue_id) {
386-
Entry::Occupied(mut entry) => {
387-
let value = entry.get_mut();
388-
389-
let env = value.env.as_deref().unwrap_or("none");
390-
let service = value.service_name.as_deref().unwrap_or("unknown-service");
391-
392-
let mut telemetry = match self.telemetry_clients.get_or_create(
393-
service,
394-
env,
395-
instance_id,
396-
&runtime_metadata,
397-
|| {
398-
self.get_session(&instance_id.session_id)
399-
.session_config
400-
.lock_or_panic()
401-
.clone()
402-
},
403-
) {
404-
Some(client) => client,
405-
None => return Ok(()),
406-
};
407-
408-
let mut actions_to_send = vec![];
409-
let mut buffered_info_changed = false;
410-
411-
for action in actions {
412-
match action {
413-
TelemetryActions::AddIntegration(ref integration) => {
414-
if telemetry.buffered_integrations.insert(integration.clone()) {
415-
actions_to_send.push(action);
416-
buffered_info_changed = true;
417-
}
418-
}
419-
_ => {
420-
actions_to_send.push(action);
421-
}
422-
}
423-
}
424-
425-
let client_clone = telemetry.clone();
426-
let mut handle = telemetry.handle.lock_or_panic();
427-
let last_handle = handle.take();
428-
*handle = Some(tokio::spawn(async move {
429-
if let Some(last_handle) = last_handle {
430-
last_handle.await.ok();
431-
};
432-
debug!("Sending Telemetry Actions: {actions_to_send:?}");
433-
client_clone.client.send_msgs(actions_to_send).await.ok();
434-
}));
435-
436-
if buffered_info_changed {
437-
info!(
438-
"Buffered telemetry info changed for instance {instance_id:?} and queue_id {queue_id:?}"
439-
);
440-
telemetry.write_shm_file();
441-
}
442-
}
443-
444-
Entry::Vacant(_) => {
445-
info!(
446-
"No application found for instance {instance_id:?} and queue_id {queue_id:?}"
447-
);
448-
}
449-
}
450-
451-
Ok(())
452-
}
453-
454362
pub fn shutdown(&self) {
455363
self.remote_configs.shutdown();
456364
}

datadog-sidecar/src/service/telemetry.rs

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use crate::service::{InstanceId, QueueId, RuntimeMetadata, SidecarAction, SidecarServer};
4+
use crate::service::{InstanceId, RuntimeMetadata, SidecarAction, SidecarServer};
55
use anyhow::{anyhow, Result};
66
use ddcommon::MutexExt;
77
use std::sync::OnceLock;
88
use tokio::sync::mpsc;
9-
use tracing::{info, warn};
9+
use tracing::{debug, info, warn};
1010

1111
use crate::one_way_shared_memory::OneWayShmWriter;
1212
use crate::primary_sidecar_identifier;
@@ -356,7 +356,8 @@ pub fn path_for_telemetry(service: &str, env: &str) -> CString {
356356
#[derive(Debug)]
357357
pub struct InternalTelemetryActions {
358358
pub instance_id: InstanceId,
359-
pub queue_id: QueueId,
359+
pub service_name: String,
360+
pub env_name: String,
360361
pub actions: Vec<TelemetryActions>,
361362
}
362363

@@ -377,16 +378,66 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) {
377378
return;
378379
}
379380

380-
while let Some(msg) = rx.recv().await {
381-
if let Err(e) = sidecar
382-
.process_telemetry_action(&msg.instance_id, &msg.queue_id, msg.actions)
383-
.await
384-
{
381+
while let Some(actions) = rx.recv().await {
382+
let telemetry_client = get_telemetry_client(
383+
&sidecar,
384+
&actions.instance_id,
385+
&actions.service_name,
386+
&actions.env_name,
387+
);
388+
389+
if let Some(telemetry_client) = telemetry_client {
390+
for action in actions.actions {
391+
let action_str = format!("{action:?}");
392+
match telemetry_client.client.send_msg(action).await {
393+
Ok(_) => {
394+
debug!("Sent telemetry action to TelemetryWorker: {action_str}");
395+
}
396+
Err(e) => {
397+
warn!(
398+
"Failed to send telemetry action {action_str} to TelemetryWorker: {e}"
399+
);
400+
}
401+
}
402+
}
403+
} else {
385404
warn!(
386-
"Could not process telemetry action for target {:?}/{:?}: {}. Action dropped.",
387-
msg.instance_id, msg.queue_id, e
405+
"Could not get or create telemetry client for target {:?}, service {:?}, env {:?}. Action dropped.",
406+
actions.instance_id, actions.service_name, actions.env_name
388407
);
389408
}
390409
}
391410
info!("Telemetry action receiver task shutting down.");
392411
}
412+
413+
fn get_telemetry_client(
414+
sidecar: &SidecarServer,
415+
instance_id: &InstanceId,
416+
service_name: &str,
417+
env_name: &str,
418+
) -> Option<TelemetryCachedClient> {
419+
let session = sidecar.get_session(&instance_id.session_id);
420+
let trace_config = session.get_trace_config();
421+
let runtime_meta = RuntimeMetadata::new(
422+
trace_config.language.as_str(),
423+
trace_config.language_version.as_str(),
424+
trace_config.tracer_version.as_str(),
425+
);
426+
427+
let get_config = || {
428+
sidecar
429+
.get_session(&instance_id.session_id)
430+
.session_config
431+
.lock_or_panic()
432+
.clone()
433+
};
434+
435+
TelemetryCachedClientSet::get_or_create(
436+
&sidecar.telemetry_clients,
437+
service_name,
438+
env_name,
439+
instance_id,
440+
&runtime_meta,
441+
get_config,
442+
)
443+
}

0 commit comments

Comments
 (0)