diff --git a/Cargo.lock b/Cargo.lock index 073c14f257..e87297f9d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1399,8 +1399,10 @@ dependencies = [ "anyhow", "arc-swap", "bytes", + "clap", "criterion", "datadog-ddsketch", + "datadog-log", "datadog-trace-protobuf", "datadog-trace-utils", "ddcommon", diff --git a/LICENSE-3rdparty.yml b/LICENSE-3rdparty.yml index 61a346d4d6..987dd77e2a 100644 --- a/LICENSE-3rdparty.yml +++ b/LICENSE-3rdparty.yml @@ -1,4 +1,4 @@ -root_name: builder, build_common, tools, datadog-alloc, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, datadog-crashtracker-ffi, ddcommon-ffi, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, tinybytes, spawn_worker, cc_utils, datadog-library-config, datadog-library-config-ffi, datadog-live-debugger, datadog-live-debugger-ffi, datadog-profiling, datadog-profiling-protobuf, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-utils, datadog-trace-normalization, dogstatsd-client, datadog-log-ffi, datadog-log, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, datadog-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib +root_name: builder, build_common, tools, datadog-alloc, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, datadog-crashtracker-ffi, ddcommon-ffi, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, tinybytes, spawn_worker, cc_utils, datadog-library-config, datadog-library-config-ffi, datadog-live-debugger, datadog-live-debugger-ffi, datadog-profiling, datadog-profiling-protobuf, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-utils, datadog-trace-normalization, dogstatsd-client, datadog-log, datadog-log-ffi, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, datadog-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib third_party_libraries: - package_name: addr2line package_version: 0.24.2 diff --git a/data-pipeline-ffi/src/trace_exporter.rs b/data-pipeline-ffi/src/trace_exporter.rs index 10225c87c6..ba15cd5fff 100644 --- a/data-pipeline-ffi/src/trace_exporter.rs +++ b/data-pipeline-ffi/src/trace_exporter.rs @@ -11,7 +11,7 @@ use ddcommon_ffi::{ {slice::AsBytes, slice::ByteSlice}, }; use std::{ptr::NonNull, time::Duration}; -use tracing::error; +use tracing::{debug, error}; #[cfg(all(feature = "catch_panic", panic = "unwind"))] use std::panic::{catch_unwind, AssertUnwindSafe}; @@ -325,16 +325,16 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_enable_telemetry( catch_panic!( if let Option::Some(config) = config { if let Option::Some(telemetry_cfg) = telemetry_cfg { - config.telemetry_cfg = Some(TelemetryConfig { + let cfg = TelemetryConfig { heartbeat: telemetry_cfg.interval, runtime_id: match sanitize_string(telemetry_cfg.runtime_id) { Ok(s) => Some(s), Err(e) => return Some(e), }, debug_enabled: telemetry_cfg.debug_enabled, - }) - } else { - config.telemetry_cfg = Some(TelemetryConfig::default()); + }; + debug!(telemetry_cfg = ?cfg, "Configuring telemetry"); + config.telemetry_cfg = Some(cfg); } None } else { @@ -467,7 +467,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( } if let Some(cfg) = &config.telemetry_cfg { - builder.enable_telemetry(Some(cfg.clone())); + builder.enable_telemetry(cfg.clone()); } if let Some(token) = &config.test_session_token { @@ -795,8 +795,7 @@ mod tests { let mut cfg = TraceExporterConfig::default(); let error = ddog_trace_exporter_config_enable_telemetry(Some(&mut cfg), None); assert!(error.is_none()); - assert_eq!(cfg.telemetry_cfg.as_ref().unwrap().heartbeat, 0); - assert!(cfg.telemetry_cfg.as_ref().unwrap().runtime_id.is_none()); + assert!(cfg.telemetry_cfg.is_none()); let mut cfg = TraceExporterConfig::default(); let error = ddog_trace_exporter_config_enable_telemetry( diff --git a/data-pipeline/Cargo.toml b/data-pipeline/Cargo.toml index 538681fc30..a5bf65585d 100644 --- a/data-pipeline/Cargo.toml +++ b/data-pipeline/Cargo.toml @@ -51,6 +51,8 @@ harness = false path = "benches/main.rs" [dev-dependencies] +datadog-log = { path = "../datadog-log" } +clap = { version = "4.0", features = ["derive"] } criterion = "0.5.1" datadog-trace-utils = { path = "../datadog-trace-utils", features = ["test-utils"] } httpmock = "0.7.0" diff --git a/data-pipeline/examples/send-traces-with-stats.rs b/data-pipeline/examples/send-traces-with-stats.rs index 3cdb766347..e6a9286210 100644 --- a/data-pipeline/examples/send-traces-with-stats.rs +++ b/data-pipeline/examples/send-traces-with-stats.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use data_pipeline::trace_exporter::{ - TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat, + TelemetryConfig, TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat, +}; +use datadog_log::logger::{ + logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget, }; use datadog_trace_protobuf::pb; use std::{ @@ -30,6 +33,13 @@ fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span { } fn main() { + logger_configure_std(StdConfig { + target: StdTarget::Out, + }) + .expect("Failed to configure logger"); + logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level"); + + let telemetry_cfg = TelemetryConfig::default(); let mut builder = TraceExporter::builder(); builder .set_url("http://localhost:8126") @@ -42,20 +52,28 @@ fn main() { .set_language_version(env!("CARGO_PKG_RUST_VERSION")) .set_input_format(TraceExporterInputFormat::V04) .set_output_format(TraceExporterOutputFormat::V04) + .enable_telemetry(telemetry_cfg) .enable_stats(Duration::from_secs(10)); - let exporter = builder.build().unwrap(); - let now = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64; + let exporter = builder.build().expect("Failed to build TraceExporter"); + let now = UNIX_EPOCH + .elapsed() + .expect("Failed to get time since UNIX_EPOCH") + .as_nanos() as i64; let mut traces = Vec::new(); - for trace_id in 1..=100 { + for trace_id in 1..=2 { let mut trace = Vec::new(); - for span_id in 1..=1000 { + for span_id in 1..=2 { trace.push(get_span(now, trace_id, span_id)); } traces.push(trace); } - let data = rmp_serde::to_vec_named(&traces).unwrap(); + let data = rmp_serde::to_vec_named(&traces).expect("Failed to serialize traces"); - exporter.send(data.as_ref(), 100).unwrap(); - exporter.shutdown(None).unwrap(); + exporter + .send(data.as_ref(), 2) + .expect("Failed to send traces"); + exporter + .shutdown(None) + .expect("Failed to shutdown exporter"); } diff --git a/data-pipeline/src/span_concentrator/aggregation.rs b/data-pipeline/src/span_concentrator/aggregation.rs index e47163935b..cfa4e091d1 100644 --- a/data-pipeline/src/span_concentrator/aggregation.rs +++ b/data-pipeline/src/span_concentrator/aggregation.rs @@ -52,11 +52,11 @@ pub(super) struct BorrowedAggregationKey<'a> { /// the key type `K` implements `Borrow`. Since `AggregationKey<'static>` cannot implement /// `Borrow>` we use `dyn BorrowableAggregationKey` as a placeholder. trait BorrowableAggregationKey { - fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey; + fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_>; } impl BorrowableAggregationKey for AggregationKey<'_> { - fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey { + fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_> { BorrowedAggregationKey { resource_name: self.resource_name.borrow(), service_name: self.service_name.borrow(), @@ -76,7 +76,7 @@ impl BorrowableAggregationKey for AggregationKey<'_> { } impl BorrowableAggregationKey for BorrowedAggregationKey<'_> { - fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey { + fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_> { self.clone() } } @@ -90,16 +90,16 @@ where } } -impl Eq for (dyn BorrowableAggregationKey + '_) {} +impl Eq for dyn BorrowableAggregationKey + '_ {} -impl PartialEq for (dyn BorrowableAggregationKey + '_) { +impl PartialEq for dyn BorrowableAggregationKey + '_ { fn eq(&self, other: &dyn BorrowableAggregationKey) -> bool { self.borrowed_aggregation_key() .eq(&other.borrowed_aggregation_key()) } } -impl std::hash::Hash for (dyn BorrowableAggregationKey + '_) { +impl std::hash::Hash for dyn BorrowableAggregationKey + '_ { fn hash(&self, state: &mut H) { self.borrowed_aggregation_key().hash(state) } diff --git a/data-pipeline/src/trace_exporter/builder.rs b/data-pipeline/src/trace_exporter/builder.rs index bfae8472f2..48b1773a14 100644 --- a/data-pipeline/src/trace_exporter/builder.rs +++ b/data-pipeline/src/trace_exporter/builder.rs @@ -193,12 +193,8 @@ impl TraceExporterBuilder { } /// Enables sending telemetry metrics. - pub fn enable_telemetry(&mut self, cfg: Option) -> &mut Self { - if let Some(cfg) = cfg { - self.telemetry = Some(cfg); - } else { - self.telemetry = Some(TelemetryConfig::default()); - } + pub fn enable_telemetry(&mut self, cfg: TelemetryConfig) -> &mut Self { + self.telemetry = Some(cfg); self } @@ -383,11 +379,11 @@ mod tests { .set_input_format(TraceExporterInputFormat::Proxy) .set_output_format(TraceExporterOutputFormat::V04) .set_client_computed_stats() - .enable_telemetry(Some(TelemetryConfig { + .enable_telemetry(TelemetryConfig { heartbeat: 1000, runtime_id: None, debug_enabled: false, - })); + }); let exporter = builder.build().unwrap(); assert_eq!( diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index 203af9009f..da5a37e2a3 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -1078,10 +1078,10 @@ mod tests { }; if enable_telemetry { - builder.enable_telemetry(Some(TelemetryConfig { + builder.enable_telemetry(TelemetryConfig { heartbeat: 100, ..Default::default() - })); + }); } builder.build().unwrap() @@ -1617,10 +1617,10 @@ mod tests { .set_language("nodejs") .set_language_version("1.0") .set_language_interpreter("v8") - .enable_telemetry(Some(TelemetryConfig { + .enable_telemetry(TelemetryConfig { heartbeat: 100, ..Default::default() - })); + }); let exporter = builder.build().unwrap(); let traces = vec![0x90]; @@ -1741,10 +1741,10 @@ mod tests { .set_language("nodejs") .set_language_version("1.0") .set_language_interpreter("v8") - .enable_telemetry(Some(TelemetryConfig { + .enable_telemetry(TelemetryConfig { heartbeat: 100, ..Default::default() - })) + }) .set_input_format(TraceExporterInputFormat::V04) .set_output_format(TraceExporterOutputFormat::V05); diff --git a/datadog-crashtracker/src/collector/emitters.rs b/datadog-crashtracker/src/collector/emitters.rs index 248f8738d9..276e40beab 100644 --- a/datadog-crashtracker/src/collector/emitters.rs +++ b/datadog-crashtracker/src/collector/emitters.rs @@ -71,35 +71,34 @@ unsafe fn emit_backtrace_by_frames( } if resolve_frames == StacktraceCollection::EnabledWithInprocessSymbols { backtrace::resolve_frame_unsynchronized(frame, |symbol| { - write!(w, "{{").unwrap(); - #[allow(clippy::unwrap_used)] - emit_absolute_addresses(w, frame).unwrap(); + // In crash handling context, we need to be resilient to write failures + // If any write fails, we continue to try the next operations + let _ = write!(w, "{{"); + let _ = emit_absolute_addresses(w, frame); if let Some(column) = symbol.colno() { - write!(w, ", \"column\": {column}").unwrap(); + let _ = write!(w, ", \"column\": {column}"); } if let Some(file) = symbol.filename() { // The debug printer for path already wraps it in `"` marks. - write!(w, ", \"file\": {file:?}").unwrap(); + let _ = write!(w, ", \"file\": {file:?}"); } if let Some(function) = symbol.name() { - write!(w, ", \"function\": \"{function}\"").unwrap(); + let _ = write!(w, ", \"function\": \"{function}\""); } if let Some(line) = symbol.lineno() { - write!(w, ", \"line\": {line}").unwrap(); + let _ = write!(w, ", \"line\": {line}"); } - writeln!(w, "}}").unwrap(); + let _ = writeln!(w, "}}"); // Flush eagerly to ensure that each frame gets emitted even if the next one fails - #[allow(clippy::unwrap_used)] - w.flush().unwrap(); + let _ = w.flush(); }); } else { - write!(w, "{{").unwrap(); - #[allow(clippy::unwrap_used)] - emit_absolute_addresses(w, frame).unwrap(); - writeln!(w, "}}").unwrap(); + // In crash handling context, we need to be resilient to write failures + let _ = write!(w, "{{"); + let _ = emit_absolute_addresses(w, frame); + let _ = writeln!(w, "}}"); // Flush eagerly to ensure that each frame gets emitted even if the next one fails - #[allow(clippy::unwrap_used)] - w.flush().unwrap(); + let _ = w.flush(); } true // keep going to the next frame }); diff --git a/datadog-ipc/src/platform/unix/platform_handle.rs b/datadog-ipc/src/platform/unix/platform_handle.rs index bccb80a7f8..a8f40da84d 100644 --- a/datadog-ipc/src/platform/unix/platform_handle.rs +++ b/datadog-ipc/src/platform/unix/platform_handle.rs @@ -48,7 +48,7 @@ impl PlatformHandle where T: SocketlikeViewType, { - pub fn as_socketlike_view(&self) -> io::Result> { + pub fn as_socketlike_view(&self) -> io::Result> { Ok(self.as_owned_fd()?.as_socketlike_view()) } } diff --git a/datadog-ipc/src/rate_limiter.rs b/datadog-ipc/src/rate_limiter.rs index b647f5460e..87b27ce17c 100644 --- a/datadog-ipc/src/rate_limiter.rs +++ b/datadog-ipc/src/rate_limiter.rs @@ -207,7 +207,7 @@ impl Debug for ShmLimiter { } impl ShmLimiter { - fn limiter(&self) -> &ShmLimiterData { + fn limiter(&self) -> &ShmLimiterData<'_, Inner> { #[allow(clippy::unwrap_used)] unsafe { &*self diff --git a/datadog-live-debugger/src/expr_defs.rs b/datadog-live-debugger/src/expr_defs.rs index d614f59bdd..248e139d89 100644 --- a/datadog-live-debugger/src/expr_defs.rs +++ b/datadog-live-debugger/src/expr_defs.rs @@ -141,14 +141,14 @@ impl Display for Condition { Condition::Never => f.write_str("false"), Condition::Disjunction(b) => { let (x, y) = &**b; - fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp { + fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp<'_> { NonAssocBoolOp(condition, matches!(condition, Condition::Conjunction(_))) } write!(f, "{} || {}", is_nonassoc(x), is_nonassoc(y)) } Condition::Conjunction(b) => { let (x, y) = &**b; - fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp { + fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp<'_> { NonAssocBoolOp(condition, matches!(condition, Condition::Disjunction(_))) } write!(f, "{} && {}", is_nonassoc(x), is_nonassoc(y)) diff --git a/datadog-profiling/src/api.rs b/datadog-profiling/src/api.rs index 8f7a6bf944..ccca3086e9 100644 --- a/datadog-profiling/src/api.rs +++ b/datadog-profiling/src/api.rs @@ -290,7 +290,7 @@ fn string_table_fetch(pprof: &prost_impls::Profile, id: i64) -> anyhow::Result<& .ok_or_else(|| anyhow::anyhow!("String {id} was not found.")) } -fn mapping_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result { +fn mapping_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result> { if id == 0 { return Ok(Mapping::default()); } @@ -307,7 +307,7 @@ fn mapping_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result anyhow::Result { +fn function_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result> { if id == 0 { return Ok(Function::default()); } @@ -322,7 +322,7 @@ fn function_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result anyhow::Result { +fn location_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result> { if id == 0 { return Ok(Location::default()); } diff --git a/datadog-remote-config/src/fetch/fetcher.rs b/datadog-remote-config/src/fetch/fetcher.rs index 1c5537de2a..d98b6a6e1a 100644 --- a/datadog-remote-config/src/fetch/fetcher.rs +++ b/datadog-remote-config/src/fetch/fetcher.rs @@ -150,7 +150,7 @@ impl ConfigFetcherState { /// - This files_lock() must always be called prior to locking any data structure locked within /// FileStorage::store(). /// - Also, files_lock() must not be called from within FileStorage::store(). - pub fn files_lock(&self) -> ConfigFetcherFilesLock { + pub fn files_lock(&self) -> ConfigFetcherFilesLock<'_, S> { assert!(!self.expire_unused_files); ConfigFetcherFilesLock { inner: self.target_files_by_path.lock_or_panic(), diff --git a/datadog-remote-config/src/file_storage.rs b/datadog-remote-config/src/file_storage.rs index 7ec7ba56c5..b14dde1442 100644 --- a/datadog-remote-config/src/file_storage.rs +++ b/datadog-remote-config/src/file_storage.rs @@ -58,7 +58,7 @@ impl

Deref for RawFileContentsGuard<'_, P> { impl

RawFile

{ /// Gets the contents behind a Deref impl (guarding a Mutex). - pub fn contents(&self) -> RawFileContentsGuard

{ + pub fn contents(&self) -> RawFileContentsGuard<'_, P> { RawFileContentsGuard(self.data.lock_or_panic()) } diff --git a/datadog-remote-config/src/path.rs b/datadog-remote-config/src/path.rs index d17e4bea25..4a88905c6e 100644 --- a/datadog-remote-config/src/path.rs +++ b/datadog-remote-config/src/path.rs @@ -59,7 +59,7 @@ pub struct RemoteConfigPathRef<'a> { } impl RemoteConfigPath { - pub fn try_parse(path: &str) -> anyhow::Result { + pub fn try_parse(path: &str) -> anyhow::Result> { let parts: Vec<_> = path.split('/').collect(); Ok(RemoteConfigPathRef { source: match parts[0] { diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index e6b10b330a..9452a89484 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -896,7 +896,7 @@ pub unsafe extern "C" fn ddog_sidecar_set_universal_service_tags( #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn ddog_sidecar_dump( transport: &mut Box, -) -> ffi::CharSlice { +) -> ffi::CharSlice<'_> { let str = match blocking::dump(transport) { Ok(dump) => dump, Err(e) => format!("{e:?}"), @@ -913,7 +913,7 @@ pub unsafe extern "C" fn ddog_sidecar_dump( #[allow(clippy::missing_safety_doc)] pub unsafe extern "C" fn ddog_sidecar_stats( transport: &mut Box, -) -> ffi::CharSlice { +) -> ffi::CharSlice<'_> { let str = match blocking::stats(transport) { Ok(stats) => stats, Err(e) => format!("{e:?}"), diff --git a/datadog-sidecar/src/log.rs b/datadog-sidecar/src/log.rs index bae3d103ec..34bd68f86d 100644 --- a/datadog-sidecar/src/log.rs +++ b/datadog-sidecar/src/log.rs @@ -172,7 +172,7 @@ impl MultiEnvFilter { } } - pub fn add(&self, key: String) -> TemporarilyRetainedMapGuard { + pub fn add(&self, key: String) -> TemporarilyRetainedMapGuard<'_, String, EnvFilter> { self.map.add(key) } diff --git a/datadog-sidecar/src/service/runtime_info.rs b/datadog-sidecar/src/service/runtime_info.rs index e1f84c7a94..7b8216092e 100644 --- a/datadog-sidecar/src/service/runtime_info.rs +++ b/datadog-sidecar/src/service/runtime_info.rs @@ -58,7 +58,7 @@ impl RuntimeInfo { /// /// * `MutexGuard>` - A mutable reference to the /// applications map. - pub(crate) fn lock_applications(&self) -> MutexGuard> { + pub(crate) fn lock_applications(&self) -> MutexGuard<'_, HashMap> { self.applications.lock_or_panic() } } diff --git a/datadog-sidecar/src/service/session_info.rs b/datadog-sidecar/src/service/session_info.rs index 9fabcbc616..2f09890d08 100644 --- a/datadog-sidecar/src/service/session_info.rs +++ b/datadog-sidecar/src/service/session_info.rs @@ -144,11 +144,13 @@ impl SessionInfo { } } - pub(crate) fn lock_runtimes(&self) -> MutexGuard> { + pub(crate) fn lock_runtimes(&self) -> MutexGuard<'_, HashMap> { self.runtimes.lock_or_panic() } - pub(crate) fn get_telemetry_config(&self) -> MutexGuard> { + pub(crate) fn get_telemetry_config( + &self, + ) -> MutexGuard<'_, Option> { let mut cfg = self.session_config.lock_or_panic(); if (*cfg).is_none() { @@ -167,7 +169,7 @@ impl SessionInfo { } } - pub(crate) fn get_trace_config(&self) -> MutexGuard { + pub(crate) fn get_trace_config(&self) -> MutexGuard<'_, tracer::Config> { self.tracer_config.lock_or_panic() } @@ -178,7 +180,7 @@ impl SessionInfo { f(&mut self.get_trace_config()); } - pub(crate) fn get_dogstatsd(&self) -> MutexGuard> { + pub(crate) fn get_dogstatsd(&self) -> MutexGuard<'_, Option> { self.dogstatsd.lock_or_panic() } @@ -189,7 +191,7 @@ impl SessionInfo { f(&mut self.get_dogstatsd()); } - pub fn get_debugger_config(&self) -> MutexGuard { + pub fn get_debugger_config(&self) -> MutexGuard<'_, datadog_live_debugger::sender::Config> { self.debugger_config.lock_or_panic() } @@ -204,7 +206,7 @@ impl SessionInfo { *self.remote_config_invariants.lock_or_panic() = Some(invariants); } - pub fn get_remote_config_invariants(&self) -> MutexGuard> { + pub fn get_remote_config_invariants(&self) -> MutexGuard<'_, Option> { self.remote_config_invariants.lock_or_panic() } diff --git a/datadog-trace-obfuscation/src/redis_tokenizer.rs b/datadog-trace-obfuscation/src/redis_tokenizer.rs index bacee1ed21..0707b63aa4 100644 --- a/datadog-trace-obfuscation/src/redis_tokenizer.rs +++ b/datadog-trace-obfuscation/src/redis_tokenizer.rs @@ -21,7 +21,7 @@ pub struct RedisTokenizerScanResult<'a> { } impl<'a> RedisTokenizer<'a> { - pub fn new(query: &str) -> RedisTokenizer { + pub fn new(query: &str) -> RedisTokenizer<'_> { let mut s = RedisTokenizer { data: query, offset: 0, diff --git a/datadog-trace-utils/src/msgpack_decoder/v04/mod.rs b/datadog-trace-utils/src/msgpack_decoder/v04/mod.rs index 19c4bc0084..09d4541dde 100644 --- a/datadog-trace-utils/src/msgpack_decoder/v04/mod.rs +++ b/datadog-trace-utils/src/msgpack_decoder/v04/mod.rs @@ -109,7 +109,7 @@ pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize) /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("test-span", decoded_span.name); /// ``` -pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { +pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| { DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned()) })?; diff --git a/datadog-trace-utils/src/msgpack_decoder/v05/mod.rs b/datadog-trace-utils/src/msgpack_decoder/v05/mod.rs index 5a32e3e0e8..f94d74d750 100644 --- a/datadog-trace-utils/src/msgpack_decoder/v05/mod.rs +++ b/datadog-trace-utils/src/msgpack_decoder/v05/mod.rs @@ -139,7 +139,7 @@ pub fn from_bytes(data: tinybytes::Bytes) -> Result<(Vec>, usize) /// let decoded_span = &decoded_traces[0][0]; /// assert_eq!("", decoded_span.name); /// ``` -pub fn from_slice(mut data: &[u8]) -> Result<(Vec>, usize), DecodeError> { +pub fn from_slice(mut data: &[u8]) -> Result<(Vec>>, usize), DecodeError> { let data_elem = rmp::decode::read_array_len(&mut data) .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?; diff --git a/ddcommon-ffi/src/array_queue.rs b/ddcommon-ffi/src/array_queue.rs index 17f3334430..49a3d9c829 100644 --- a/ddcommon-ffi/src/array_queue.rs +++ b/ddcommon-ffi/src/array_queue.rs @@ -327,11 +327,11 @@ mod tests { assert!(matches!(result, ArrayQueuePushResult::Ok)); let result = ddog_ArrayQueue_push(queue, item2_ptr as *mut c_void); assert!( - matches!(result, ArrayQueuePushResult::Full(ptr) if ptr == item2_ptr as *mut c_void) + matches!(result, ArrayQueuePushResult::Full(ptr) if std::ptr::eq(ptr, item2_ptr as *mut c_void)) ); let result = ddog_ArrayQueue_pop(queue); assert!( - matches!(result, ArrayQueuePopResult::Ok(ptr) if ptr == item_ptr as *mut c_void) + matches!(result, ArrayQueuePopResult::Ok(ptr) if std::ptr::eq(ptr, item_ptr as *mut c_void)) ); let item_ptr = match result { ArrayQueuePopResult::Ok(ptr) => ptr, diff --git a/ddcommon-ffi/src/error.rs b/ddcommon-ffi/src/error.rs index bf147f7c94..2c60b1b822 100644 --- a/ddcommon-ffi/src/error.rs +++ b/ddcommon-ffi/src/error.rs @@ -96,7 +96,7 @@ pub unsafe extern "C" fn ddog_Error_drop(error: Option<&mut Error>) { /// # Safety /// Only pass null or a valid reference to a `ddog_Error`. #[no_mangle] -pub unsafe extern "C" fn ddog_Error_message(error: Option<&Error>) -> CharSlice { +pub unsafe extern "C" fn ddog_Error_message(error: Option<&Error>) -> CharSlice<'_> { match error { None => CharSlice::empty(), Some(err) => CharSlice::from(err.as_ref()), diff --git a/ddcommon-ffi/src/string.rs b/ddcommon-ffi/src/string.rs index 330e2a9f9f..b270aced27 100644 --- a/ddcommon-ffi/src/string.rs +++ b/ddcommon-ffi/src/string.rs @@ -65,7 +65,7 @@ pub unsafe extern "C" fn ddog_StringWrapper_drop(s: Option<&mut StringWrapper>) /// Only pass null or a valid reference to a `ddog_StringWrapper`. /// The string should not be mutated nor dropped while the CharSlice is alive. #[no_mangle] -pub unsafe extern "C" fn ddog_StringWrapper_message(s: Option<&StringWrapper>) -> CharSlice { +pub unsafe extern "C" fn ddog_StringWrapper_message(s: Option<&StringWrapper>) -> CharSlice<'_> { match s { None => CharSlice::empty(), Some(s) => CharSlice::from(s.as_ref()), diff --git a/ddcommon-ffi/src/vec.rs b/ddcommon-ffi/src/vec.rs index 60a0893227..7953a6618a 100644 --- a/ddcommon-ffi/src/vec.rs +++ b/ddcommon-ffi/src/vec.rs @@ -75,7 +75,11 @@ impl From> for Vec { impl From for Vec { fn from(err: anyhow::Error) -> Self { let mut vec = vec![]; - write!(vec, "{err}").expect("write to vec to always succeed"); + // Writing to a Vec should never fail in practice, but we handle it gracefully + if write!(vec, "{err}").is_err() { + // Fallback to a basic error message if formatting fails + vec = b"Error formatting failed".to_vec(); + } Self::from(vec) } } @@ -105,7 +109,7 @@ impl Vec { self.replace(vec); } - pub fn as_slice(&self) -> Slice { + pub fn as_slice(&self) -> Slice<'_, T> { unsafe { Slice::from_raw_parts(self.ptr, self.len) } } diff --git a/ddtelemetry/Cargo.toml b/ddtelemetry/Cargo.toml index f8bf1ecbc1..44d953bee8 100644 --- a/ddtelemetry/Cargo.toml +++ b/ddtelemetry/Cargo.toml @@ -10,7 +10,7 @@ license.workspace = true bench = false [features] -default = [] +default = ["tracing"] tracing = ["tracing/std"] [dependencies] diff --git a/ddtelemetry/src/config.rs b/ddtelemetry/src/config.rs index ce63e3b24f..b224d861cc 100644 --- a/ddtelemetry/src/config.rs +++ b/ddtelemetry/src/config.rs @@ -4,6 +4,7 @@ use ddcommon::{config::parse_env, parse_uri, Endpoint}; use http::{uri::PathAndQuery, Uri}; use std::{borrow::Cow, time::Duration}; +use tracing::debug; pub const DEFAULT_DD_SITE: &str = "datadoghq.com"; pub const PROD_INTAKE_SUBDOMAIN: &str = "instrumentation-telemetry-intake"; @@ -119,6 +120,10 @@ impl Settings { const _DD_SHARED_LIB_DEBUG: &'static str = "_DD_SHARED_LIB_DEBUG"; pub fn from_env() -> Self { + debug!( + config.source = "environment", + "Loading telemetry settings from environment variables" + ); let default = Self::default(); Self { agent_host: parse_env::str_not_empty(Self::DD_AGENT_HOST), diff --git a/ddtelemetry/src/lib.rs b/ddtelemetry/src/lib.rs index 2d467c9a07..efc1992bab 100644 --- a/ddtelemetry/src/lib.rs +++ b/ddtelemetry/src/lib.rs @@ -10,6 +10,7 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] use ddcommon::entity_id; +use tracing::debug; pub mod config; pub mod data; @@ -18,11 +19,24 @@ pub mod metrics; pub mod worker; pub fn build_host() -> data::Host { + debug!("Building telemetry host information"); + let hostname = info::os::real_hostname().unwrap_or_else(|_| String::from("unknown_hostname")); + let container_id = entity_id::get_container_id().map(|f| f.to_string()); + let os_version = info::os::os_version().ok(); + + debug!( + host.hostname = %hostname, + host.container_id = ?container_id, + host.os = info::os::os_name(), + host.os_version = ?os_version, + "Built telemetry host information" + ); + data::Host { - hostname: info::os::real_hostname().unwrap_or_else(|_| String::from("unknown_hostname")), - container_id: entity_id::get_container_id().map(|f| f.to_string()), + hostname, + container_id, os: Some(String::from(info::os::os_name())), - os_version: info::os::os_version().ok(), + os_version, kernel_name: None, kernel_release: None, kernel_version: None, diff --git a/ddtelemetry/src/worker/http_client.rs b/ddtelemetry/src/worker/http_client.rs index cffc9c7303..56f6d73170 100644 --- a/ddtelemetry/src/worker/http_client.rs +++ b/ddtelemetry/src/worker/http_client.rs @@ -12,6 +12,7 @@ use std::{ }; use crate::config::Config; +use tracing::{debug, error, info, warn}; pub mod header { #![allow(clippy::declare_interior_mutable_const)] @@ -35,16 +36,29 @@ pub trait HttpClient { pub fn request_builder(c: &Config) -> anyhow::Result { match &c.endpoint { Some(e) => { + debug!( + endpoint.url = %e.url, + endpoint.timeout_ms = e.timeout_ms, + telemetry.version = env!("CARGO_PKG_VERSION"), + "Building telemetry request" + ); let mut builder = e.to_request_builder(concat!("telemetry/", env!("CARGO_PKG_VERSION"))); if c.debug_enabled { + debug!( + telemetry.debug_enabled = true, + "Telemetry debug mode enabled" + ); builder = Ok(builder?.header(header::DEBUG_ENABLED, "true")) } builder } - None => Err(anyhow::Error::msg( - "no valid endpoint found, can't build the request".to_string(), - )), + None => { + error!("No valid telemetry endpoint found, cannot build request"); + Err(anyhow::Error::msg( + "no valid endpoint found, can't build the request".to_string(), + )) + } } } @@ -54,6 +68,10 @@ pub fn from_config(c: &Config) -> Box { #[allow(clippy::expect_used)] let file_path = ddcommon::decode_uri_path_in_authority(&e.url) .expect("file urls should always have been encoded in authority"); + info!( + file.path = ?file_path, + "Using file-based mock telemetry client" + ); return Box::new(MockClient { #[allow(clippy::expect_used)] file: Arc::new(Mutex::new(Box::new( @@ -65,7 +83,19 @@ pub fn from_config(c: &Config) -> Box { ))), }); } - Some(_) | None => {} + Some(e) => { + info!( + endpoint.url = %e.url, + endpoint.timeout_ms = e.timeout_ms, + "Using HTTP telemetry client" + ); + } + None => { + warn!( + endpoint = "default", + "No telemetry endpoint configured, using default HTTP client" + ); + } }; Box::new(HyperClient { inner: hyper_migration::new_client_periodic(), @@ -78,8 +108,26 @@ pub struct HyperClient { impl HttpClient for HyperClient { fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture { + debug!("Sending HTTP request via HyperClient"); let resp = self.inner.request(req); - Box::pin(async { Ok(hyper_migration::into_response(resp.await?)) }) + Box::pin(async move { + match resp.await { + Ok(response) => { + debug!( + http.status = response.status().as_u16(), + "HTTP request completed successfully" + ); + Ok(hyper_migration::into_response(response)) + } + Err(e) => { + error!( + error = %e, + "HTTP request failed" + ); + Err(e.into()) + } + } + }) } } @@ -92,6 +140,7 @@ impl HttpClient for MockClient { fn request(&self, req: hyper_migration::HttpRequest) -> ResponseFuture { let s = self.clone(); Box::pin(async move { + debug!("MockClient writing request to file"); let mut body = req.collect().await?.to_bytes().to_vec(); body.push(b'\n'); @@ -99,9 +148,22 @@ impl HttpClient for MockClient { #[allow(clippy::expect_used)] let mut writer = s.file.lock().expect("mutex poisoned"); - writer.write_all(body.as_ref())?; + match writer.write_all(body.as_ref()) { + Ok(()) => debug!( + file.bytes_written = body.len(), + "Successfully wrote payload to mock file" + ), + Err(e) => { + error!( + error = %e, + "Failed to write to mock file" + ); + return Err(e.into()); + } + } } + debug!(http.status = 202, "MockClient returning success response"); hyper_migration::empty_response(hyper::Response::builder().status(202)) }) } diff --git a/ddtelemetry/src/worker/mod.rs b/ddtelemetry/src/worker/mod.rs index 668c560aa5..928cb63355 100644 --- a/ddtelemetry/src/worker/mod.rs +++ b/ddtelemetry/src/worker/mod.rs @@ -41,6 +41,7 @@ use tokio::{ task::JoinHandle, }; use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; const CONTINUE: ControlFlow<()> = ControlFlow::Continue(()); const BREAK: ControlFlow<()> = ControlFlow::Break(()); @@ -56,18 +57,28 @@ fn time_now() -> f64 { macro_rules! telemetry_worker_log { ($worker:expr , ERROR , $fmt_str:tt, $($arg:tt)*) => { { - #[cfg(feature = "tracing")] - tracing::error!($fmt_str, $($arg)*); + error!( + worker.runtime_id = %$worker.runtime_id, + worker.debug_logging = $worker.config.telemetry_debug_logging_enabled, + $fmt_str, + $($arg)* + ); if $worker.config.telemetry_debug_logging_enabled { eprintln!(concat!("{}: Telemetry worker ERROR: ", $fmt_str), time_now(), $($arg)*); } } }; ($worker:expr , DEBUG , $fmt_str:tt, $($arg:tt)*) => { - #[cfg(feature = "tracing")] - tracing::debug!($fmt_str, $($arg)*); - if $worker.config.telemetry_debug_logging_enabled { - println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*); + { + debug!( + worker.runtime_id = %$worker.runtime_id, + worker.debug_logging = $worker.config.telemetry_debug_logging_enabled, + $fmt_str, + $($arg)* + ); + if $worker.config.telemetry_debug_logging_enabled { + println!(concat!("{}: Telemetry worker DEBUG: ", $fmt_str), time_now(), $($arg)*); + } } }; } @@ -147,12 +158,27 @@ impl Worker for TelemetryWorker { // Runs a state machine that waits for actions, either from the worker's // mailbox, or scheduled actions from the worker's deadline object. async fn run(&mut self) { + info!( + worker.flavor = ?self.flavor, + worker.runtime_id = %self.runtime_id, + "Starting telemetry worker" + ); + loop { if self.cancellation_token.is_cancelled() { + info!( + worker.runtime_id = %self.runtime_id, + "Telemetry worker cancelled, shutting down" + ); return; } let action = self.recv_next_action().await; + debug!( + worker.runtime_id = %self.runtime_id, + action = ?action, + "Received telemetry action" + ); let action_result = match self.flavor { TelemetryWorkerFlavor::Full => self.dispatch_action(action).await, @@ -164,12 +190,22 @@ impl Worker for TelemetryWorker { match action_result { ControlFlow::Continue(()) => {} ControlFlow::Break(()) => { + info!( + worker.runtime_id = %self.runtime_id, + worker.restartable = self.config.restartable, + "Telemetry worker received break signal" + ); if !self.config.restartable { break; } } }; } + + info!( + worker.runtime_id = %self.runtime_id, + "Telemetry worker stopped" + ); } } @@ -644,8 +680,28 @@ impl TelemetryWorker { } async fn send_payload(&self, payload: &data::Payload) -> Result<()> { + debug!( + worker.runtime_id = %self.runtime_id, + payload.type = payload.request_type(), + seq_id = self.seq_id.load(Ordering::Acquire), + "Sending telemetry payload" + ); let req = self.build_request(payload)?; - self.send_request(req).await + let result = self.send_request(req).await; + match &result { + Ok(()) => info!( + worker.runtime_id = %self.runtime_id, + payload.type = payload.request_type(), + "Successfully sent telemetry payload" + ), + Err(e) => error!( + worker.runtime_id = %self.runtime_id, + payload.type = payload.request_type(), + error = %e, + "Failed to send telemetry payload" + ), + } + result } fn build_request(&self, payload: &data::Payload) -> Result { @@ -691,24 +747,51 @@ impl TelemetryWorker { } async fn send_request(&self, req: hyper_migration::HttpRequest) -> Result<()> { + let timeout_ms = if let Some(endpoint) = self.config.endpoint.as_ref() { + endpoint.timeout_ms + } else { + Endpoint::DEFAULT_TIMEOUT + }; + + debug!( + worker.runtime_id = %self.runtime_id, + http.timeout_ms = timeout_ms, + "Sending HTTP request" + ); + tokio::select! { _ = self.cancellation_token.cancelled() => { + warn!( + worker.runtime_id = %self.runtime_id, + "Telemetry request cancelled" + ); Err(anyhow::anyhow!("Request cancelled")) }, - _ = tokio::time::sleep(time::Duration::from_millis( - if let Some(endpoint) = self.config.endpoint.as_ref() { - endpoint.timeout_ms - } else { - Endpoint::DEFAULT_TIMEOUT - })) => { + _ = tokio::time::sleep(time::Duration::from_millis(timeout_ms)) => { + warn!( + worker.runtime_id = %self.runtime_id, + http.timeout_ms = timeout_ms, + "Telemetry request timed out" + ); Err(anyhow::anyhow!("Request timed out")) }, r = self.client.request(req) => { match r { Ok(_) => { + debug!( + worker.runtime_id = %self.runtime_id, + "HTTP request completed successfully" + ); Ok(()) } - Err(e) => Err(e), + Err(e) => { + error!( + worker.runtime_id = %self.runtime_id, + error = %e, + "HTTP request failed" + ); + Err(e) + }, } } } diff --git a/examples/ffi/README.md b/examples/ffi/README.md index 0acc71ff9a..399810e153 100644 --- a/examples/ffi/README.md +++ b/examples/ffi/README.md @@ -17,6 +17,11 @@ cmake --build ./examples/ffi/build # Run FFI examples The build command will create executables in the examples/ffi/build folder. You can run any of them with: -```` -./examples/ffi/build/test-name -```` + +``` +./examples/ffi/build/ddsketch +./examples/ffi/build/telemetry +./examples/ffi/build/crashtracker +./examples/ffi/build/trace_exporter +# ... etc +``` diff --git a/examples/ffi/trace_exporter.c b/examples/ffi/trace_exporter.c index 90500cf823..d9d0e788cd 100644 --- a/examples/ffi/trace_exporter.c +++ b/examples/ffi/trace_exporter.c @@ -92,6 +92,18 @@ int main(int argc, char** argv) ddog_trace_exporter_config_set_tracer_version(config, tracer_version); ddog_trace_exporter_config_set_language(config, language); + ddog_TelemetryClientConfig telemetry_config = { + .interval = 60000, + .runtime_id = DDOG_CHARSLICE_C("12345678-1234-1234-1234-123456789abc"), + .debug_enabled = true + }; + + ret = ddog_trace_exporter_config_enable_telemetry(config, &telemetry_config); + if (ret) { + handle_error(ret); + goto error; + } + ret = ddog_trace_exporter_new(&trace_exporter, config); assert(ret == NULL);