From cf32144b913e91e346c3f7489c10f85cabc6a2de Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Wed, 13 Aug 2025 12:40:41 +0200 Subject: [PATCH 01/22] Add new route types --- datadog-sidecar/src/service/mod.rs | 1 + ddtelemetry/src/data/payloads.rs | 61 ++++++++++++++++++++++++++++++ ddtelemetry/src/worker/mod.rs | 29 ++++++++++++-- 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index 16b869db84..2592d91d1f 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -72,4 +72,5 @@ pub enum SidecarAction { AddTelemetryMetricPoint((String, f64, Vec)), PhpComposerTelemetryFile(PathBuf), ClearQueueId, + AddRoute(Route), } diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 3b7d9a9c28..1b7c40178b 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -95,3 +95,64 @@ pub enum LogLevel { Warn, Debug, } + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct EndpointsPayload { + pub is_first: Option, + pub endpoints: Vec, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +#[serde(rename_all = "UPPERCASE")] +#[repr(C)] +pub enum Method { + Get, + Post, + Put, + Delete, + Patch, + Head, + Options, + Trace, + Connect, + Other, //This is specified as "*" in the OpenAPI spec +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +#[serde(rename_all = "UPPERCASE")] +#[repr(C)] +pub enum Authentication { + Jwt, + Basic, + Oauth, + Oidc, + ApiKey, + Session, + Mtls, + Saml, + Ldap, + Form, + Other, +} + +#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] +pub struct Endpoint { + #[serde(default)] + pub r#type: Option, + #[serde(default)] + pub method: Option, + #[serde(default)] + pub path: Option, + pub operation_name: String, + pub resource_name: String, + #[serde(default)] + pub request_body_type: Option>, + #[serde(default)] + pub response_body_type: Option>, + #[serde(default)] + pub response_code: Option>, + #[serde(default)] + pub authentication: Option>, + #[serde(default)] + pub metadata: Option, +} diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs index 1d33d88356..144a65db67 100644 --- a/ddtelemetry/src/worker/mod.rs +++ b/ddtelemetry/src/worker/mod.rs @@ -7,7 +7,7 @@ pub mod store; use crate::{ config::Config, - data::{self, Application, Dependency, Host, Integration, Log, Payload, Telemetry}, + data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry}, metrics::{ContextKey, MetricBuckets, MetricContexts}, }; use ddcommon::Endpoint; @@ -79,6 +79,7 @@ pub enum TelemetryActions { AddDependency(Dependency), AddIntegration(Integration), AddLog((LogIdentifier, Log)), + AddEndpoint(Endpoint), Lifecycle(LifecycleAction), #[serde(skip)] CollectStats(oneshot::Sender), @@ -110,6 +111,7 @@ struct TelemetryWorkerData { dependencies: store::Store, configurations: store::Store, integrations: store::Store, + endpoints: store::Store, logs: store::QueueHashMap, metric_contexts: MetricContexts, metric_buckets: MetricBuckets, @@ -181,6 +183,8 @@ pub struct TelemetryWorkerStats { pub configurations_unflushed: u32, pub integrations_stored: u32, pub integrations_unflushed: u32, + pub endpoints_stored: u32, + pub endpoints_unflushed: u32, pub logs: u32, pub metric_contexts: u32, pub metric_buckets: MetricBucketStats, @@ -197,6 +201,8 @@ impl Add for TelemetryWorkerStats { configurations_unflushed: self.configurations_unflushed + rhs.configurations_unflushed, integrations_stored: self.integrations_stored + rhs.integrations_stored, integrations_unflushed: self.integrations_unflushed + rhs.integrations_unflushed, + endpoints_stored: self.endpoints_stored + rhs.endpoints_stored, + endpoints_unflushed: self.endpoints_unflushed + rhs.endpoints_unflushed, logs: self.logs + rhs.logs, metric_contexts: self.metric_contexts + rhs.metric_contexts, metric_buckets: MetricBucketStats { @@ -315,7 +321,7 @@ impl TelemetryWorker { } } } - AddConfig(_) | AddDependency(_) | AddIntegration(_) | Lifecycle(ExtendedHeartbeat) => {} + AddConfig(_) | AddDependency(_) | AddIntegration(_) | AddEndpoint(_) | Lifecycle(ExtendedHeartbeat) => {} Lifecycle(Stop) => { if !self.data.started { return BREAK; @@ -372,6 +378,7 @@ impl TelemetryWorker { AddDependency(dep) => self.data.dependencies.insert(dep), AddIntegration(integration) => self.data.integrations.insert(integration), AddConfig(cfg) => self.data.configurations.insert(cfg), + AddEndpoint(endpoint) => self.data.endpoints.insert(endpoint), AddLog((identifier, log)) => { let (l, new) = self.data.logs.get_mut_or_insert(identifier, log); if !new { @@ -424,6 +431,7 @@ impl TelemetryWorker { self.data.dependencies.unflush_stored(); self.data.integrations.unflush_stored(); self.data.configurations.unflush_stored(); + self.data.endpoints.unflush_stored(); let app_started = data::Payload::AppStarted(self.build_app_started()); match self.send_payload(&app_started).await { @@ -516,6 +524,13 @@ impl TelemetryWorker { }, )) } + if self.data.endpoints.flush_not_empty() { + payloads.push(data::Payload::AppEndpointsChange( + data::AppEndpointsChange { + endpoints: self.data.endpoints.unflushed().cloned().collect(), + }, + )) + } payloads } @@ -618,6 +633,9 @@ impl TelemetryWorker { .data .configurations .removed_flushed(p.configuration.len()), + AppEndpointsChange(p) => { + self.data.endpoints.removed_flushed(p.endpoints.len()) + } MessageBatch(batch) => { for p in batch { self.payload_sent_success(p); @@ -722,6 +740,8 @@ impl TelemetryWorker { configurations_unflushed: self.data.configurations.len_unflushed() as u32, integrations_stored: self.data.integrations.len_stored() as u32, integrations_unflushed: self.data.integrations.len_unflushed() as u32, + endpoints_stored: self.data.endpoints.len_stored() as u32, + endpoints_unflushed: self.data.endpoints.len_unflushed() as u32, logs: self.data.logs.len() as u32, metric_contexts: self.data.metric_contexts.lock().len() as u32, metric_buckets: self.data.metric_buckets.stats(), @@ -910,7 +930,7 @@ impl TelemetryWorkerHandle { } } -/// How many dependencies/integrations/configs we keep in memory at most +/// How many dependencies/integrations/configs/endpoints we keep in memory at most pub const MAX_ITEMS: usize = 5000; #[derive(Debug, Default, Clone, Copy)] @@ -930,6 +950,7 @@ pub struct TelemetryWorkerBuilder { pub dependencies: store::Store, pub integrations: store::Store, pub configurations: store::Store, + pub endpoints: store::Store, pub native_deps: bool, pub rust_shared_lib_deps: bool, pub config: Config, @@ -980,6 +1001,7 @@ impl TelemetryWorkerBuilder { dependencies: store::Store::new(MAX_ITEMS), integrations: store::Store::new(MAX_ITEMS), configurations: store::Store::new(MAX_ITEMS), + endpoints: store::Store::new(MAX_ITEMS), native_deps: true, rust_shared_lib_deps: false, config: Config::default(), @@ -1010,6 +1032,7 @@ impl TelemetryWorkerBuilder { dependencies: self.dependencies, integrations: self.integrations, configurations: self.configurations, + endpoints: self.endpoints, logs: store::QueueHashMap::default(), metric_contexts: contexts.clone(), metric_buckets: MetricBuckets::default(), From c6ec5326ebfea7636430d3c63929cbf09627ac11 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Wed, 13 Aug 2025 17:59:33 +0200 Subject: [PATCH 02/22] Fix errors --- datadog-sidecar/src/service/mod.rs | 2 +- ddtelemetry/src/data/payload.rs | 2 ++ ddtelemetry/src/data/payloads.rs | 5 +++++ ddtelemetry/src/worker/mod.rs | 3 +-- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index 2592d91d1f..7f270220ff 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -72,5 +72,5 @@ pub enum SidecarAction { AddTelemetryMetricPoint((String, f64, Vec)), PhpComposerTelemetryFile(PathBuf), ClearQueueId, - AddRoute(Route), + AddEndpoint(ddtelemetry::data::Endpoint), } diff --git a/ddtelemetry/src/data/payload.rs b/ddtelemetry/src/data/payload.rs index d966008534..db49fa2774 100644 --- a/ddtelemetry/src/data/payload.rs +++ b/ddtelemetry/src/data/payload.rs @@ -12,6 +12,7 @@ pub enum Payload { AppDependenciesLoaded(AppDependenciesLoaded), AppIntegrationsChange(AppIntegrationsChange), AppClientConfigurationChange(AppClientConfigurationChange), + AppEndpointsChange(AppEndpointsChange), AppHeartbeat(#[serde(skip_serializing)] ()), AppClosing(#[serde(skip_serializing)] ()), GenerateMetrics(GenerateMetrics), @@ -29,6 +30,7 @@ impl Payload { AppDependenciesLoaded(_) => "app-dependencies-loaded", AppIntegrationsChange(_) => "app-integrations-change", AppClientConfigurationChange(_) => "app-client-configuration-change", + AppEndpointsChange(_) => "app-endpoints-change", AppHeartbeat(_) => "app-heartbeat", AppClosing(_) => "app-closing", GenerateMetrics(_) => "generate-metrics", diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 1b7c40178b..9b1e0bda6c 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -61,6 +61,11 @@ pub struct AppClientConfigurationChange { pub configuration: Vec, } +#[derive(Debug, Serialize)] +pub struct AppEndpointsChange { + pub endpoints: Vec, +} + #[derive(Serialize, Debug)] pub struct GenerateMetrics { pub series: Vec, diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs index 144a65db67..0664edfef3 100644 --- a/ddtelemetry/src/worker/mod.rs +++ b/ddtelemetry/src/worker/mod.rs @@ -10,7 +10,6 @@ use crate::{ data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry}, metrics::{ContextKey, MetricBuckets, MetricContexts}, }; -use ddcommon::Endpoint; use ddcommon::{hyper_migration, tag::Tag, worker::Worker}; use std::fmt::Debug; @@ -717,7 +716,7 @@ impl TelemetryWorker { if let Some(endpoint) = self.config.endpoint.as_ref() { endpoint.timeout_ms } else { - Endpoint::DEFAULT_TIMEOUT + ddcommon::Endpoint::DEFAULT_TIMEOUT })) => { Err(anyhow::anyhow!("Request timed out")) }, From 8ab2d9f4e396b32e6dcb5c4f61c17a85438701b6 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Thu, 14 Aug 2025 12:04:53 +0200 Subject: [PATCH 03/22] Add missing action --- datadog-sidecar/src/service/telemetry.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index a556f82866..f21f56c43e 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -153,6 +153,9 @@ impl TelemetryCachedClient { SidecarAction::AddTelemetryMetricPoint(point) => { actions.push(self.to_telemetry_point(point)); } + SidecarAction::AddEndpoint(endpoint) => { + actions.push(TelemetryActions::AddEndpoint(endpoint)); + } SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately SidecarAction::ClearQueueId => {} // handled separately } From 66daf8c4b5fa9f1e9a6957c1272bfeb15039df03 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Thu, 21 Aug 2025 12:19:31 +0200 Subject: [PATCH 04/22] Type enums --- ddtelemetry/src/data/payloads.rs | 42 ++++++++++++++++---------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 9b1e0bda6c..9ecc28b1f8 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -111,33 +111,33 @@ pub struct EndpointsPayload { #[serde(rename_all = "UPPERCASE")] #[repr(C)] pub enum Method { - Get, - Post, - Put, - Delete, - Patch, - Head, - Options, - Trace, - Connect, - Other, //This is specified as "*" in the OpenAPI spec + Get = 0, + Post = 1, + Put = 2, + Delete = 3, + Patch = 4, + Head = 5, + Options = 6, + Trace = 7, + Connect = 8, + Other = 9, //This is specified as "*" in the OpenAPI spec } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "UPPERCASE")] #[repr(C)] pub enum Authentication { - Jwt, - Basic, - Oauth, - Oidc, - ApiKey, - Session, - Mtls, - Saml, - Ldap, - Form, - Other, + Jwt = 0, + Basic = 1, + Oauth = 2, + Oidc = 3, + ApiKey = 4, + Session = 5, + Mtls = 6, + Saml = 7, + Ldap = 8, + Form = 9, + Other = 10, } #[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] From d444a77840b25454e66e00b7090aee2c83ed211e Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Thu, 21 Aug 2025 12:49:08 +0200 Subject: [PATCH 05/22] Simplify structs --- ddtelemetry/src/data/payloads.rs | 52 ++++++++++++++++---------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 9ecc28b1f8..f198425028 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -123,22 +123,22 @@ pub enum Method { Other = 9, //This is specified as "*" in the OpenAPI spec } -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] -#[serde(rename_all = "UPPERCASE")] -#[repr(C)] -pub enum Authentication { - Jwt = 0, - Basic = 1, - Oauth = 2, - Oidc = 3, - ApiKey = 4, - Session = 5, - Mtls = 6, - Saml = 7, - Ldap = 8, - Form = 9, - Other = 10, -} +// #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +// #[serde(rename_all = "UPPERCASE")] +// #[repr(C)] +// pub enum Authentication { +// Jwt = 0, +// Basic = 1, +// Oauth = 2, +// Oidc = 3, +// ApiKey = 4, +// Session = 5, +// Mtls = 6, +// Saml = 7, +// Ldap = 8, +// Form = 9, +// Other = 10, +// } #[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] pub struct Endpoint { @@ -150,14 +150,14 @@ pub struct Endpoint { pub path: Option, pub operation_name: String, pub resource_name: String, - #[serde(default)] - pub request_body_type: Option>, - #[serde(default)] - pub response_body_type: Option>, - #[serde(default)] - pub response_code: Option>, - #[serde(default)] - pub authentication: Option>, - #[serde(default)] - pub metadata: Option, + // #[serde(default)] + // pub request_body_type: Option>, + // #[serde(default)] + // pub response_body_type: Option>, + // #[serde(default)] + // pub response_code: Option>, + // #[serde(default)] + // pub authentication: Option>, + // #[serde(default)] + // pub metadata: Option, } From 29d24fae8735d1540d3a9bffa69c9e85a6adff7f Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 25 Aug 2025 15:25:50 +0200 Subject: [PATCH 06/22] wip --- datadog-sidecar-ffi/src/lib.rs | 29 +++++++++++++++++++ datadog-sidecar/src/service/mod.rs | 3 +- datadog-sidecar/src/service/sidecar_server.rs | 8 ++++- datadog-sidecar/src/service/telemetry.rs | 5 ++-- 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 9452a89484..c302bf7e20 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -410,6 +410,35 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig( MaybeError::None } +/// Reports an endpoint to the telemetry. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( + transport: &mut Box, + instance_id: &InstanceId, + queue_id: &QueueId, + r#type: CharSlice, + method: ddtelemetry::data::Method, + path: CharSlice, + operation_name: CharSlice, + resource_name: CharSlice, +) { + let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { + r#type: Some(r#type.to_utf8_lossy().into_owned()), + method: Some(method), + path: Some(path.to_utf8_lossy().into_owned()), + operation_name: operation_name.to_utf8_lossy().into_owned(), + resource_name: resource_name.to_utf8_lossy().into_owned(), + }); + + try_c!(blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::Telemetry(endpoint)], + )); +} + /// Reports a dependency to the telemetry. #[no_mangle] #[allow(clippy::missing_safety_doc)] diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index 7f270220ff..bd080ab9bd 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -71,6 +71,5 @@ pub enum SidecarAction { RegisterTelemetryMetric(MetricContext), AddTelemetryMetricPoint((String, f64, Vec)), PhpComposerTelemetryFile(PathBuf), - ClearQueueId, - AddEndpoint(ddtelemetry::data::Endpoint), + ClearQueueId } diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 2ceaa9b7aa..6f435d5d69 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -448,7 +448,13 @@ impl SidecarInterface for SidecarServer { } SidecarAction::ClearQueueId => { remove_entry = true; - } + }, + SidecarAction::Telemetry(TelemetryActions::AddEndpoint(ref endpoint)) => { + if telemetry.buffered_endpoints.insert(endpoint.clone()) { + buffered_info_changed = true; + actions_to_process.push(action); + } + }, SidecarAction::Telemetry(TelemetryActions::Lifecycle( LifecycleAction::Stop, )) => { diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index f21f56c43e..cde68136ba 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -62,6 +62,7 @@ pub struct TelemetryCachedClient { pub config_sent: bool, pub buffered_integrations: HashSet, pub buffered_composer_paths: HashSet, + pub buffered_endpoints: HashSet, pub telemetry_metrics: Arc>>, pub handle: Arc>>>, } @@ -99,6 +100,7 @@ impl TelemetryCachedClient { config_sent: false, buffered_integrations: HashSet::new(), buffered_composer_paths: HashSet::new(), + buffered_endpoints: HashSet::new(), telemetry_metrics: Default::default(), handle: Arc::new(Mutex::new(None)), } @@ -153,9 +155,6 @@ impl TelemetryCachedClient { SidecarAction::AddTelemetryMetricPoint(point) => { actions.push(self.to_telemetry_point(point)); } - SidecarAction::AddEndpoint(endpoint) => { - actions.push(TelemetryActions::AddEndpoint(endpoint)); - } SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately SidecarAction::ClearQueueId => {} // handled separately } From 619a6d3725d8f26674069980c460166eeb104ff9 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 25 Aug 2025 15:40:34 +0200 Subject: [PATCH 07/22] wip --- datadog-sidecar/src/service/mod.rs | 2 +- datadog-sidecar/src/service/telemetry.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index bd080ab9bd..16b869db84 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -71,5 +71,5 @@ pub enum SidecarAction { RegisterTelemetryMetric(MetricContext), AddTelemetryMetricPoint((String, f64, Vec)), PhpComposerTelemetryFile(PathBuf), - ClearQueueId + ClearQueueId, } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index cde68136ba..4ea9e7e112 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -111,6 +111,7 @@ impl TelemetryCachedClient { &self.config_sent, &self.buffered_integrations, &self.buffered_composer_paths, + &self.buffered_endpoints, )) { self.shm_writer.write(&buf); } else { From 07a007f033b4b4e5fe17699da9218b62a8d295fc Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 25 Aug 2025 16:30:58 +0200 Subject: [PATCH 08/22] Amend endpoints payload --- ddtelemetry/src/data/payloads.rs | 7 +------ ddtelemetry/src/worker/mod.rs | 1 + 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index f198425028..11b463a50a 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -63,6 +63,7 @@ pub struct AppClientConfigurationChange { #[derive(Debug, Serialize)] pub struct AppEndpointsChange { + pub is_first: bool, pub endpoints: Vec, } @@ -101,12 +102,6 @@ pub enum LogLevel { Debug, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub struct EndpointsPayload { - pub is_first: Option, - pub endpoints: Vec, -} - #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] #[serde(rename_all = "UPPERCASE")] #[repr(C)] diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs index 0664edfef3..fea2d4df4d 100644 --- a/ddtelemetry/src/worker/mod.rs +++ b/ddtelemetry/src/worker/mod.rs @@ -526,6 +526,7 @@ impl TelemetryWorker { if self.data.endpoints.flush_not_empty() { payloads.push(data::Payload::AppEndpointsChange( data::AppEndpointsChange { + is_first: true, endpoints: self.data.endpoints.unflushed().cloned().collect(), }, )) From 374770159df873f5d03d56aa8055dcc82faef9cd Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 25 Aug 2025 17:17:33 +0200 Subject: [PATCH 09/22] Fix return --- datadog-sidecar-ffi/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index c302bf7e20..a405b3bd7a 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -422,7 +422,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: CharSlice, operation_name: CharSlice, resource_name: CharSlice, -) { +) -> MaybeError { let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { r#type: Some(r#type.to_utf8_lossy().into_owned()), method: Some(method), @@ -437,6 +437,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( queue_id, vec![SidecarAction::Telemetry(endpoint)], )); + MaybeError::None } /// Reports a dependency to the telemetry. From 71f43e14be54e60a3d595286a9a321a6fc47edf6 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Tue, 26 Aug 2025 12:26:43 +0200 Subject: [PATCH 10/22] Avoid writing all endpoints to shared memory --- datadog-sidecar/src/service/sidecar_server.rs | 15 +++++++-------- datadog-sidecar/src/service/telemetry.rs | 6 +++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 6f435d5d69..cb048b1eda 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -28,7 +28,7 @@ use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tracing::{debug, error, info, trace, warn}; use futures::FutureExt; @@ -421,7 +421,7 @@ impl SidecarInterface for SidecarServer { }, ); - let mut actions_to_process = vec![]; + let mut actions_to_process: Vec = vec![]; let mut composer_paths_to_process = vec![]; let mut buffered_info_changed = false; let mut remove_entry = false; @@ -449,12 +449,11 @@ impl SidecarInterface for SidecarServer { SidecarAction::ClearQueueId => { remove_entry = true; }, - SidecarAction::Telemetry(TelemetryActions::AddEndpoint(ref endpoint)) => { - if telemetry.buffered_endpoints.insert(endpoint.clone()) { - buffered_info_changed = true; - actions_to_process.push(action); - } - }, + SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => { + telemetry.last_endpoints_push = SystemTime::now(); + buffered_info_changed = true; + actions_to_process.push(action); + } SidecarAction::Telemetry(TelemetryActions::Lifecycle( LifecycleAction::Stop, )) => { diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 4ea9e7e112..4017f2df80 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -62,7 +62,7 @@ pub struct TelemetryCachedClient { pub config_sent: bool, pub buffered_integrations: HashSet, pub buffered_composer_paths: HashSet, - pub buffered_endpoints: HashSet, + pub last_endpoints_push: SystemTime, pub telemetry_metrics: Arc>>, pub handle: Arc>>>, } @@ -100,7 +100,7 @@ impl TelemetryCachedClient { config_sent: false, buffered_integrations: HashSet::new(), buffered_composer_paths: HashSet::new(), - buffered_endpoints: HashSet::new(), + last_endpoints_push: SystemTime::UNIX_EPOCH, telemetry_metrics: Default::default(), handle: Arc::new(Mutex::new(None)), } @@ -111,7 +111,7 @@ impl TelemetryCachedClient { &self.config_sent, &self.buffered_integrations, &self.buffered_composer_paths, - &self.buffered_endpoints, + &self.last_endpoints_push, )) { self.shm_writer.write(&buf); } else { From b91ee92327b2a613652f85b9c65b97e79449fcee Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 18 Sep 2025 14:37:57 +0200 Subject: [PATCH 11/22] Fix cloning too much for telemetry workers Turns out telemetry workers were cloning their whole contents, so when the HashSets and HashMaps were updated, the changes weren't persisted. Signed-off-by: Bob Weinand --- datadog-sidecar/src/service/sidecar_server.rs | 30 ++++--- datadog-sidecar/src/service/telemetry.rs | 88 ++++++++++--------- 2 files changed, 63 insertions(+), 55 deletions(-) diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index cb048b1eda..cf863bb035 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -297,12 +297,12 @@ impl SidecarServer { let futures = clients .values() - .filter_map(|client| client.client.stats().ok()) + .filter_map(|client| client.client.lock_or_panic().worker.stats().ok()) .collect::>(); let metric_counts = clients .values() - .map(|client| client.telemetry_metrics.lock_or_panic().len() as u32) + .map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32) .collect::>(); (futures, metric_counts) @@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer { let env = entry.get().env.as_deref().unwrap_or("none"); // Lock telemetry client - let mut telemetry = self.telemetry_clients.get_or_create( + let telemetry_mutex = self.telemetry_clients.get_or_create( service, env, &instance_id, @@ -420,6 +420,7 @@ impl SidecarInterface for SidecarServer { }) }, ); + let mut telemetry = telemetry_mutex.lock_or_panic(); let mut actions_to_process: Vec = vec![]; let mut composer_paths_to_process = vec![]; @@ -467,24 +468,25 @@ impl SidecarInterface for SidecarServer { } if !actions_to_process.is_empty() { - let client_clone = telemetry.clone(); - let mut handle = telemetry.handle.lock_or_panic(); - let last_handle = handle.take(); - *handle = Some(tokio::spawn(async move { + let telemetry_mutex_clone = telemetry_mutex.clone(); + let worker = telemetry.worker.clone(); + let last_handle = telemetry.handle.take(); + telemetry.handle = Some(tokio::spawn(async move { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; - let processed = client_clone.process_actions(actions_to_process); + let processed = telemetry_mutex_clone + .lock_or_panic() + .process_actions(actions_to_process); debug!("Sending Processed Actions :{processed:?}"); - client_clone.client.send_msgs(processed).await.ok(); + worker.send_msgs(processed).await.ok(); })); } if !composer_paths_to_process.is_empty() { - let client_clone = telemetry.clone(); - let mut handle = telemetry.handle.lock_or_panic(); - let last_handle = handle.take(); - *handle = Some(tokio::spawn(async move { + let worker = telemetry.worker.clone(); + let last_handle = telemetry.handle.take(); + telemetry.handle = Some(tokio::spawn(async move { if let Some(last_handle) = last_handle { last_handle.await.ok(); }; @@ -492,7 +494,7 @@ impl SidecarInterface for SidecarServer { TelemetryCachedClient::process_composer_paths(composer_paths_to_process) .await; debug!("Sending Composer Paths :{composer_actions:?}"); - client_clone.client.send_msgs(composer_actions).await.ok(); + worker.send_msgs(composer_actions).await.ok(); })); } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 4017f2df80..c7fabc8bdd 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -54,21 +54,24 @@ struct ComposerPackages { packages: Vec, } -#[derive(Clone)] +pub struct TelemetryCachedEntry { + last_used: Instant, + pub client: Arc>, +} + pub struct TelemetryCachedClient { - pub client: TelemetryWorkerHandle, - pub shm_writer: Arc>, - pub last_used: Instant, + pub worker: TelemetryWorkerHandle, + pub shm_writer: OneWayShmWriter, pub config_sent: bool, pub buffered_integrations: HashSet, pub buffered_composer_paths: HashSet, pub last_endpoints_push: SystemTime, - pub telemetry_metrics: Arc>>, - pub handle: Arc>>>, + pub telemetry_metrics: HashMap, + pub handle: Option>, } impl TelemetryCachedClient { - pub fn new( + fn new( service: &str, env: &str, instance_id: &InstanceId, @@ -91,18 +94,17 @@ impl TelemetryCachedClient { info!("spawning telemetry worker {config:?}"); Self { - client: handle.clone(), - shm_writer: Arc::new( + worker: handle.clone(), + shm_writer: { #[allow(clippy::unwrap_used)] - OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap(), - ), - last_used: Instant::now(), + OneWayShmWriter::::new(path_for_telemetry(service, env)).unwrap() + }, config_sent: false, buffered_integrations: HashSet::new(), buffered_composer_paths: HashSet::new(), last_endpoints_push: SystemTime::UNIX_EPOCH, telemetry_metrics: Default::default(), - handle: Arc::new(Mutex::new(None)), + handle: None, } } @@ -119,12 +121,11 @@ impl TelemetryCachedClient { } } - pub fn register_metric(&self, metric: MetricContext) { - let mut metrics = self.telemetry_metrics.lock_or_panic(); - if !metrics.contains_key(&metric.name) { - metrics.insert( + pub fn register_metric(&mut self, metric: MetricContext) { + if !self.telemetry_metrics.contains_key(&metric.name) { + self.telemetry_metrics.insert( metric.name.clone(), - self.client.register_metric_context( + self.worker.register_metric_context( metric.name, metric.tags, metric.metric_type, @@ -140,14 +141,13 @@ impl TelemetryCachedClient { (name, val, tags): (String, f64, Vec), ) -> TelemetryActions { #[allow(clippy::unwrap_used)] - TelemetryActions::AddPoint(( - val, - *self.telemetry_metrics.lock_or_panic().get(&name).unwrap(), - tags, - )) + TelemetryActions::AddPoint((val, *self.telemetry_metrics.get(&name).unwrap(), tags)) } - pub fn process_actions(&self, sidecar_actions: Vec) -> Vec { + pub fn process_actions( + &mut self, + sidecar_actions: Vec, + ) -> Vec { let mut actions = vec![]; for action in sidecar_actions { match action { @@ -236,13 +236,13 @@ type EnvString = String; type TelemetryCachedClientKey = (ServiceString, EnvString); pub struct TelemetryCachedClientSet { - pub inner: Arc>>, + pub inner: Arc>>, cleanup_handle: Option>, } impl Default for TelemetryCachedClientSet { fn default() -> Self { - let inner: Arc>> = + let inner: Arc>> = Arc::new(Default::default()); let clients = inner.clone(); @@ -286,7 +286,7 @@ impl TelemetryCachedClientSet { instance_id: &InstanceId, runtime_meta: &RuntimeMetadata, get_config: F, - ) -> TelemetryCachedClient + ) -> Arc> where F: FnOnce() -> ddtelemetry::config::Config, { @@ -296,12 +296,10 @@ impl TelemetryCachedClientSet { if let Some(existing) = map.get_mut(&key) { existing.last_used = Instant::now(); - let client = existing.clone(); tokio::spawn({ - let telemetry = client.clone(); + let worker = existing.client.lock_or_panic().worker.clone(); async move { - telemetry - .client + worker .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await .ok(); @@ -309,19 +307,26 @@ impl TelemetryCachedClientSet { }); info!("Reusing existing telemetry client for {key:?}"); - return client; + return existing.client.clone(); } - let client = - TelemetryCachedClient::new(service, env, instance_id, runtime_meta, get_config); + let entry = TelemetryCachedEntry { + last_used: Instant::now(), + client: Arc::new(Mutex::new(TelemetryCachedClient::new( + service, + env, + instance_id, + runtime_meta, + get_config, + ))), + }; - map.insert(key.clone(), client.clone()); + let entry = map.entry(key.clone()).or_insert(entry); tokio::spawn({ - let telemetry = client.clone(); + let worker = entry.client.lock_or_panic().worker.clone(); async move { - telemetry - .client + worker .send_msg(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await .ok(); @@ -330,7 +335,7 @@ impl TelemetryCachedClientSet { info!("Created new telemetry client for {key:?}"); - client + entry.client.clone() } pub fn remove_telemetry_client(&self, service: &str, env: &str) { @@ -388,10 +393,11 @@ pub(crate) async fn telemetry_action_receiver_task(sidecar: SidecarServer) { &actions.service_name, &actions.env_name, ); + let client = telemetry_client.lock_or_panic().worker.clone(); for action in actions.actions { let action_str = format!("{action:?}"); - match telemetry_client.client.send_msg(action).await { + match client.send_msg(action).await { Ok(_) => { debug!("Sent telemetry action to TelemetryWorker: {action_str}"); } @@ -409,7 +415,7 @@ fn get_telemetry_client( instance_id: &InstanceId, service_name: &str, env_name: &str, -) -> TelemetryCachedClient { +) -> Arc> { let session = sidecar.get_session(&instance_id.session_id); let trace_config = session.get_trace_config(); let runtime_meta = RuntimeMetadata::new( From dc30767407fc0cb6bf67993ab057777a4df835cb Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Tue, 23 Sep 2025 10:18:37 +0200 Subject: [PATCH 12/22] Add more fields to endpoints --- datadog-sidecar-ffi/src/lib.rs | 8 ++++++ ddtelemetry/src/data/payloads.rs | 48 ++++++++++++++++---------------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index a405b3bd7a..e33c49b408 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -422,6 +422,10 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: CharSlice, operation_name: CharSlice, resource_name: CharSlice, + request_body_type: ffi::Vec, + response_body_type: ffi::Vec, + response_code: ffi::Vec, + authentication: ffi::Vec ) -> MaybeError { let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { r#type: Some(r#type.to_utf8_lossy().into_owned()), @@ -429,6 +433,10 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: Some(path.to_utf8_lossy().into_owned()), operation_name: operation_name.to_utf8_lossy().into_owned(), resource_name: resource_name.to_utf8_lossy().into_owned(), + request_body_type: Some(request_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + response_body_type: Some(response_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + response_code: Some(response_code.iter().cloned().collect()), + authentication: Some(authentication.iter().cloned().collect()), }); try_c!(blocking::enqueue_actions( diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 11b463a50a..b37e7fc443 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -118,22 +118,22 @@ pub enum Method { Other = 9, //This is specified as "*" in the OpenAPI spec } -// #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] -// #[serde(rename_all = "UPPERCASE")] -// #[repr(C)] -// pub enum Authentication { -// Jwt = 0, -// Basic = 1, -// Oauth = 2, -// Oidc = 3, -// ApiKey = 4, -// Session = 5, -// Mtls = 6, -// Saml = 7, -// Ldap = 8, -// Form = 9, -// Other = 10, -// } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone)] +#[serde(rename_all = "UPPERCASE")] +#[repr(C)] +pub enum Authentication { + Jwt = 0, + Basic = 1, + Oauth = 2, + Oidc = 3, + ApiKey = 4, + Session = 5, + Mtls = 6, + Saml = 7, + Ldap = 8, + Form = 9, + Other = 10, +} #[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] pub struct Endpoint { @@ -145,14 +145,14 @@ pub struct Endpoint { pub path: Option, pub operation_name: String, pub resource_name: String, - // #[serde(default)] - // pub request_body_type: Option>, - // #[serde(default)] - // pub response_body_type: Option>, - // #[serde(default)] - // pub response_code: Option>, - // #[serde(default)] - // pub authentication: Option>, + #[serde(default)] + pub request_body_type: Option>, + #[serde(default)] + pub response_body_type: Option>, + #[serde(default)] + pub response_code: Option>, + #[serde(default)] + pub authentication: Option>, // #[serde(default)] // pub metadata: Option, } From ea929301b115f38e3998a9efd34e15e2cb32f197 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Thu, 25 Sep 2025 12:00:14 +0200 Subject: [PATCH 13/22] Temp changes --- datadog-sidecar-ffi/src/lib.rs | 51 ++++++++++++++++++++------------ ddtelemetry/src/data/payloads.rs | 8 ++--- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index e33c49b408..8370a7e40b 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -56,6 +56,9 @@ use std::ptr::NonNull; use std::slice; use std::sync::Arc; use std::time::Duration; +use std::fs::OpenOptions; +use std::io::Write; + #[no_mangle] #[cfg(target_os = "windows")] @@ -424,28 +427,38 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( resource_name: CharSlice, request_body_type: ffi::Vec, response_body_type: ffi::Vec, - response_code: ffi::Vec, - authentication: ffi::Vec + // response_code: ffi::Vec, + // authentication: ffi::Vec ) -> MaybeError { - let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { - r#type: Some(r#type.to_utf8_lossy().into_owned()), - method: Some(method), - path: Some(path.to_utf8_lossy().into_owned()), - operation_name: operation_name.to_utf8_lossy().into_owned(), - resource_name: resource_name.to_utf8_lossy().into_owned(), - request_body_type: Some(request_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), - response_body_type: Some(response_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), - response_code: Some(response_code.iter().cloned().collect()), - authentication: Some(authentication.iter().cloned().collect()), - }); - try_c!(blocking::enqueue_actions( - transport, - instance_id, - queue_id, - vec![SidecarAction::Telemetry(endpoint)], - )); + if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/tmp/alex.log") { + let _ = writeln!(file, "Here Alex 0"); + let _ = writeln!(file, "Request body type - {:?}", request_body_type.first()); + let _ = writeln!(file, "Response body type - {:?}", response_body_type.first()); + } + MaybeError::None + + + // let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { + // r#type: Some(r#type.to_utf8_lossy().into_owned()), + // method: Some(method), + // path: Some(path.to_utf8_lossy().into_owned()), + // operation_name: operation_name.to_utf8_lossy().into_owned(), + // resource_name: resource_name.to_utf8_lossy().into_owned(), + // request_body_type: Some(request_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + // response_body_type: Some(response_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + // // response_code: Some(response_code.iter().cloned().collect()), + // // authentication: Some(authentication.iter().cloned().collect()), + // }); + + // try_c!(blocking::enqueue_actions( + // transport, + // instance_id, + // queue_id, + // vec![SidecarAction::Telemetry(endpoint)], + // )); + // MaybeError::None } /// Reports a dependency to the telemetry. diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index b37e7fc443..77610a7c0d 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -149,10 +149,10 @@ pub struct Endpoint { pub request_body_type: Option>, #[serde(default)] pub response_body_type: Option>, - #[serde(default)] - pub response_code: Option>, - #[serde(default)] - pub authentication: Option>, + // #[serde(default)] + // pub response_code: Option>, + // #[serde(default)] + // pub authentication: Option>, // #[serde(default)] // pub metadata: Option, } From f20d32c8433aae03326da1ca455fdd394b86d678 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Thu, 25 Sep 2025 12:47:31 +0200 Subject: [PATCH 14/22] Add string vectors --- datadog-sidecar-ffi/src/lib.rs | 48 +++++++++++++------------------- ddtelemetry/src/data/payloads.rs | 6 ++-- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 8370a7e40b..805a8d2256 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -425,40 +425,30 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: CharSlice, operation_name: CharSlice, resource_name: CharSlice, - request_body_type: ffi::Vec, - response_body_type: ffi::Vec, + request_body_type:&mut ffi::Vec, + response_body_type:&mut ffi::Vec, // response_code: ffi::Vec, // authentication: ffi::Vec ) -> MaybeError { + let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { + r#type: Some(r#type.to_utf8_lossy().into_owned()), + method: Some(method), + path: Some(path.to_utf8_lossy().into_owned()), + operation_name: operation_name.to_utf8_lossy().into_owned(), + resource_name: resource_name.to_utf8_lossy().into_owned(), + request_body_type: Some(request_body_type.as_slice().to_vec()), + response_body_type: Some(response_body_type.as_slice().to_vec()), + // response_code: Some(response_code.iter().cloned().collect()), + // authentication: Some(authentication.iter().cloned().collect()), + }); - if let Ok(mut file) = OpenOptions::new().create(true).append(true).open("/tmp/alex.log") { - let _ = writeln!(file, "Here Alex 0"); - let _ = writeln!(file, "Request body type - {:?}", request_body_type.first()); - let _ = writeln!(file, "Response body type - {:?}", response_body_type.first()); - } - + try_c!(blocking::enqueue_actions( + transport, + instance_id, + queue_id, + vec![SidecarAction::Telemetry(endpoint)], + )); MaybeError::None - - - // let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { - // r#type: Some(r#type.to_utf8_lossy().into_owned()), - // method: Some(method), - // path: Some(path.to_utf8_lossy().into_owned()), - // operation_name: operation_name.to_utf8_lossy().into_owned(), - // resource_name: resource_name.to_utf8_lossy().into_owned(), - // request_body_type: Some(request_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), - // response_body_type: Some(response_body_type.iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), - // // response_code: Some(response_code.iter().cloned().collect()), - // // authentication: Some(authentication.iter().cloned().collect()), - // }); - - // try_c!(blocking::enqueue_actions( - // transport, - // instance_id, - // queue_id, - // vec![SidecarAction::Telemetry(endpoint)], - // )); - // MaybeError::None } /// Reports a dependency to the telemetry. diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 77610a7c0d..e0036cf391 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -5,6 +5,8 @@ use crate::data::metrics; use serde::{Deserialize, Serialize}; +use std::os::raw::c_char; + #[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] pub struct Dependency { pub name: String, @@ -146,9 +148,9 @@ pub struct Endpoint { pub operation_name: String, pub resource_name: String, #[serde(default)] - pub request_body_type: Option>, + pub request_body_type: Option>, #[serde(default)] - pub response_body_type: Option>, + pub response_body_type: Option>, // #[serde(default)] // pub response_code: Option>, // #[serde(default)] From 67c9d00f8a2eb911e9e3ad0e58fca6c6a9aaff67 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 29 Sep 2025 10:21:45 +0200 Subject: [PATCH 15/22] Add i32 vector --- datadog-sidecar-ffi/src/lib.rs | 6 +++--- ddtelemetry/src/data/payloads.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 805a8d2256..c9f9288dca 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -427,7 +427,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( resource_name: CharSlice, request_body_type:&mut ffi::Vec, response_body_type:&mut ffi::Vec, - // response_code: ffi::Vec, + response_code:&mut ffi::Vec, // authentication: ffi::Vec ) -> MaybeError { let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { @@ -438,8 +438,8 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( resource_name: resource_name.to_utf8_lossy().into_owned(), request_body_type: Some(request_body_type.as_slice().to_vec()), response_body_type: Some(response_body_type.as_slice().to_vec()), - // response_code: Some(response_code.iter().cloned().collect()), - // authentication: Some(authentication.iter().cloned().collect()), + response_code: Some(response_code.to_vec()), + authentication: Some(authentication.iter().cloned().collect()), }); try_c!(blocking::enqueue_actions( diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index e0036cf391..ece1fce327 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -151,8 +151,8 @@ pub struct Endpoint { pub request_body_type: Option>, #[serde(default)] pub response_body_type: Option>, - // #[serde(default)] - // pub response_code: Option>, + #[serde(default)] + pub response_code: Option>, // #[serde(default)] // pub authentication: Option>, // #[serde(default)] From 485ae3060ecbc58e37b7e693b2e569fc94907dda Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 29 Sep 2025 10:42:18 +0200 Subject: [PATCH 16/22] Add authentication vec --- datadog-sidecar-ffi/src/lib.rs | 4 ++-- ddtelemetry/src/data/payloads.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index c9f9288dca..fd763397cf 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -428,7 +428,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( request_body_type:&mut ffi::Vec, response_body_type:&mut ffi::Vec, response_code:&mut ffi::Vec, - // authentication: ffi::Vec + authentication: ffi::Vec ) -> MaybeError { let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { r#type: Some(r#type.to_utf8_lossy().into_owned()), @@ -439,7 +439,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( request_body_type: Some(request_body_type.as_slice().to_vec()), response_body_type: Some(response_body_type.as_slice().to_vec()), response_code: Some(response_code.to_vec()), - authentication: Some(authentication.iter().cloned().collect()), + authentication: Some(authentication.to_vec()), }); try_c!(blocking::enqueue_actions( diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index ece1fce327..8eaecef6fb 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -153,8 +153,8 @@ pub struct Endpoint { pub response_body_type: Option>, #[serde(default)] pub response_code: Option>, - // #[serde(default)] - // pub authentication: Option>, + #[serde(default)] + pub authentication: Option>, // #[serde(default)] // pub metadata: Option, } From 9f34ed4612ca12d74a5bccac4f2ac3e6439e901f Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 29 Sep 2025 12:10:43 +0200 Subject: [PATCH 17/22] Add serde json --- datadog-sidecar-ffi/Cargo.toml | 1 + datadog-sidecar-ffi/src/lib.rs | 9 ++++++--- ddtelemetry/src/data/payloads.rs | 4 ++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/datadog-sidecar-ffi/Cargo.toml b/datadog-sidecar-ffi/Cargo.toml index 871fc8a7ea..14cd1ed0a0 100644 --- a/datadog-sidecar-ffi/Cargo.toml +++ b/datadog-sidecar-ffi/Cargo.toml @@ -26,6 +26,7 @@ paste = "1" libc = "0.2" tracing = { version = "0.1", default-features = false } rmp-serde = "1.1.1" +serde_json = "1.0" [target.'cfg(windows)'.dependencies] diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index fd763397cf..e9dc2fa944 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -56,8 +56,7 @@ use std::ptr::NonNull; use std::slice; use std::sync::Arc; use std::time::Duration; -use std::fs::OpenOptions; -use std::io::Write; +use serde_json::Value; #[no_mangle] @@ -428,8 +427,11 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( request_body_type:&mut ffi::Vec, response_body_type:&mut ffi::Vec, response_code:&mut ffi::Vec, - authentication: ffi::Vec + authentication: ffi::Vec, + metadata: CharSlice ) -> MaybeError { + + let metadata_json = serde_json::from_slice::(&metadata.to_utf8_lossy().into_owned().as_bytes()).unwrap(); let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { r#type: Some(r#type.to_utf8_lossy().into_owned()), method: Some(method), @@ -440,6 +442,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( response_body_type: Some(response_body_type.as_slice().to_vec()), response_code: Some(response_code.to_vec()), authentication: Some(authentication.to_vec()), + metadata: Some(metadata_json), }); try_c!(blocking::enqueue_actions( diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 8eaecef6fb..011ee7f6c8 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -155,6 +155,6 @@ pub struct Endpoint { pub response_code: Option>, #[serde(default)] pub authentication: Option>, - // #[serde(default)] - // pub metadata: Option, + #[serde(default)] + pub metadata: Option, } From 3b8d8955e74902ea5184ea3be2eddc79b7479d1f Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 29 Sep 2025 16:04:21 +0200 Subject: [PATCH 18/22] Replace char_c for slices --- Cargo.lock | 2 ++ datadog-sidecar-ffi/src/lib.rs | 10 +++++----- ddtelemetry/Cargo.toml | 1 + ddtelemetry/src/data/payloads.rs | 6 ++---- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 073c14f257..4e74161001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,6 +1815,7 @@ dependencies = [ "libc", "paste", "rmp-serde", + "serde_json", "tempfile", "tinybytes", "tracing", @@ -1979,6 +1980,7 @@ dependencies = [ "base64 0.22.1", "datadog-ddsketch", "ddcommon", + "ddcommon-ffi", "futures", "hashbrown 0.15.1", "http 1.1.0", diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index e9dc2fa944..eb92e4b9df 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -424,10 +424,10 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: CharSlice, operation_name: CharSlice, resource_name: CharSlice, - request_body_type:&mut ffi::Vec, - response_body_type:&mut ffi::Vec, + request_body_type:&mut ffi::Vec, + response_body_type:&mut ffi::Vec, response_code:&mut ffi::Vec, - authentication: ffi::Vec, + authentication:&mut ffi::Vec, metadata: CharSlice ) -> MaybeError { @@ -438,8 +438,8 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: Some(path.to_utf8_lossy().into_owned()), operation_name: operation_name.to_utf8_lossy().into_owned(), resource_name: resource_name.to_utf8_lossy().into_owned(), - request_body_type: Some(request_body_type.as_slice().to_vec()), - response_body_type: Some(response_body_type.as_slice().to_vec()), + request_body_type: Some(request_body_type.to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + response_body_type: Some(response_body_type.to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), response_code: Some(response_code.to_vec()), authentication: Some(authentication.to_vec()), metadata: Some(metadata_json), diff --git a/ddtelemetry/Cargo.toml b/ddtelemetry/Cargo.toml index 824a0e6490..5dacec6bed 100644 --- a/ddtelemetry/Cargo.toml +++ b/ddtelemetry/Cargo.toml @@ -32,6 +32,7 @@ uuid = { version = "1.3", features = ["v4"] } hashbrown = "0.15" ddcommon = { path = "../ddcommon", default-features = false} +ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false } datadog-ddsketch = { path = "../ddsketch" } [dev-dependencies] diff --git a/ddtelemetry/src/data/payloads.rs b/ddtelemetry/src/data/payloads.rs index 011ee7f6c8..74edf55e59 100644 --- a/ddtelemetry/src/data/payloads.rs +++ b/ddtelemetry/src/data/payloads.rs @@ -5,8 +5,6 @@ use crate::data::metrics; use serde::{Deserialize, Serialize}; -use std::os::raw::c_char; - #[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone, Default)] pub struct Dependency { pub name: String, @@ -148,9 +146,9 @@ pub struct Endpoint { pub operation_name: String, pub resource_name: String, #[serde(default)] - pub request_body_type: Option>, + pub request_body_type: Option>, #[serde(default)] - pub response_body_type: Option>, + pub response_body_type: Option>, #[serde(default)] pub response_code: Option>, #[serde(default)] From 7210fbda345f7971d61af4be56e355f3935f813b Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 29 Sep 2025 17:14:31 +0200 Subject: [PATCH 19/22] Make response code a non vector --- datadog-sidecar-ffi/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index eb92e4b9df..35aee6151d 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -426,11 +426,13 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( resource_name: CharSlice, request_body_type:&mut ffi::Vec, response_body_type:&mut ffi::Vec, - response_code:&mut ffi::Vec, + response_code:i32, authentication:&mut ffi::Vec, metadata: CharSlice ) -> MaybeError { + let response_code_vec = vec![response_code]; + let metadata_json = serde_json::from_slice::(&metadata.to_utf8_lossy().into_owned().as_bytes()).unwrap(); let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { r#type: Some(r#type.to_utf8_lossy().into_owned()), @@ -440,7 +442,7 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( resource_name: resource_name.to_utf8_lossy().into_owned(), request_body_type: Some(request_body_type.to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), response_body_type: Some(response_body_type.to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), - response_code: Some(response_code.to_vec()), + response_code: Some(response_code_vec), authentication: Some(authentication.to_vec()), metadata: Some(metadata_json), }); From 73bb7dab19aeb8b308b3b16ee9591345dc0332c8 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Tue, 30 Sep 2025 10:39:15 +0200 Subject: [PATCH 20/22] Drop pointers --- datadog-sidecar-ffi/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 35aee6151d..f01e0e2fea 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -453,6 +453,11 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( queue_id, vec![SidecarAction::Telemetry(endpoint)], )); + + std::ptr::drop_in_place(request_body_type); + std::ptr::drop_in_place(response_body_type); + std::ptr::drop_in_place(authentication); + MaybeError::None } From 056d2fdb3693ef5da493e617d612c9dd6a1b1076 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Tue, 30 Sep 2025 13:10:06 +0200 Subject: [PATCH 21/22] Amend linting --- datadog-sidecar-ffi/src/lib.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index f01e0e2fea..5ffcc1db6d 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -433,7 +433,15 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( let response_code_vec = vec![response_code]; - let metadata_json = serde_json::from_slice::(&metadata.to_utf8_lossy().into_owned().as_bytes()).unwrap(); + let maybe_metadata = serde_json::from_slice::(std::slice::from_raw_parts( + metadata.as_ptr() as *const u8, + metadata.len(), + )); + if let Err(e) = maybe_metadata { + return MaybeError::Some(e.to_string().into()); + } + #[allow(clippy::unwrap_used)] + let metadata_json = maybe_metadata.unwrap(); let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint { r#type: Some(r#type.to_utf8_lossy().into_owned()), method: Some(method), From 3681f4cc68869cad0180c0560962dd46c5100e51 Mon Sep 17 00:00:00 2001 From: Alejandro Estringana Ruiz Date: Mon, 6 Oct 2025 10:24:13 +0200 Subject: [PATCH 22/22] Wip --- datadog-sidecar-ffi/src/lib.rs | 38 +++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 5ffcc1db6d..8830d71bd9 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -424,16 +424,40 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: CharSlice, operation_name: CharSlice, resource_name: CharSlice, - request_body_type:&mut ffi::Vec, - response_body_type:&mut ffi::Vec, + request_body_type:*mut ffi::Vec, + response_body_type:*mut ffi::Vec, response_code:i32, - authentication:&mut ffi::Vec, + authentication:*mut ffi::Vec, metadata: CharSlice ) -> MaybeError { let response_code_vec = vec![response_code]; + let request_body_type_local = Box::into_raw(Box::new(ffi::Vec::::new())); + let response_body_type_local = Box::into_raw(Box::new(ffi::Vec::::new())); + let authentication_local = Box::into_raw(Box::new(ffi::Vec::::new())); + if let Some(req_vec) = request_body_type.as_ref() { + if let Some(local_vec) = request_body_type_local.as_mut() { + for item in req_vec.to_vec() { + local_vec.push(item); + } + } + } + if let Some(resp_vec) = response_body_type.as_ref() { + if let Some(local_vec) = response_body_type_local.as_mut() { + for item in resp_vec.to_vec() { + local_vec.push(item); + } + } + } + if let Some(auth_vec) = authentication.as_ref() { + if let Some(local_vec) = authentication_local.as_mut() { + for item in auth_vec.to_vec() { + local_vec.push(item); + } + } + } - let maybe_metadata = serde_json::from_slice::(std::slice::from_raw_parts( + let maybe_metadata: Result = serde_json::from_slice::(std::slice::from_raw_parts( metadata.as_ptr() as *const u8, metadata.len(), )); @@ -448,10 +472,10 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint( path: Some(path.to_utf8_lossy().into_owned()), operation_name: operation_name.to_utf8_lossy().into_owned(), resource_name: resource_name.to_utf8_lossy().into_owned(), - request_body_type: Some(request_body_type.to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), - response_body_type: Some(response_body_type.to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + request_body_type: Some(request_body_type_local.as_ref().unwrap().to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), + response_body_type: Some(response_body_type_local.as_ref().unwrap().to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()), response_code: Some(response_code_vec), - authentication: Some(authentication.to_vec()), + authentication: Some(authentication_local.as_ref().unwrap().to_vec()), metadata: Some(metadata_json), });