Skip to content

Commit 740b396

Browse files
Leiyksbwoebi
andauthored
feat(data-pipeline-ffi): add functions to manipulate span from C (#994)
Co-authored-by: bwoebi <[email protected]>
1 parent 36d16a2 commit 740b396

File tree

19 files changed

+1623
-27
lines changed

19 files changed

+1623
-27
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-pipeline-ffi/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@ datadog-trace-utils = { path = "../datadog-trace-utils" }
3131
data-pipeline = { path = "../data-pipeline" }
3232
ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
3333
tinybytes = { path = "../tinybytes" }
34-
tracing = { version = "0.1", default-features = false }
34+
tracing = { version = "0.1", default-features = false }

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::{
2828
use arc_swap::{ArcSwap, ArcSwapOption};
2929
use bytes::Bytes;
3030
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
31+
use datadog_trace_utils::msgpack_encoder;
3132
use datadog_trace_utils::send_with_retry::{
3233
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
3334
};
@@ -1057,9 +1058,7 @@ impl TraceExporter {
10571058
payload: &tracer_payload::TraceChunks<T>,
10581059
) -> Result<Vec<u8>, TraceExporterError> {
10591060
match payload {
1060-
tracer_payload::TraceChunks::V04(p) => {
1061-
rmp_serde::to_vec_named(p).map_err(TraceExporterError::Serialization)
1062-
}
1061+
tracer_payload::TraceChunks::V04(p) => Ok(msgpack_encoder::v04::to_vec(p)),
10631062
tracer_payload::TraceChunks::V05(p) => {
10641063
rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)
10651064
}
@@ -1544,7 +1543,7 @@ mod tests {
15441543
..Default::default()
15451544
}];
15461545

1547-
let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();
1546+
let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
15481547

15491548
// Wait for the info fetcher to get the config
15501549
while agent_info::get_agent_info().is_none() {
@@ -1645,7 +1644,7 @@ mod tests {
16451644
..Default::default()
16461645
}];
16471646

1648-
let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();
1647+
let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
16491648

16501649
// Wait for agent_info to be present so that sending a trace will trigger the stats worker
16511650
// to start
@@ -1751,7 +1750,7 @@ mod tests {
17511750
..Default::default()
17521751
}],
17531752
];
1754-
let data = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
1753+
let data = msgpack_encoder::v04::to_vec(&traces);
17551754

17561755
let _result = exporter
17571756
.send(data.as_ref(), 2)
@@ -1851,7 +1850,7 @@ mod tests {
18511850
name: BytesString::from_slice(b"test").unwrap(),
18521851
..Default::default()
18531852
}]];
1854-
let data = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
1853+
let data = msgpack_encoder::v04::to_vec(&traces);
18551854
let result = exporter.send(data.as_ref(), 1);
18561855

18571856
assert!(result.is_err());
@@ -1958,7 +1957,7 @@ mod tests {
19581957
name: BytesString::from_slice(b"test").unwrap(),
19591958
..Default::default()
19601959
}]];
1961-
let data = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
1960+
let data = msgpack_encoder::v04::to_vec(&traces);
19621961
let result = exporter.send(data.as_ref(), 1);
19631962

19641963
assert!(result.is_err());
@@ -2068,7 +2067,7 @@ mod tests {
20682067
name: BytesString::from_slice(b"test").unwrap(),
20692068
..Default::default()
20702069
}]];
2071-
let data = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
2070+
let data = msgpack_encoder::v04::to_vec(&traces);
20722071
let result = exporter.send(data.as_ref(), 1).unwrap();
20732072

20742073
assert_eq!(
@@ -2110,7 +2109,7 @@ mod tests {
21102109
name: BytesString::from_slice(b"test").unwrap(),
21112110
..Default::default()
21122111
}]];
2113-
let data = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
2112+
let data = msgpack_encoder::v04::to_vec(&traces);
21142113
let code = match exporter.send(data.as_ref(), 1).unwrap_err() {
21152114
TraceExporterError::Request(e) => Some(e.status()),
21162115
_ => None,
@@ -2145,7 +2144,7 @@ mod tests {
21452144
name: BytesString::from_slice(b"test").unwrap(),
21462145
..Default::default()
21472146
}]];
2148-
let data = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
2147+
let data = msgpack_encoder::v04::to_vec(&traces);
21492148
let err = exporter.send(data.as_ref(), 1);
21502149

21512150
assert!(err.is_err());
@@ -2513,7 +2512,7 @@ mod tests {
25132512
..Default::default()
25142513
}];
25152514

2516-
let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();
2515+
let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
25172516

25182517
// Wait for the info fetcher to get the config
25192518
while mock_info.hits() == 0 {

datadog-sidecar-ffi/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@ ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
2020
ddtelemetry-ffi = { path = "../ddtelemetry-ffi", default-features = false }
2121
datadog-remote-config = { path = "../datadog-remote-config" }
2222
datadog-live-debugger = { path = "../datadog-live-debugger" }
23+
dogstatsd-client = { path = "../dogstatsd-client" }
24+
tinybytes = { path = "../tinybytes", features = ["bytes_string"] }
2325
paste = "1"
2426
libc = "0.2"
25-
dogstatsd-client = { path = "../dogstatsd-client" }
27+
tracing = { version = "0.1", default-features = false }
28+
rmp-serde = "1.1.1"
29+
2630

2731
[target.'cfg(windows)'.dependencies]
2832
datadog-crashtracker-ffi = { path = "../datadog-crashtracker-ffi", features = ["collector", "collector_windows"] }

datadog-sidecar-ffi/cbindgen.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ language = "C"
55
tab_width = 2
66
header = """// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
77
// SPDX-License-Identifier: Apache-2.0
8+
9+
typedef struct ddog_SpanBytes ddog_SpanBytes;
10+
typedef struct ddog_SpanLinkBytes ddog_SpanLinkBytes;
11+
typedef struct ddog_SpanEventBytes ddog_SpanEventBytes;
12+
typedef struct ddog_AttributeAnyValueBytes ddog_AttributeAnyValueBytes;
13+
typedef struct ddog_AttributeArrayValueBytes ddog_AttributeArrayValueBytes;
814
"""
915
include_guard = "DDOG_SIDECAR_H"
1016
style = "both"

datadog-sidecar-ffi/src/lib.rs

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
#![cfg_attr(not(test), deny(clippy::todo))]
88
#![cfg_attr(not(test), deny(clippy::unimplemented))]
99

10+
pub mod span;
11+
12+
use crate::span::TracesBytes;
1013
#[cfg(windows)]
1114
use datadog_crashtracker_ffi::Metadata;
1215
use datadog_ipc::platform::{
@@ -29,14 +32,14 @@ use datadog_sidecar::service::{
2932
};
3033
use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions};
3134
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
35+
use datadog_trace_utils::msgpack_encoder;
3236
use ddcommon::tag::Tag;
3337
use ddcommon::Endpoint;
3438
use ddcommon_ffi as ffi;
3539
use ddcommon_ffi::{CharSlice, MaybeError};
36-
use ddtelemetry::worker::LogIdentifier;
3740
use ddtelemetry::{
3841
data::{self, Dependency, Integration},
39-
worker::{LifecycleAction, TelemetryActions},
42+
worker::{LifecycleAction, LogIdentifier, TelemetryActions},
4043
};
4144
use ddtelemetry_ffi::try_c;
4245
use dogstatsd_client::DogStatsDActionOwned;
@@ -1111,6 +1114,111 @@ pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
11111114
.unwrap_or(ffi::CharSlice::empty())
11121115
}
11131116

1117+
#[macro_export]
1118+
macro_rules! check {
1119+
($failable:expr, $msg:expr) => {
1120+
match $failable {
1121+
Ok(o) => o,
1122+
Err(e) => {
1123+
tracing::error!("{}: {}", $msg, e);
1124+
return;
1125+
}
1126+
}
1127+
};
1128+
}
1129+
1130+
#[repr(C)]
1131+
#[derive()]
1132+
pub struct SenderParameters {
1133+
pub tracer_headers_tags: TracerHeaderTags<'static>,
1134+
pub transport: Box<SidecarTransport>,
1135+
pub instance_id: Box<InstanceId>,
1136+
pub limit: usize,
1137+
pub n_requests: i64,
1138+
pub buffer_size: i64,
1139+
pub url: CharSlice<'static>,
1140+
}
1141+
1142+
#[no_mangle]
1143+
#[allow(clippy::missing_safety_doc)]
1144+
pub unsafe extern "C" fn ddog_send_traces_to_sidecar(
1145+
traces: &mut TracesBytes,
1146+
parameters: &mut SenderParameters,
1147+
) {
1148+
let size: usize = traces.iter().map(|trace| trace.len()).sum();
1149+
1150+
// Check connection to the sidecar
1151+
if parameters.transport.is_closed() {
1152+
tracing::info!(
1153+
"Skipping flushing traces of size {} as connection to sidecar failed",
1154+
size
1155+
);
1156+
return;
1157+
}
1158+
1159+
// Create and map shared memory
1160+
let shm = check!(
1161+
ShmHandle::new(parameters.limit),
1162+
"Failed to create shared memory"
1163+
);
1164+
1165+
let mut mapped_shm = check!(shm.clone().map(), "Failed to map shared memory");
1166+
1167+
// Write traces to the shared memory
1168+
let mut shm_slice = mapped_shm.as_slice_mut();
1169+
let shm_slice_len = shm_slice.len();
1170+
let written = match msgpack_encoder::v04::write_to_slice(&mut shm_slice, traces) {
1171+
Ok(()) => shm_slice_len - shm_slice.len(),
1172+
Err(_) => {
1173+
tracing::error!("Failed serializing the traces");
1174+
return;
1175+
}
1176+
};
1177+
1178+
// Send traces to the sidecar via the shared memory handler
1179+
let mut size_hint = written;
1180+
if parameters.n_requests > 0 {
1181+
size_hint = size_hint.max((parameters.buffer_size / parameters.n_requests + 1) as usize);
1182+
}
1183+
1184+
let send_error = blocking::send_trace_v04_shm(
1185+
&mut parameters.transport,
1186+
&parameters.instance_id,
1187+
shm,
1188+
size_hint,
1189+
check!(
1190+
(&parameters.tracer_headers_tags).try_into(),
1191+
"Failed to convert tracer headers tags"
1192+
),
1193+
);
1194+
1195+
// Retry sending traces via bytes if there was an error
1196+
if send_error.is_err() {
1197+
match blocking::send_trace_v04_bytes(
1198+
&mut parameters.transport,
1199+
&parameters.instance_id,
1200+
msgpack_encoder::v04::to_vec_with_capacity(traces, written as u32),
1201+
check!(
1202+
(&parameters.tracer_headers_tags).try_into(),
1203+
"Failed to convert tracer headers tags"
1204+
),
1205+
) {
1206+
Ok(_) => {}
1207+
Err(_) => tracing::debug!(
1208+
"Failed sending traces via shm to sidecar: {}",
1209+
send_error.err().unwrap_unchecked().to_string()
1210+
),
1211+
};
1212+
}
1213+
1214+
tracing::event!(target: "info", tracing::Level::INFO, "Flushing trace of size {} to send-queue for {}", size, parameters.url);
1215+
// tracing::info!(
1216+
// "Flushing traces of size {} to send-queue for {}",
1217+
// size,
1218+
// parameters.url
1219+
// );
1220+
}
1221+
11141222
/// Drops the agent info reader.
11151223
#[no_mangle]
11161224
#[allow(clippy::missing_safety_doc)]

0 commit comments

Comments
 (0)