Skip to content

Commit ed6a105

Browse files
committed
Merge branch 'main' of github.com:DataDog/libdatadog into r1viollet/run-ffi-examples-on-windows
2 parents 2b3eacf + 9aa79d2 commit ed6a105

File tree

24 files changed

+6314
-5844
lines changed

24 files changed

+6314
-5844
lines changed

Cargo.lock

Lines changed: 1236 additions & 1186 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ resolver = "2"
6565
[workspace.package]
6666
rust-version = "1.84.1"
6767
edition = "2021"
68-
version = "25.0.0"
68+
version = "26.0.0"
6969
license = "Apache-2.0"
7070
authors = ["Datadog Inc. <info@datadoghq.com>"]
7171

LICENSE-3rdparty.yml

Lines changed: 4670 additions & 4588 deletions
Large diffs are not rendered by default.

datadog-ipc/src/platform/unix/channel.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33

44
use crate::handles::TransferHandles;
55
use crate::platform::{Message, PlatformHandle};
6-
use nix::sys::select::FdSet;
7-
use nix::sys::time::{TimeVal, TimeValLike};
6+
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
87
use std::{
98
io::{self, ErrorKind, Read, Write},
109
os::{
@@ -61,11 +60,13 @@ impl Channel {
6160
pub fn probe_readable(&self) -> bool {
6261
#[allow(clippy::unwrap_used)]
6362
let raw_fd = self.inner.as_owned_fd().unwrap().as_fd();
64-
let mut fds = FdSet::new();
65-
fds.insert(raw_fd);
66-
nix::sys::select::select(None, Some(&mut fds), None, None, Some(&mut TimeVal::zero()))
67-
.is_err()
68-
|| fds.contains(raw_fd)
63+
64+
let mut fds = [PollFd::new(raw_fd, PollFlags::POLLIN)];
65+
poll(&mut fds, PollTimeout::ZERO).is_err()
66+
|| fds[0]
67+
.revents()
68+
.unwrap_or(PollFlags::empty())
69+
.intersects(PollFlags::POLLIN | PollFlags::POLLHUP | PollFlags::POLLERR)
6970
}
7071

7172
pub fn create_message<T>(&mut self, item: T) -> Result<Message<T>, io::Error>

datadog-ipc/tests/flock.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@ fn test_file_locking_works_as_expected() {
4848
.unwrap();
4949

5050
let mut buf = [0; 10];
51-
local
52-
.set_read_timeout(Some(Duration::from_millis(500)))
53-
.unwrap();
51+
// give macOS runners on CI more time to read
52+
#[cfg(target_os = "macos")]
53+
let read_timeout = Duration::from_secs(10);
54+
#[cfg(not(target_os = "macos"))]
55+
let read_timeout = Duration::from_millis(500);
56+
local.set_read_timeout(Some(read_timeout)).unwrap();
5457
// wait for child to signal its ready
5558
assert!(local.read(&mut buf).unwrap() > 0);
5659

datadog-sidecar-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ paste = "1"
2727
libc = "0.2"
2828
tracing = { version = "0.1", default-features = false }
2929
rmp-serde = "1.1.1"
30+
serde_json = "1.0"
3031

3132

3233
[target.'cfg(windows)'.dependencies]

datadog-sidecar-ffi/src/lib.rs

Lines changed: 222 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datadog_sidecar::config::LogMethod;
2424
use datadog_sidecar::crashtracker::crashtracker_unix_socket_path;
2525
use datadog_sidecar::one_way_shared_memory::{OneWayShmReader, ReaderOpener};
2626
use datadog_sidecar::service::agent_info::AgentInfoReader;
27+
use datadog_sidecar::service::telemetry::InternalTelemetryAction;
2728
use datadog_sidecar::service::{
2829
blocking::{self, SidecarTransport},
2930
DynamicInstrumentationConfigState, InstanceId, QueueId, RuntimeMetadata,
@@ -39,6 +40,8 @@ use libdd_common_ffi::{self as ffi, MaybeError};
3940
#[cfg(windows)]
4041
use libdd_crashtracker_ffi::Metadata;
4142
use libdd_dogstatsd_client::DogStatsDActionOwned;
43+
use libdd_telemetry::data::metrics::{MetricNamespace, MetricType};
44+
use libdd_telemetry::metrics::MetricContext;
4245
use libdd_telemetry::{
4346
data::{self, Dependency, Integration},
4447
worker::{LifecycleAction, LogIdentifier, TelemetryActions},
@@ -402,6 +405,35 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig(
402405
MaybeError::None
403406
}
404407

408+
/// Reports an endpoint to the telemetry.
409+
#[no_mangle]
410+
#[allow(clippy::missing_safety_doc)]
411+
pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint(
412+
transport: &mut Box<SidecarTransport>,
413+
instance_id: &InstanceId,
414+
queue_id: &QueueId,
415+
method: libdd_telemetry::data::Method,
416+
path: CharSlice,
417+
operation_name: CharSlice,
418+
resource_name: CharSlice,
419+
) -> MaybeError {
420+
let endpoint = TelemetryActions::AddEndpoint(libdd_telemetry::data::Endpoint {
421+
method: Some(method),
422+
path: Some(path.to_utf8_lossy().into_owned()),
423+
operation_name: operation_name.to_utf8_lossy().into_owned(),
424+
resource_name: resource_name.to_utf8_lossy().into_owned(),
425+
});
426+
427+
try_c!(blocking::enqueue_actions(
428+
transport,
429+
instance_id,
430+
queue_id,
431+
vec![SidecarAction::Telemetry(endpoint)],
432+
));
433+
434+
MaybeError::None
435+
}
436+
405437
/// Reports a dependency to the telemetry.
406438
#[no_mangle]
407439
#[allow(clippy::missing_safety_doc)]
@@ -683,6 +715,60 @@ fn char_slice_to_string(slice: CharSlice) -> Result<String, String> {
683715
.map_err(|e| format!("Failed to convert CharSlice to String: {e}"))
684716
}
685717

718+
struct TelemetryContext {
719+
instance_id: InstanceId,
720+
service_name: String,
721+
env_name: String,
722+
}
723+
724+
impl TelemetryContext {
725+
fn from_ffi(
726+
session_id_ffi: CharSlice,
727+
runtime_id_ffi: CharSlice,
728+
service_name_ffi: CharSlice,
729+
env_name_ffi: CharSlice,
730+
) -> Result<Self, String> {
731+
if session_id_ffi.is_empty() {
732+
return Err("Null or empty session_id".into());
733+
}
734+
if runtime_id_ffi.is_empty() {
735+
return Err("Null or empty runtime_id".into());
736+
}
737+
if service_name_ffi.is_empty() {
738+
return Err("Null or empty service_name".into());
739+
}
740+
if env_name_ffi.is_empty() {
741+
return Err("Null or empty env_name".into());
742+
}
743+
744+
Ok(Self {
745+
instance_id: InstanceId::new(
746+
char_slice_to_string(session_id_ffi)?,
747+
char_slice_to_string(runtime_id_ffi)?,
748+
),
749+
service_name: char_slice_to_string(service_name_ffi)?,
750+
env_name: char_slice_to_string(env_name_ffi)?,
751+
})
752+
}
753+
754+
/// Sends a telemetry action through the internal telemetry channel
755+
fn send_action(self, action: InternalTelemetryAction) -> Result<(), String> {
756+
let sender = get_telemetry_action_sender()
757+
.map_err(|e| format!("Failed to get telemetry action sender: {e}"))?;
758+
759+
let msg = InternalTelemetryActions {
760+
instance_id: self.instance_id,
761+
service_name: self.service_name,
762+
env_name: self.env_name,
763+
actions: vec![action],
764+
};
765+
766+
sender
767+
.try_send(msg)
768+
.map_err(|e| format!("Failed to send telemetry action: {e}"))
769+
}
770+
}
771+
686772
#[allow(clippy::too_many_arguments)]
687773
fn ddog_sidecar_enqueue_telemetry_log_impl(
688774
session_id_ffi: CharSlice,
@@ -696,31 +782,19 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
696782
tags_ffi: Option<NonNull<CharSlice>>,
697783
is_sensitive: bool,
698784
) -> Result<(), String> {
699-
if session_id_ffi.is_empty()
700-
|| runtime_id_ffi.is_empty()
701-
|| service_name_ffi.is_empty()
702-
|| env_name_ffi.is_empty()
703-
|| identifier_ffi.is_empty()
704-
|| message_ffi.is_empty()
705-
{
785+
if identifier_ffi.is_empty() || message_ffi.is_empty() {
706786
return Err("Null or empty required arguments".into());
707787
}
708788

709-
let sender = match get_telemetry_action_sender() {
710-
Ok(s) => s,
711-
Err(e) => {
712-
return Err(format!("Failed to get telemetry action sender: {e}"));
713-
}
714-
};
789+
let ctx = TelemetryContext::from_ffi(
790+
session_id_ffi,
791+
runtime_id_ffi,
792+
service_name_ffi,
793+
env_name_ffi,
794+
)?;
715795

716-
let instance_id = InstanceId::new(
717-
char_slice_to_string(session_id_ffi)?,
718-
char_slice_to_string(runtime_id_ffi)?,
719-
);
720-
let service_name: String = char_slice_to_string(service_name_ffi)?;
721-
let env_name: String = char_slice_to_string(env_name_ffi)?;
722-
let identifier: String = char_slice_to_string(identifier_ffi)?;
723-
let message: String = char_slice_to_string(message_ffi)?;
796+
let identifier = char_slice_to_string(identifier_ffi)?;
797+
let message = char_slice_to_string(message_ffi)?;
724798

725799
let stack_trace = stack_trace_ffi
726800
.map(|s| char_slice_to_string(*unsafe { s.as_ref() }))
@@ -746,17 +820,136 @@ fn ddog_sidecar_enqueue_telemetry_log_impl(
746820
};
747821
let log_action = TelemetryActions::AddLog((log_id, log_data));
748822

749-
let msg = InternalTelemetryActions {
750-
instance_id,
751-
service_name,
752-
env_name,
753-
actions: vec![log_action],
823+
ctx.send_action(InternalTelemetryAction::TelemetryAction(log_action))
824+
}
825+
826+
/// Enqueues a telemetry point to be processed internally.
827+
///
828+
/// # Safety
829+
/// Pointers must be valid, strings must be null-terminated if not null.
830+
#[no_mangle]
831+
pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_point(
832+
session_id_ffi: CharSlice,
833+
runtime_id_ffi: CharSlice,
834+
service_name_ffi: CharSlice,
835+
env_name_ffi: CharSlice,
836+
metric_name_ffi: CharSlice,
837+
value: f64,
838+
tags_ffi: Option<NonNull<CharSlice>>,
839+
) -> MaybeError {
840+
try_c!(ddog_sidecar_enqueue_telemetry_point_impl(
841+
session_id_ffi,
842+
runtime_id_ffi,
843+
service_name_ffi,
844+
env_name_ffi,
845+
metric_name_ffi,
846+
value,
847+
tags_ffi,
848+
));
849+
MaybeError::None
850+
}
851+
852+
fn ddog_sidecar_enqueue_telemetry_point_impl(
853+
session_id_ffi: CharSlice,
854+
runtime_id_ffi: CharSlice,
855+
service_name_ffi: CharSlice,
856+
env_name_ffi: CharSlice,
857+
metric_name_ffi: CharSlice,
858+
value: f64,
859+
tags_ffi: Option<NonNull<CharSlice>>,
860+
) -> Result<(), String> {
861+
if metric_name_ffi.is_empty() {
862+
return Err("Null or empty metric_name".into());
863+
}
864+
865+
let ctx = TelemetryContext::from_ffi(
866+
session_id_ffi,
867+
runtime_id_ffi,
868+
service_name_ffi,
869+
env_name_ffi,
870+
)?;
871+
872+
let metric_name = char_slice_to_string(metric_name_ffi)?;
873+
874+
fn get_tags(tags_slice: CharSlice) -> Result<Vec<Tag>, String> {
875+
let tags = char_slice_to_string(tags_slice)?;
876+
let (tags, error) = libdd_common::tag::parse_tags(tags.as_str());
877+
if let Some(error) = error {
878+
return Err(error.to_string());
879+
}
880+
Ok(tags)
881+
}
882+
883+
let tags = match tags_ffi {
884+
Some(tags_slice) => get_tags(*unsafe { tags_slice.as_ref() })?,
885+
None => Vec::default(),
754886
};
755887

756-
match sender.try_send(msg) {
757-
Ok(_) => Ok(()),
758-
Err(err) => Err(format!("Failed to send telemetry action: {err}")),
888+
ctx.send_action(InternalTelemetryAction::AddMetricPoint((
889+
value,
890+
metric_name,
891+
tags,
892+
)))
893+
}
894+
895+
/// Registers a telemetry metric to be processed internally.
896+
///
897+
/// # Safety
898+
/// Pointers must be valid, strings must be null-terminated if not null.
899+
#[no_mangle]
900+
pub unsafe extern "C" fn ddog_sidecar_enqueue_telemetry_metric(
901+
session_id_ffi: CharSlice,
902+
runtime_id_ffi: CharSlice,
903+
service_name_ffi: CharSlice,
904+
env_name_ffi: CharSlice,
905+
metric_name_ffi: CharSlice,
906+
metric_type: MetricType,
907+
metric_namespace: MetricNamespace,
908+
) -> MaybeError {
909+
try_c!(ddog_sidecar_enqueue_telemetry_metric_impl(
910+
session_id_ffi,
911+
runtime_id_ffi,
912+
service_name_ffi,
913+
env_name_ffi,
914+
metric_name_ffi,
915+
metric_type,
916+
metric_namespace,
917+
));
918+
MaybeError::None
919+
}
920+
921+
#[allow(clippy::too_many_arguments)]
922+
fn ddog_sidecar_enqueue_telemetry_metric_impl(
923+
session_id_ffi: CharSlice,
924+
runtime_id_ffi: CharSlice,
925+
service_name_ffi: CharSlice,
926+
env_name_ffi: CharSlice,
927+
metric_name_ffi: CharSlice,
928+
metric_type: MetricType,
929+
metric_namespace: MetricNamespace,
930+
) -> Result<(), String> {
931+
if metric_name_ffi.is_empty() {
932+
return Err("Null or empty metric_name".into());
759933
}
934+
935+
let ctx = TelemetryContext::from_ffi(
936+
session_id_ffi,
937+
runtime_id_ffi,
938+
service_name_ffi,
939+
env_name_ffi,
940+
)?;
941+
942+
let metric_name = char_slice_to_string(metric_name_ffi)?;
943+
944+
ctx.send_action(InternalTelemetryAction::RegisterTelemetryMetric(
945+
MetricContext {
946+
name: metric_name,
947+
tags: Vec::default(),
948+
metric_type,
949+
common: true,
950+
namespace: metric_namespace,
951+
},
952+
))
760953
}
761954

762955
/// Sends a trace to the sidecar via shared memory.

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use std::collections::{HashMap, HashSet};
2929
use std::pin::Pin;
3030
use std::sync::atomic::{AtomicU64, Ordering};
3131
use std::sync::{Arc, Mutex};
32-
use std::time::Duration;
32+
use std::time::{Duration, SystemTime};
3333
use tracing::{debug, error, info, trace, warn};
3434

3535
use futures::FutureExt;
@@ -441,7 +441,7 @@ impl SidecarInterface for SidecarServer {
441441
);
442442
let mut telemetry = telemetry_mutex.lock_or_panic();
443443

444-
let mut actions_to_process = vec![];
444+
let mut actions_to_process: Vec<SidecarAction> = vec![];
445445
let mut composer_paths_to_process = vec![];
446446
let mut buffered_info_changed = false;
447447
let mut remove_entry = false;
@@ -469,6 +469,11 @@ impl SidecarInterface for SidecarServer {
469469
SidecarAction::ClearQueueId => {
470470
remove_entry = true;
471471
}
472+
SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => {
473+
telemetry.last_endpoints_push = SystemTime::now();
474+
buffered_info_changed = true;
475+
actions_to_process.push(action);
476+
}
472477
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
473478
LifecycleAction::Stop,
474479
)) => {

0 commit comments

Comments
 (0)