Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datadog-sidecar-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ paste = "1"
libc = "0.2"
tracing = { version = "0.1", default-features = false }
rmp-serde = "1.1.1"
serde_json = "1.0"


[target.'cfg(windows)'.dependencies]
Expand Down
83 changes: 83 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ use std::ptr::NonNull;
use std::slice;
use std::sync::Arc;
use std::time::Duration;
use serde_json::Value;


#[no_mangle]
#[cfg(target_os = "windows")]
Expand Down Expand Up @@ -410,6 +412,87 @@ pub unsafe extern "C" fn ddog_sidecar_telemetry_enqueueConfig(
MaybeError::None
}

/// Reports an endpoint to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_sidecar_telemetry_addEndpoint(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
queue_id: &QueueId,
r#type: CharSlice,
method: ddtelemetry::data::Method,
path: CharSlice,
operation_name: CharSlice,
resource_name: CharSlice,
request_body_type:*mut ffi::Vec<CharSlice>,
response_body_type:*mut ffi::Vec<CharSlice>,
response_code:i32,
authentication:*mut ffi::Vec<ddtelemetry::data::Authentication>,
metadata: CharSlice
) -> MaybeError {

let response_code_vec = vec![response_code];
let request_body_type_local = Box::into_raw(Box::new(ffi::Vec::<CharSlice>::new()));
let response_body_type_local = Box::into_raw(Box::new(ffi::Vec::<CharSlice>::new()));
let authentication_local = Box::into_raw(Box::new(ffi::Vec::<ddtelemetry::data::Authentication>::new()));
if let Some(req_vec) = request_body_type.as_ref() {
if let Some(local_vec) = request_body_type_local.as_mut() {
for item in req_vec.to_vec() {
local_vec.push(item);
}
}
}
if let Some(resp_vec) = response_body_type.as_ref() {
if let Some(local_vec) = response_body_type_local.as_mut() {
for item in resp_vec.to_vec() {
local_vec.push(item);
}
}
}
if let Some(auth_vec) = authentication.as_ref() {
if let Some(local_vec) = authentication_local.as_mut() {
for item in auth_vec.to_vec() {
local_vec.push(item);
}
}
}

let maybe_metadata: Result<Value, serde_json::Error> = serde_json::from_slice::<serde_json::Value>(std::slice::from_raw_parts(
metadata.as_ptr() as *const u8,
metadata.len(),
));
if let Err(e) = maybe_metadata {
return MaybeError::Some(e.to_string().into());
}
#[allow(clippy::unwrap_used)]
let metadata_json = maybe_metadata.unwrap();
let endpoint = TelemetryActions::AddEndpoint(ddtelemetry::data::Endpoint {
r#type: Some(r#type.to_utf8_lossy().into_owned()),
method: Some(method),
path: Some(path.to_utf8_lossy().into_owned()),
operation_name: operation_name.to_utf8_lossy().into_owned(),
resource_name: resource_name.to_utf8_lossy().into_owned(),
request_body_type: Some(request_body_type_local.as_ref().unwrap().to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()),
response_body_type: Some(response_body_type_local.as_ref().unwrap().to_vec().iter().map(|s| s.to_utf8_lossy().into_owned()).collect()),
response_code: Some(response_code_vec),
authentication: Some(authentication_local.as_ref().unwrap().to_vec()),
metadata: Some(metadata_json),
});

try_c!(blocking::enqueue_actions(
transport,
instance_id,
queue_id,
vec![SidecarAction::Telemetry(endpoint)],
));

std::ptr::drop_in_place(request_body_type);
std::ptr::drop_in_place(response_body_type);
std::ptr::drop_in_place(authentication);

MaybeError::None
}

/// Reports a dependency to the telemetry.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
Expand Down
39 changes: 23 additions & 16 deletions datadog-sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tracing::{debug, error, info, trace, warn};

use futures::FutureExt;
Expand Down Expand Up @@ -297,12 +297,12 @@ impl SidecarServer {

let futures = clients
.values()
.filter_map(|client| client.client.stats().ok())
.filter_map(|client| client.client.lock_or_panic().worker.stats().ok())
.collect::<Vec<_>>();

let metric_counts = clients
.values()
.map(|client| client.telemetry_metrics.lock_or_panic().len() as u32)
.map(|client| client.client.lock_or_panic().telemetry_metrics.len() as u32)
.collect::<Vec<_>>();

(futures, metric_counts)
Expand Down Expand Up @@ -403,7 +403,7 @@ impl SidecarInterface for SidecarServer {
let env = entry.get().env.as_deref().unwrap_or("none");

// Lock telemetry client
let mut telemetry = self.telemetry_clients.get_or_create(
let telemetry_mutex = self.telemetry_clients.get_or_create(
service,
env,
&instance_id,
Expand All @@ -420,8 +420,9 @@ impl SidecarInterface for SidecarServer {
})
},
);
let mut telemetry = telemetry_mutex.lock_or_panic();

let mut actions_to_process = vec![];
let mut actions_to_process: Vec<SidecarAction> = vec![];
let mut composer_paths_to_process = vec![];
let mut buffered_info_changed = false;
let mut remove_entry = false;
Expand All @@ -448,6 +449,11 @@ impl SidecarInterface for SidecarServer {
}
SidecarAction::ClearQueueId => {
remove_entry = true;
},
SidecarAction::Telemetry(TelemetryActions::AddEndpoint(_)) => {
telemetry.last_endpoints_push = SystemTime::now();
buffered_info_changed = true;
actions_to_process.push(action);
}
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop,
Expand All @@ -462,32 +468,33 @@ impl SidecarInterface for SidecarServer {
}

if !actions_to_process.is_empty() {
let client_clone = telemetry.clone();
let mut handle = telemetry.handle.lock_or_panic();
let last_handle = handle.take();
*handle = Some(tokio::spawn(async move {
let telemetry_mutex_clone = telemetry_mutex.clone();
let worker = telemetry.worker.clone();
let last_handle = telemetry.handle.take();
telemetry.handle = Some(tokio::spawn(async move {
if let Some(last_handle) = last_handle {
last_handle.await.ok();
};
let processed = client_clone.process_actions(actions_to_process);
let processed = telemetry_mutex_clone
.lock_or_panic()
.process_actions(actions_to_process);
debug!("Sending Processed Actions :{processed:?}");
client_clone.client.send_msgs(processed).await.ok();
worker.send_msgs(processed).await.ok();
}));
}

if !composer_paths_to_process.is_empty() {
let client_clone = telemetry.clone();
let mut handle = telemetry.handle.lock_or_panic();
let last_handle = handle.take();
*handle = Some(tokio::spawn(async move {
let worker = telemetry.worker.clone();
let last_handle = telemetry.handle.take();
telemetry.handle = Some(tokio::spawn(async move {
if let Some(last_handle) = last_handle {
last_handle.await.ok();
};
let composer_actions =
TelemetryCachedClient::process_composer_paths(composer_paths_to_process)
.await;
debug!("Sending Composer Paths :{composer_actions:?}");
client_clone.client.send_msgs(composer_actions).await.ok();
worker.send_msgs(composer_actions).await.ok();
}));
}

Expand Down
Loading
Loading