Skip to content

Commit 81b0e5a

Browse files
authored
move metric emitting to separate file for trace exporter (#1143)
1 parent 923f348 commit 81b0e5a

File tree

2 files changed

+88
-26
lines changed

2 files changed

+88
-26
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::health_metrics::HealthMetric;
5+
use ddcommon::tag::Tag;
6+
use dogstatsd_client::{Client, DogStatsDAction};
7+
use either::Either;
8+
use tracing::debug;
9+
10+
/// Handles emission of health metrics to DogStatsD
11+
#[derive(Debug)]
12+
pub(crate) struct MetricsEmitter<'a> {
13+
dogstatsd: Option<&'a Client>,
14+
common_tags: &'a [Tag],
15+
}
16+
17+
impl<'a> MetricsEmitter<'a> {
18+
/// Create a new MetricsEmitter
19+
pub(crate) fn new(dogstatsd: Option<&'a Client>, common_tags: &'a [Tag]) -> Self {
20+
Self {
21+
dogstatsd,
22+
common_tags,
23+
}
24+
}
25+
26+
/// Emit a health metric to dogstatsd
27+
pub(crate) fn emit(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
28+
let has_custom_tags = custom_tags.is_some();
29+
if let Some(flusher) = self.dogstatsd {
30+
let tags = match custom_tags {
31+
None => Either::Left(self.common_tags),
32+
Some(custom) => Either::Right(self.common_tags.iter().chain(custom)),
33+
};
34+
match metric {
35+
HealthMetric::Count(name, c) => {
36+
debug!(
37+
metric_name = name,
38+
count = c,
39+
has_custom_tags = has_custom_tags,
40+
"Emitting health metric to dogstatsd"
41+
);
42+
flusher.send(vec![DogStatsDAction::Count(name, c, tags.into_iter())])
43+
}
44+
}
45+
} else {
46+
debug!(
47+
metric = ?metric,
48+
"Skipping metric emission - dogstatsd client not configured"
49+
);
50+
}
51+
}
52+
}
53+
54+
// Primary testing is done in the main TraceExporter module for now.
55+
#[cfg(test)]
56+
mod tests {
57+
use super::*;
58+
use ddcommon::tag;
59+
60+
#[test]
61+
fn test_metrics_emitter_new() {
62+
let tags = vec![tag!("service", "test")];
63+
let emitter = MetricsEmitter::new(None, &tags);
64+
65+
assert!(emitter.dogstatsd.is_none());
66+
assert_eq!(emitter.common_tags.len(), 1);
67+
assert_eq!(emitter.common_tags[0], tag!("service", "test"));
68+
}
69+
70+
#[test]
71+
fn test_metrics_emitter_emit_no_client() {
72+
let tags = vec![tag!("env", "test")];
73+
let emitter = MetricsEmitter::new(None, &tags);
74+
75+
// Should not panic when dogstatsd client is None
76+
emitter.emit(HealthMetric::Count("test.metric", 1), None);
77+
emitter.emit(
78+
HealthMetric::Count("test.metric", 5),
79+
Some(vec![&tag!("custom", "tag")]),
80+
);
81+
}
82+
}

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
pub mod agent_response;
44
pub mod builder;
55
pub mod error;
6+
pub mod metrics;
67

78
// Re-export the builder
89
pub use builder::TraceExporterBuilder;
910

1011
use self::agent_response::AgentResponse;
12+
use self::metrics::MetricsEmitter;
1113
use crate::agent_info::{AgentInfoFetcher, ResponseObserver};
1214
use crate::pausable_worker::PausableWorker;
1315
use crate::stats_exporter::StatsExporter;
@@ -39,8 +41,7 @@ use ddcommon::tag::Tag;
3941
use ddcommon::MutexExt;
4042
use ddcommon::{hyper_migration, Endpoint};
4143
use ddtelemetry::worker::TelemetryWorker;
42-
use dogstatsd_client::{Client, DogStatsDAction};
43-
use either::Either;
44+
use dogstatsd_client::Client;
4445
use http_body_util::BodyExt;
4546
use hyper::http::uri::PathAndQuery;
4647
use hyper::{header::CONTENT_TYPE, Method, Uri};
@@ -50,7 +51,7 @@ use std::time::Duration;
5051
use std::{borrow::Borrow, collections::HashMap, str::FromStr, time};
5152
use tokio::runtime::Runtime;
5253
use tokio_util::sync::CancellationToken;
53-
use tracing::{debug, error, info, warn};
54+
use tracing::{error, info, warn};
5455

5556
const DEFAULT_STATS_ELIGIBLE_SPAN_KINDS: [&str; 4] = ["client", "server", "producer", "consumer"];
5657
const STATS_ENDPOINT: &str = "/v0.6/stats";
@@ -846,29 +847,8 @@ impl TraceExporter {
846847

847848
/// Emit a health metric to dogstatsd
848849
fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
849-
let has_custom_tags = custom_tags.is_some();
850-
if let Some(flusher) = &self.dogstatsd {
851-
let tags = match custom_tags {
852-
None => Either::Left(&self.common_stats_tags),
853-
Some(custom) => Either::Right(self.common_stats_tags.iter().chain(custom)),
854-
};
855-
match metric {
856-
HealthMetric::Count(name, c) => {
857-
debug!(
858-
metric_name = name,
859-
count = c,
860-
has_custom_tags = has_custom_tags,
861-
"Emitting health metric to dogstatsd"
862-
);
863-
flusher.send(vec![DogStatsDAction::Count(name, c, tags.into_iter())])
864-
}
865-
}
866-
} else {
867-
debug!(
868-
metric = ?metric,
869-
"Skipping metric emission - dogstatsd client not configured"
870-
);
871-
}
850+
let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
851+
emitter.emit(metric, custom_tags);
872852
}
873853

874854
/// Add all spans from the given iterator into the stats concentrator

0 commit comments

Comments
 (0)