Skip to content

Commit d14c3e6

Browse files
author
Ganesh Jangir
committed
fix: format
# Conflicts: # data-pipeline-ffi/src/trace_exporter.rs # data-pipeline/examples/send-traces-with-stats.rs # ddtelemetry/src/worker/http_client.rs
1 parent e28f43f commit d14c3e6

File tree

3 files changed

+120
-24
lines changed

3 files changed

+120
-24
lines changed

data-pipeline-ffi/src/trace_exporter.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use ddcommon_ffi::{
1111
{slice::AsBytes, slice::ByteSlice},
1212
};
1313
use std::{ptr::NonNull, time::Duration};
14-
use tracing::error;
14+
use tracing::{debug, error};
1515

1616
#[cfg(all(feature = "catch_panic", panic = "unwind"))]
1717
use std::panic::{catch_unwind, AssertUnwindSafe};
@@ -325,16 +325,16 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_enable_telemetry(
325325
catch_panic!(
326326
if let Option::Some(config) = config {
327327
if let Option::Some(telemetry_cfg) = telemetry_cfg {
328-
config.telemetry_cfg = Some(TelemetryConfig {
328+
let cfg = TelemetryConfig {
329329
heartbeat: telemetry_cfg.interval,
330330
runtime_id: match sanitize_string(telemetry_cfg.runtime_id) {
331331
Ok(s) => Some(s),
332332
Err(e) => return Some(e),
333333
},
334334
debug_enabled: telemetry_cfg.debug_enabled,
335-
})
336-
} else {
337-
config.telemetry_cfg = Some(TelemetryConfig::default());
335+
};
336+
debug!(telemetry_cfg = ?cfg, "Configuring telemetry");
337+
config.telemetry_cfg = Some(cfg);
338338
}
339339
None
340340
} else {
@@ -467,7 +467,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
467467
}
468468

469469
if let Some(cfg) = &config.telemetry_cfg {
470-
builder.enable_telemetry(Some(cfg.clone()));
470+
builder.enable_telemetry(cfg.clone());
471471
}
472472

473473
if let Some(token) = &config.test_session_token {
@@ -1080,7 +1080,8 @@ mod tests {
10801080
let mock_metrics = server.mock(|when, then| {
10811081
when.method(POST)
10821082
.path("/telemetry/proxy/api/v2/apmtelemetry")
1083-
.body_contains(r#""runtime_id":"foo""#);
1083+
.body_includes(r#""runtime_id":"foo""#)
1084+
.body_includes(r#""metric":"trace_api."#);
10841085
then.status(200)
10851086
.header("content-type", "application/json")
10861087
.body("");
@@ -1132,8 +1133,8 @@ mod tests {
11321133
);
11331134

11341135
ddog_trace_exporter_free(exporter);
1135-
// It should receive 1 payloads: metrics
1136-
mock_metrics.assert_hits(1);
1136+
// It should receive 1 metrics payload (excluding heartbeats)
1137+
mock_metrics.assert_calls(1);
11371138
}
11381139
}
11391140

data-pipeline/examples/send-traces-with-stats.rs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use clap::Parser;
45
use data_pipeline::trace_exporter::{
5-
TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
6+
TelemetryConfig, TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
7+
};
8+
use datadog_log::logger::{
9+
logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget,
610
};
711
use datadog_trace_protobuf::pb;
812
use std::{
@@ -29,10 +33,31 @@ fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span {
2933
}
3034
}
3135

36+
#[derive(Parser)]
37+
#[command(name = "send-traces-with-stats")]
38+
#[command(about = "A data pipeline example for sending traces with statistics")]
39+
struct Args {
40+
#[arg(
41+
short = 'u',
42+
long = "url",
43+
default_value = "http://localhost:8126",
44+
help = "Set the trace agent URL\n\nExamples:\n http://localhost:8126 (default)\n windows://./pipe/dd-apm-test-agent (Windows named pipe)\n https://trace.agent.datadoghq.com:443 (custom endpoint)"
45+
)]
46+
url: String,
47+
}
48+
3249
fn main() {
50+
logger_configure_std(StdConfig {
51+
target: StdTarget::Out,
52+
})
53+
.expect("Failed to configure logger");
54+
logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level");
55+
56+
let args = Args::parse();
57+
let telemetry_cfg = TelemetryConfig::default();
3358
let mut builder = TraceExporter::builder();
3459
builder
35-
.set_url("http://localhost:8126")
60+
.set_url(&args.url)
3661
.set_hostname("test")
3762
.set_env("testing")
3863
.set_app_version(env!("CARGO_PKG_VERSION"))
@@ -42,20 +67,28 @@ fn main() {
4267
.set_language_version(env!("CARGO_PKG_RUST_VERSION"))
4368
.set_input_format(TraceExporterInputFormat::V04)
4469
.set_output_format(TraceExporterOutputFormat::V04)
70+
.enable_telemetry(telemetry_cfg)
4571
.enable_stats(Duration::from_secs(10));
46-
let exporter = builder.build().unwrap();
47-
let now = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64;
72+
let exporter = builder.build().expect("Failed to build TraceExporter");
73+
let now = UNIX_EPOCH
74+
.elapsed()
75+
.expect("Failed to get time since UNIX_EPOCH")
76+
.as_nanos() as i64;
4877

4978
let mut traces = Vec::new();
50-
for trace_id in 1..=100 {
79+
for trace_id in 1..=2 {
5180
let mut trace = Vec::new();
52-
for span_id in 1..=1000 {
81+
for span_id in 1..=2 {
5382
trace.push(get_span(now, trace_id, span_id));
5483
}
5584
traces.push(trace);
5685
}
57-
let data = rmp_serde::to_vec_named(&traces).unwrap();
86+
let data = rmp_serde::to_vec_named(&traces).expect("Failed to serialize traces");
5887

59-
exporter.send(data.as_ref(), 100).unwrap();
60-
exporter.shutdown(None).unwrap();
88+
exporter
89+
.send(data.as_ref(), 2)
90+
.expect("Failed to send traces");
91+
exporter
92+
.shutdown(None)
93+
.expect("Failed to shutdown exporter");
6194
}

ddtelemetry/src/worker/http_client.rs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::{
1212
};
1313

1414
use crate::config::Config;
15+
use tracing::{debug, error, info, warn};
1516

1617
pub mod header {
1718
#![allow(clippy::declare_interior_mutable_const)]
@@ -35,16 +36,29 @@ pub trait HttpClient {
3536
pub fn request_builder(c: &Config) -> anyhow::Result<HttpRequestBuilder> {
3637
match &c.endpoint {
3738
Some(e) => {
39+
debug!(
40+
endpoint.url = %e.url,
41+
endpoint.timeout_ms = e.timeout_ms,
42+
telemetry.version = env!("CARGO_PKG_VERSION"),
43+
"Building telemetry request"
44+
);
3845
let mut builder =
3946
e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION")));
4047
if c.debug_enabled {
48+
debug!(
49+
telemetry.debug_enabled = true,
50+
"Telemetry debug mode enabled"
51+
);
4152
builder = Ok(builder?.header(header::DEBUG_ENABLED, "true"))
4253
}
4354
builder
4455
}
45-
None => Err(anyhow::Error::msg(
46-
"no valid endpoint found, can't build the request".to_string(),
47-
)),
56+
None => {
57+
error!("No valid telemetry endpoint found, cannot build request");
58+
Err(anyhow::Error::msg(
59+
"no valid endpoint found, can't build the request".to_string(),
60+
))
61+
}
4862
}
4963
}
5064

@@ -54,6 +68,10 @@ pub fn from_config(c: &Config) -> Box<dyn HttpClient + Sync + Send> {
5468
#[allow(clippy::expect_used)]
5569
let file_path = ddcommon::decode_uri_path_in_authority(&e.url)
5670
.expect("file urls should always have been encoded in authority");
71+
info!(
72+
file.path = ?file_path,
73+
"Using file-based mock telemetry client"
74+
);
5775
return Box::new(MockClient {
5876
#[allow(clippy::expect_used)]
5977
file: Arc::new(Mutex::new(Box::new(
@@ -65,7 +83,19 @@ pub fn from_config(c: &Config) -> Box<dyn HttpClient + Sync + Send> {
6583
))),
6684
});
6785
}
68-
Some(_) | None => {}
86+
Some(e) => {
87+
info!(
88+
endpoint.url = %e.url,
89+
endpoint.timeout_ms = e.timeout_ms,
90+
"Using HTTP telemetry client"
91+
);
92+
}
93+
None => {
94+
warn!(
95+
endpoint = "default",
96+
"No telemetry endpoint configured, using default HTTP client"
97+
);
98+
}
6999
};
70100
Box::new(HyperClient {
71101
inner: hyper_migration::new_client_periodic(),
@@ -78,8 +108,26 @@ pub struct HyperClient {
78108

79109
impl HttpClient for HyperClient {
80110
fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture {
111+
debug!("Sending HTTP request via HyperClient");
81112
let resp = self.inner.request(req);
82-
Box::pin(async { Ok(hyper_migration::into_response(resp.await?)) })
113+
Box::pin(async move {
114+
match resp.await {
115+
Ok(response) => {
116+
debug!(
117+
http.status = response.status().as_u16(),
118+
"HTTP request completed successfully"
119+
);
120+
Ok(hyper_migration::into_response(response))
121+
}
122+
Err(e) => {
123+
error!(
124+
error = %e,
125+
"HTTP request failed"
126+
);
127+
Err(e.into())
128+
}
129+
}
130+
})
83131
}
84132
}
85133

@@ -92,16 +140,30 @@ impl HttpClient for MockClient {
92140
fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture {
93141
let s = self.clone();
94142
Box::pin(async move {
143+
debug!("MockClient writing request to file");
95144
let mut body = req.collect().await?.to_bytes().to_vec();
96145
body.push(b'\n');
97146

98147
{
99148
#[allow(clippy::expect_used)]
100149
let mut writer = s.file.lock().expect("mutex poisoned");
101150

102-
writer.write_all(body.as_ref())?;
151+
match writer.write_all(body.as_ref()) {
152+
Ok(()) => debug!(
153+
file.bytes_written = body.len(),
154+
"Successfully wrote payload to mock file"
155+
),
156+
Err(e) => {
157+
error!(
158+
error = %e,
159+
"Failed to write to mock file"
160+
);
161+
return Err(e.into());
162+
}
163+
}
103164
}
104165

166+
debug!(http.status = 202, "MockClient returning success response");
105167
hyper_migration::empty_response(hyper::Response::builder().status(202))
106168
})
107169
}

0 commit comments

Comments
 (0)