Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion LICENSE-3rdparty.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
root_name: builder, build_common, tools, datadog-alloc, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, datadog-crashtracker-ffi, ddcommon-ffi, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, tinybytes, spawn_worker, cc_utils, datadog-library-config, datadog-library-config-ffi, datadog-live-debugger, datadog-live-debugger-ffi, datadog-profiling, datadog-profiling-protobuf, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-utils, datadog-trace-normalization, dogstatsd-client, datadog-log-ffi, datadog-log, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, datadog-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib
root_name: builder, build_common, tools, datadog-alloc, datadog-crashtracker, ddcommon, ddtelemetry, datadog-ddsketch, datadog-crashtracker-ffi, ddcommon-ffi, datadog-ipc, datadog-ipc-macros, tarpc, tarpc-plugins, tinybytes, spawn_worker, cc_utils, datadog-library-config, datadog-library-config-ffi, datadog-live-debugger, datadog-live-debugger-ffi, datadog-profiling, datadog-profiling-protobuf, datadog-profiling-ffi, data-pipeline-ffi, data-pipeline, datadog-trace-protobuf, datadog-trace-utils, datadog-trace-normalization, dogstatsd-client, datadog-log, datadog-log-ffi, ddtelemetry-ffi, symbolizer-ffi, datadog-profiling-replayer, datadog-remote-config, datadog-sidecar, datadog-sidecar-macros, datadog-sidecar-ffi, datadog-trace-obfuscation, datadog-tracer-flare, sidecar_mockgen, test_spawn_from_lib
third_party_libraries:
- package_name: addr2line
package_version: 0.24.2
Expand Down
15 changes: 7 additions & 8 deletions data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ddcommon_ffi::{
{slice::AsBytes, slice::ByteSlice},
};
use std::{ptr::NonNull, time::Duration};
use tracing::error;
use tracing::{debug, error};

#[cfg(all(feature = "catch_panic", panic = "unwind"))]
use std::panic::{catch_unwind, AssertUnwindSafe};
Expand Down Expand Up @@ -325,16 +325,16 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_enable_telemetry(
catch_panic!(
if let Option::Some(config) = config {
if let Option::Some(telemetry_cfg) = telemetry_cfg {
config.telemetry_cfg = Some(TelemetryConfig {
let cfg = TelemetryConfig {
heartbeat: telemetry_cfg.interval,
runtime_id: match sanitize_string(telemetry_cfg.runtime_id) {
Ok(s) => Some(s),
Err(e) => return Some(e),
},
debug_enabled: telemetry_cfg.debug_enabled,
})
} else {
config.telemetry_cfg = Some(TelemetryConfig::default());
};
debug!(telemetry_cfg = ?cfg, "Configuring telemetry");
config.telemetry_cfg = Some(cfg);
}
None
} else {
Expand Down Expand Up @@ -467,7 +467,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
}

if let Some(cfg) = &config.telemetry_cfg {
builder.enable_telemetry(Some(cfg.clone()));
builder.enable_telemetry(cfg.clone());
}

if let Some(token) = &config.test_session_token {
Expand Down Expand Up @@ -795,8 +795,7 @@ mod tests {
let mut cfg = TraceExporterConfig::default();
let error = ddog_trace_exporter_config_enable_telemetry(Some(&mut cfg), None);
assert!(error.is_none());
assert_eq!(cfg.telemetry_cfg.as_ref().unwrap().heartbeat, 0);
assert!(cfg.telemetry_cfg.as_ref().unwrap().runtime_id.is_none());
assert!(cfg.telemetry_cfg.is_none());

let mut cfg = TraceExporterConfig::default();
let error = ddog_trace_exporter_config_enable_telemetry(
Expand Down
2 changes: 2 additions & 0 deletions data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ harness = false
path = "benches/main.rs"

[dev-dependencies]
datadog-log = { path = "../datadog-log" }
clap = { version = "4.0", features = ["derive"] }
criterion = "0.5.1"
datadog-trace-utils = { path = "../datadog-trace-utils", features = ["test-utils"] }
httpmock = "0.7.0"
Expand Down
34 changes: 26 additions & 8 deletions data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use data_pipeline::trace_exporter::{
TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
TelemetryConfig, TraceExporter, TraceExporterInputFormat, TraceExporterOutputFormat,
};
use datadog_log::logger::{
logger_configure_std, logger_set_log_level, LogEventLevel, StdConfig, StdTarget,
};
use datadog_trace_protobuf::pb;
use std::{
Expand Down Expand Up @@ -30,6 +33,13 @@ fn get_span(now: i64, trace_id: u64, span_id: u64) -> pb::Span {
}

fn main() {
logger_configure_std(StdConfig {
target: StdTarget::Out,
})
.expect("Failed to configure logger");
logger_set_log_level(LogEventLevel::Debug).expect("Failed to set log level");

let telemetry_cfg = TelemetryConfig::default();
let mut builder = TraceExporter::builder();
builder
.set_url("http://localhost:8126")
Expand All @@ -42,20 +52,28 @@ fn main() {
.set_language_version(env!("CARGO_PKG_RUST_VERSION"))
.set_input_format(TraceExporterInputFormat::V04)
.set_output_format(TraceExporterOutputFormat::V04)
.enable_telemetry(telemetry_cfg)
.enable_stats(Duration::from_secs(10));
let exporter = builder.build().unwrap();
let now = UNIX_EPOCH.elapsed().unwrap().as_nanos() as i64;
let exporter = builder.build().expect("Failed to build TraceExporter");
let now = UNIX_EPOCH
.elapsed()
.expect("Failed to get time since UNIX_EPOCH")
.as_nanos() as i64;

let mut traces = Vec::new();
for trace_id in 1..=100 {
for trace_id in 1..=2 {
let mut trace = Vec::new();
for span_id in 1..=1000 {
for span_id in 1..=2 {
trace.push(get_span(now, trace_id, span_id));
}
traces.push(trace);
}
let data = rmp_serde::to_vec_named(&traces).unwrap();
let data = rmp_serde::to_vec_named(&traces).expect("Failed to serialize traces");

exporter.send(data.as_ref(), 100).unwrap();
exporter.shutdown(None).unwrap();
exporter
.send(data.as_ref(), 2)
.expect("Failed to send traces");
exporter
.shutdown(None)
.expect("Failed to shutdown exporter");
}
12 changes: 6 additions & 6 deletions data-pipeline/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ pub(super) struct BorrowedAggregationKey<'a> {
/// the key type `K` implements `Borrow<Q>`. Since `AggregationKey<'static>` cannot implement
/// `Borrow<AggregationKey<'a>>` we use `dyn BorrowableAggregationKey` as a placeholder.
trait BorrowableAggregationKey {
fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey;
fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_>;
}

impl BorrowableAggregationKey for AggregationKey<'_> {
fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey {
fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_> {
BorrowedAggregationKey {
resource_name: self.resource_name.borrow(),
service_name: self.service_name.borrow(),
Expand All @@ -76,7 +76,7 @@ impl BorrowableAggregationKey for AggregationKey<'_> {
}

impl BorrowableAggregationKey for BorrowedAggregationKey<'_> {
fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey {
fn borrowed_aggregation_key(&self) -> BorrowedAggregationKey<'_> {
self.clone()
}
}
Expand All @@ -90,16 +90,16 @@ where
}
}

impl Eq for (dyn BorrowableAggregationKey + '_) {}
impl Eq for dyn BorrowableAggregationKey + '_ {}

impl PartialEq for (dyn BorrowableAggregationKey + '_) {
impl PartialEq for dyn BorrowableAggregationKey + '_ {
fn eq(&self, other: &dyn BorrowableAggregationKey) -> bool {
self.borrowed_aggregation_key()
.eq(&other.borrowed_aggregation_key())
}
}

impl std::hash::Hash for (dyn BorrowableAggregationKey + '_) {
impl std::hash::Hash for dyn BorrowableAggregationKey + '_ {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.borrowed_aggregation_key().hash(state)
}
Expand Down
12 changes: 4 additions & 8 deletions data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,8 @@ impl TraceExporterBuilder {
}

/// Enables sending telemetry metrics.
pub fn enable_telemetry(&mut self, cfg: Option<TelemetryConfig>) -> &mut Self {
if let Some(cfg) = cfg {
self.telemetry = Some(cfg);
} else {
self.telemetry = Some(TelemetryConfig::default());
}
pub fn enable_telemetry(&mut self, cfg: TelemetryConfig) -> &mut Self {
self.telemetry = Some(cfg);
self
}

Expand Down Expand Up @@ -383,11 +379,11 @@ mod tests {
.set_input_format(TraceExporterInputFormat::Proxy)
.set_output_format(TraceExporterOutputFormat::V04)
.set_client_computed_stats()
.enable_telemetry(Some(TelemetryConfig {
.enable_telemetry(TelemetryConfig {
heartbeat: 1000,
runtime_id: None,
debug_enabled: false,
}));
});
let exporter = builder.build().unwrap();

assert_eq!(
Expand Down
12 changes: 6 additions & 6 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1078,10 +1078,10 @@ mod tests {
};

if enable_telemetry {
builder.enable_telemetry(Some(TelemetryConfig {
builder.enable_telemetry(TelemetryConfig {
heartbeat: 100,
..Default::default()
}));
});
}

builder.build().unwrap()
Expand Down Expand Up @@ -1617,10 +1617,10 @@ mod tests {
.set_language("nodejs")
.set_language_version("1.0")
.set_language_interpreter("v8")
.enable_telemetry(Some(TelemetryConfig {
.enable_telemetry(TelemetryConfig {
heartbeat: 100,
..Default::default()
}));
});
let exporter = builder.build().unwrap();

let traces = vec![0x90];
Expand Down Expand Up @@ -1741,10 +1741,10 @@ mod tests {
.set_language("nodejs")
.set_language_version("1.0")
.set_language_interpreter("v8")
.enable_telemetry(Some(TelemetryConfig {
.enable_telemetry(TelemetryConfig {
heartbeat: 100,
..Default::default()
}))
})
.set_input_format(TraceExporterInputFormat::V04)
.set_output_format(TraceExporterOutputFormat::V05);

Expand Down
31 changes: 15 additions & 16 deletions datadog-crashtracker/src/collector/emitters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,35 +71,34 @@ unsafe fn emit_backtrace_by_frames(
}
if resolve_frames == StacktraceCollection::EnabledWithInprocessSymbols {
backtrace::resolve_frame_unsynchronized(frame, |symbol| {
write!(w, "{{").unwrap();
#[allow(clippy::unwrap_used)]
emit_absolute_addresses(w, frame).unwrap();
// In crash handling context, we need to be resilient to write failures
// If any write fails, we continue to try the next operations
let _ = write!(w, "{{");
let _ = emit_absolute_addresses(w, frame);
if let Some(column) = symbol.colno() {
write!(w, ", \"column\": {column}").unwrap();
let _ = write!(w, ", \"column\": {column}");
}
if let Some(file) = symbol.filename() {
// The debug printer for path already wraps it in `"` marks.
write!(w, ", \"file\": {file:?}").unwrap();
let _ = write!(w, ", \"file\": {file:?}");
}
if let Some(function) = symbol.name() {
write!(w, ", \"function\": \"{function}\"").unwrap();
let _ = write!(w, ", \"function\": \"{function}\"");
}
if let Some(line) = symbol.lineno() {
write!(w, ", \"line\": {line}").unwrap();
let _ = write!(w, ", \"line\": {line}");
}
writeln!(w, "}}").unwrap();
let _ = writeln!(w, "}}");
// Flush eagerly to ensure that each frame gets emitted even if the next one fails
#[allow(clippy::unwrap_used)]
w.flush().unwrap();
let _ = w.flush();
});
} else {
write!(w, "{{").unwrap();
#[allow(clippy::unwrap_used)]
emit_absolute_addresses(w, frame).unwrap();
writeln!(w, "}}").unwrap();
// In crash handling context, we need to be resilient to write failures
let _ = write!(w, "{{");
let _ = emit_absolute_addresses(w, frame);
let _ = writeln!(w, "}}");
// Flush eagerly to ensure that each frame gets emitted even if the next one fails
#[allow(clippy::unwrap_used)]
w.flush().unwrap();
let _ = w.flush();
}
true // keep going to the next frame
});
Expand Down
2 changes: 1 addition & 1 deletion datadog-ipc/src/platform/unix/platform_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<T> PlatformHandle<T>
where
T: SocketlikeViewType,
{
pub fn as_socketlike_view(&self) -> io::Result<SocketlikeView<T>> {
pub fn as_socketlike_view(&self) -> io::Result<SocketlikeView<'_, T>> {
Ok(self.as_owned_fd()?.as_socketlike_view())
}
}
2 changes: 1 addition & 1 deletion datadog-ipc/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<Inner> Debug for ShmLimiter<Inner> {
}

impl<Inner> ShmLimiter<Inner> {
fn limiter(&self) -> &ShmLimiterData<Inner> {
fn limiter(&self) -> &ShmLimiterData<'_, Inner> {
#[allow(clippy::unwrap_used)]
unsafe {
&*self
Expand Down
4 changes: 2 additions & 2 deletions datadog-live-debugger/src/expr_defs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ impl Display for Condition {
Condition::Never => f.write_str("false"),
Condition::Disjunction(b) => {
let (x, y) = &**b;
fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp {
fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp<'_> {
NonAssocBoolOp(condition, matches!(condition, Condition::Conjunction(_)))
}
write!(f, "{} || {}", is_nonassoc(x), is_nonassoc(y))
}
Condition::Conjunction(b) => {
let (x, y) = &**b;
fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp {
fn is_nonassoc(condition: &Condition) -> NonAssocBoolOp<'_> {
NonAssocBoolOp(condition, matches!(condition, Condition::Disjunction(_)))
}
write!(f, "{} && {}", is_nonassoc(x), is_nonassoc(y))
Expand Down
6 changes: 3 additions & 3 deletions datadog-profiling/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ fn string_table_fetch(pprof: &prost_impls::Profile, id: i64) -> anyhow::Result<&
.ok_or_else(|| anyhow::anyhow!("String {id} was not found."))
}

fn mapping_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Mapping> {
fn mapping_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Mapping<'_>> {
if id == 0 {
return Ok(Mapping::default());
}
Expand All @@ -307,7 +307,7 @@ fn mapping_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Mappin
}
}

fn function_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Function> {
fn function_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Function<'_>> {
if id == 0 {
return Ok(Function::default());
}
Expand All @@ -322,7 +322,7 @@ fn function_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Funct
}
}

fn location_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Location> {
fn location_fetch(pprof: &prost_impls::Profile, id: u64) -> anyhow::Result<Location<'_>> {
if id == 0 {
return Ok(Location::default());
}
Expand Down
2 changes: 1 addition & 1 deletion datadog-remote-config/src/fetch/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl<S> ConfigFetcherState<S> {
/// - This files_lock() must always be called prior to locking any data structure locked within
/// FileStorage::store().
/// - Also, files_lock() must not be called from within FileStorage::store().
pub fn files_lock(&self) -> ConfigFetcherFilesLock<S> {
pub fn files_lock(&self) -> ConfigFetcherFilesLock<'_, S> {
assert!(!self.expire_unused_files);
ConfigFetcherFilesLock {
inner: self.target_files_by_path.lock_or_panic(),
Expand Down
2 changes: 1 addition & 1 deletion datadog-remote-config/src/file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl<P> Deref for RawFileContentsGuard<'_, P> {

impl<P> RawFile<P> {
/// Gets the contents behind a Deref impl (guarding a Mutex).
pub fn contents(&self) -> RawFileContentsGuard<P> {
pub fn contents(&self) -> RawFileContentsGuard<'_, P> {
RawFileContentsGuard(self.data.lock_or_panic())
}

Expand Down
2 changes: 1 addition & 1 deletion datadog-remote-config/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct RemoteConfigPathRef<'a> {
}

impl RemoteConfigPath {
pub fn try_parse(path: &str) -> anyhow::Result<RemoteConfigPathRef> {
pub fn try_parse(path: &str) -> anyhow::Result<RemoteConfigPathRef<'_>> {
let parts: Vec<_> = path.split('/').collect();
Ok(RemoteConfigPathRef {
source: match parts[0] {
Expand Down
Loading
Loading