Skip to content

Commit a120146

Browse files
committed
Fix errors
1 parent cf32144 commit a120146

File tree

5 files changed

+299
-3
lines changed

5 files changed

+299
-3
lines changed

datadog-sidecar/src/service/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,5 @@ pub enum SidecarAction {
7272
AddTelemetryMetricPoint((String, f64, Vec<Tag>)),
7373
PhpComposerTelemetryFile(PathBuf),
7474
ClearQueueId,
75-
AddRoute(Route),
75+
AddEndpoint(ddtelemetry::data::Endpoint),
7676
}
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use ddcommon::tag::Tag;
5+
use ddtelemetry::data;
6+
use ddtelemetry::metrics::MetricContext;
7+
use ddtelemetry::worker::store::Store;
8+
use ddtelemetry::worker::{TelemetryActions, MAX_ITEMS};
9+
use futures::future::Shared;
10+
use futures::FutureExt;
11+
use manual_future::ManualFuture;
12+
use serde::Deserialize;
13+
use serde_with::{serde_as, VecSkipError};
14+
use std::collections::HashMap;
15+
use std::ops::Sub;
16+
use std::path::PathBuf;
17+
use std::sync::atomic::AtomicU64;
18+
use std::sync::atomic::Ordering;
19+
use std::sync::{Arc, LazyLock};
20+
use std::time::{Duration, Instant, SystemTime};
21+
use tracing::warn;
22+
23+
use super::enqueued_telemetry_stats::EnqueuedTelemetryStats;
24+
use crate::service::telemetry::AppInstance;
25+
use crate::service::SidecarAction;
26+
27+
type ComposerCache = HashMap<PathBuf, (SystemTime, Arc<Vec<data::Dependency>>)>;
28+
29+
static COMPOSER_CACHE: LazyLock<tokio::sync::Mutex<ComposerCache>> =
30+
LazyLock::new(|| tokio::sync::Mutex::new(Default::default()));
31+
32+
static LAST_CACHE_CLEAN: LazyLock<AtomicU64> = LazyLock::new(|| AtomicU64::new(0));
33+
34+
#[serde_as]
35+
#[derive(Deserialize)]
36+
struct ComposerPackages {
37+
#[serde_as(as = "VecSkipError<_>")]
38+
packages: Vec<data::Dependency>,
39+
}
40+
41+
/// `EnqueuedTelemetryData` is a structure that holds telemetry data that is queued for processing.
42+
pub(crate) struct EnqueuedTelemetryData {
43+
dependencies: Store<data::Dependency>,
44+
configurations: Store<data::Configuration>,
45+
integrations: Store<data::Integration>,
46+
endpoints: Store<data::Endpoint>,
47+
pub(crate) metrics: Vec<MetricContext>,
48+
pub(crate) points: Vec<(String, f64, Vec<Tag>)>,
49+
pub(crate) actions: Vec<TelemetryActions>,
50+
computed_dependencies: Vec<Shared<ManualFuture<Arc<Vec<data::Dependency>>>>>,
51+
}
52+
53+
impl Default for EnqueuedTelemetryData {
54+
fn default() -> Self {
55+
Self {
56+
dependencies: Store::new(MAX_ITEMS),
57+
configurations: Store::new(MAX_ITEMS),
58+
integrations: Store::new(MAX_ITEMS),
59+
endpoints: Store::new(MAX_ITEMS),
60+
metrics: Vec::new(),
61+
points: Vec::new(),
62+
actions: Vec::new(),
63+
computed_dependencies: Vec::new(),
64+
}
65+
}
66+
}
67+
68+
impl EnqueuedTelemetryData {
69+
/// Processes a vector of `SidecarAction` and stores the telemetry data accordingly.
70+
///
71+
/// # Arguments
72+
///
73+
/// * `actions` - A vector of `SidecarAction` that needs to be processed.
74+
pub fn process(&mut self, actions: Vec<SidecarAction>) {
75+
for action in actions {
76+
match action {
77+
SidecarAction::Telemetry(TelemetryActions::AddConfig(c)) => {
78+
self.configurations.insert(c)
79+
}
80+
SidecarAction::Telemetry(TelemetryActions::AddDependency(d)) => {
81+
self.dependencies.insert(d)
82+
}
83+
SidecarAction::Telemetry(TelemetryActions::AddIntegration(i)) => {
84+
self.integrations.insert(i)
85+
}
86+
SidecarAction::Telemetry(other) => self.actions.push(other),
87+
SidecarAction::PhpComposerTelemetryFile(composer_path) => self
88+
.computed_dependencies
89+
.push(Self::extract_composer_telemetry(composer_path).shared()),
90+
91+
SidecarAction::RegisterTelemetryMetric(m) => self.metrics.push(m),
92+
SidecarAction::AddTelemetryMetricPoint(p) => self.points.push(p),
93+
SidecarAction::AddEndpoint(e) => self.endpoints.insert(e),
94+
}
95+
}
96+
}
97+
98+
/// Creates a new `EnqueuedTelemetryData` instance and processes a vector of `SidecarAction`.
99+
///
100+
/// # Arguments
101+
///
102+
/// * `action` - A vector of `SidecarAction` that needs to be processed.
103+
///
104+
/// # Returns
105+
///
106+
/// * An instance of `EnqueuedTelemetryData`.
107+
pub fn processed(action: Vec<SidecarAction>) -> Self {
108+
let mut data = Self::default();
109+
data.process(action);
110+
111+
data
112+
}
113+
114+
/// Extracts telemetry actions from the stored data and adds them to the provided vector.
115+
///
116+
/// # Arguments
117+
///
118+
/// * `actions` - A mutable reference to a vector of `TelemetryActions` where the extracted
119+
/// actions will be added.
120+
pub(crate) async fn extract_telemetry_actions(&mut self, actions: &mut Vec<TelemetryActions>) {
121+
for computed_deps in self.computed_dependencies.clone() {
122+
for d in computed_deps.await.iter() {
123+
actions.push(TelemetryActions::AddDependency(d.clone()));
124+
}
125+
}
126+
for d in self.dependencies.unflushed() {
127+
actions.push(TelemetryActions::AddDependency(d.clone()));
128+
}
129+
for c in self.configurations.unflushed() {
130+
actions.push(TelemetryActions::AddConfig(c.clone()));
131+
}
132+
for i in self.integrations.unflushed() {
133+
actions.push(TelemetryActions::AddIntegration(i.clone()));
134+
}
135+
}
136+
137+
/// Processes a vector of `SidecarAction` immediately and returns a vector of
138+
/// `TelemetryActions`.
139+
///
140+
/// # Arguments
141+
///
142+
/// * `sidecar_actions` - A vector of `SidecarAction` that needs to be processed immediately.
143+
/// * `app` - A mutable reference to an `AppInstance`.
144+
///
145+
/// # Returns
146+
///
147+
/// * A vector of `TelemetryActions` that resulted from the immediate processing.
148+
pub async fn process_immediately(
149+
sidecar_actions: Vec<SidecarAction>,
150+
app: &mut AppInstance,
151+
) -> Vec<TelemetryActions> {
152+
let mut actions = vec![];
153+
for action in sidecar_actions {
154+
match action {
155+
SidecarAction::Telemetry(t) => actions.push(t),
156+
SidecarAction::PhpComposerTelemetryFile(path) => {
157+
for nested in Self::extract_composer_telemetry(path).await.iter() {
158+
actions.push(TelemetryActions::AddDependency(nested.clone()));
159+
}
160+
}
161+
SidecarAction::RegisterTelemetryMetric(metric) => app.register_metric(metric),
162+
SidecarAction::AddTelemetryMetricPoint(point) => {
163+
actions.push(app.to_telemetry_point(point));
164+
}
165+
SidecarAction::AddEndpoint(endpoint) => {
166+
actions.push(TelemetryActions::AddEndpoint(endpoint));
167+
}
168+
}
169+
}
170+
actions
171+
}
172+
173+
/// Parses and extracts telemetry data from a vendor/composer/installed.json file and returns a
174+
/// future of the data. The data is cached for a short period of time to avoid unnecessary
175+
/// parsing.
176+
///
177+
/// # Arguments
178+
///
179+
/// * `path` - A `PathBuf` that represents the path to the composer file.
180+
///
181+
/// # Returns
182+
///
183+
/// * A `ManualFuture` that resolves to an `Arc<Vec<data::Dependency>>>`.
184+
fn extract_composer_telemetry(path: PathBuf) -> ManualFuture<Arc<Vec<data::Dependency>>> {
185+
let (deps, completer) = ManualFuture::new();
186+
tokio::spawn(async {
187+
let mut cache = COMPOSER_CACHE.lock().await;
188+
let packages = match tokio::fs::metadata(&path).await.and_then(|m| m.modified()) {
189+
Err(e) => {
190+
warn!("Failed to report dependencies from {path:?}, could not read modification time: {e:?}");
191+
Arc::new(vec![])
192+
}
193+
Ok(modification) => {
194+
let now = SystemTime::now();
195+
if let Some((last_update, actions)) = cache.get(&path) {
196+
if modification < *last_update {
197+
completer.complete(actions.clone()).await;
198+
return;
199+
}
200+
}
201+
async fn parse(path: &PathBuf) -> anyhow::Result<Vec<data::Dependency>> {
202+
let mut json = tokio::fs::read(&path).await?;
203+
#[cfg(not(target_arch = "x86"))]
204+
let parsed: ComposerPackages = simd_json::from_slice(json.as_mut_slice())?;
205+
#[cfg(target_arch = "x86")]
206+
let parsed = crate::interface::ComposerPackages { packages: vec![] }; // not interested in 32 bit
207+
Ok(parsed.packages)
208+
}
209+
let packages = Arc::new(parse(&path).await.unwrap_or_else(|e| {
210+
warn!("Failed to report dependencies from {path:?}: {e:?}");
211+
vec![]
212+
}));
213+
cache.insert(path, (now, packages.clone()));
214+
// cheap way to avoid unbounded caching
215+
const CACHE_INTERVAL: u64 = 2000;
216+
let last_clean = LAST_CACHE_CLEAN.load(Ordering::Relaxed);
217+
let now_secs = Instant::now().elapsed().as_secs();
218+
if now_secs > last_clean + CACHE_INTERVAL
219+
&& LAST_CACHE_CLEAN
220+
.compare_exchange(
221+
last_clean,
222+
now_secs,
223+
Ordering::SeqCst,
224+
Ordering::Acquire,
225+
)
226+
.is_ok()
227+
{
228+
cache.retain(|_, (inserted, _)| {
229+
*inserted > now.sub(Duration::from_secs(CACHE_INTERVAL))
230+
});
231+
}
232+
packages
233+
}
234+
};
235+
completer.complete(packages).await;
236+
});
237+
deps
238+
}
239+
240+
/// Returns the statistics of the stored telemetry data.
241+
///
242+
/// # Returns
243+
///
244+
/// * An instance of `EnqueuedTelemetryStats` that represents the statistics of the stored
245+
/// telemetry data.
246+
pub fn stats(&self) -> EnqueuedTelemetryStats {
247+
EnqueuedTelemetryStats {
248+
dependencies_stored: self.dependencies.len_stored() as u32,
249+
dependencies_unflushed: self.dependencies.len_unflushed() as u32,
250+
configurations_stored: self.configurations.len_stored() as u32,
251+
configurations_unflushed: self.configurations.len_unflushed() as u32,
252+
integrations_stored: self.integrations.len_stored() as u32,
253+
integrations_unflushed: self.integrations.len_unflushed() as u32,
254+
metrics: self.metrics.len() as u32,
255+
points: self.points.len() as u32,
256+
actions: self.actions.len() as u32,
257+
computed_dependencies: self.computed_dependencies.len() as u32,
258+
}
259+
}
260+
}
261+
262+
#[cfg(test)]
263+
mod tests {
264+
use super::*;
265+
266+
#[tokio::test]
267+
#[cfg_attr(miri, ignore)]
268+
async fn test_extract_composer_telemetry() {
269+
let data = EnqueuedTelemetryData::extract_composer_telemetry(
270+
concat!(env!("CARGO_MANIFEST_DIR"), "/fixtures/installed.json").into(),
271+
)
272+
.await;
273+
assert_eq!(
274+
data,
275+
vec![
276+
data::Dependency {
277+
name: "g1a/composer-test-scenarios".to_string(),
278+
version: None
279+
},
280+
data::Dependency {
281+
name: "datadog/dd-trace".to_string(),
282+
version: Some("dev-master".to_string())
283+
},
284+
]
285+
.into()
286+
);
287+
}
288+
}
289+
290+
//TODO: APMSP-1079 - Add more comprehensive tests for EnqueuedTelemetryData

ddtelemetry/src/data/payload.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub enum Payload {
1212
AppDependenciesLoaded(AppDependenciesLoaded),
1313
AppIntegrationsChange(AppIntegrationsChange),
1414
AppClientConfigurationChange(AppClientConfigurationChange),
15+
AppEndpointsChange(AppEndpointsChange),
1516
AppHeartbeat(#[serde(skip_serializing)] ()),
1617
AppClosing(#[serde(skip_serializing)] ()),
1718
GenerateMetrics(GenerateMetrics),
@@ -29,6 +30,7 @@ impl Payload {
2930
AppDependenciesLoaded(_) => "app-dependencies-loaded",
3031
AppIntegrationsChange(_) => "app-integrations-change",
3132
AppClientConfigurationChange(_) => "app-client-configuration-change",
33+
AppEndpointsChange(_) => "app-endpoints-change",
3234
AppHeartbeat(_) => "app-heartbeat",
3335
AppClosing(_) => "app-closing",
3436
GenerateMetrics(_) => "generate-metrics",

ddtelemetry/src/data/payloads.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ pub struct AppClientConfigurationChange {
6161
pub configuration: Vec<Configuration>,
6262
}
6363

64+
#[derive(Debug, Serialize)]
65+
pub struct AppEndpointsChange {
66+
pub endpoints: Vec<Endpoint>,
67+
}
68+
6469
#[derive(Serialize, Debug)]
6570
pub struct GenerateMetrics {
6671
pub series: Vec<metrics::Serie>,

ddtelemetry/src/worker/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use crate::{
1010
data::{self, Application, Dependency, Endpoint, Host, Integration, Log, Payload, Telemetry},
1111
metrics::{ContextKey, MetricBuckets, MetricContexts},
1212
};
13-
use ddcommon::Endpoint;
1413
use ddcommon::{hyper_migration, tag::Tag, worker::Worker};
1514

1615
use std::fmt::Debug;
@@ -717,7 +716,7 @@ impl TelemetryWorker {
717716
if let Some(endpoint) = self.config.endpoint.as_ref() {
718717
endpoint.timeout_ms
719718
} else {
720-
Endpoint::DEFAULT_TIMEOUT
719+
ddcommon::Endpoint::DEFAULT_TIMEOUT
721720
})) => {
722721
Err(anyhow::anyhow!("Request timed out"))
723722
},

0 commit comments

Comments
 (0)