diff --git a/Cargo.lock b/Cargo.lock index 14c5115daa..1c2d7ffafa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1655,7 +1655,7 @@ dependencies = [ "hyper 1.6.0", "hyper-multipart-rfc7578", "indexmap 2.6.0", - "lz4_flex", + "lz4_flex 0.9.5", "mime", "prost", "rustc-hash 1.1.0", @@ -1672,11 +1672,13 @@ version = "19.0.1" dependencies = [ "anyhow", "build_common", + "crossbeam-channel", "data-pipeline-ffi", "datadog-crashtracker-ffi", "datadog-library-config-ffi", "datadog-log-ffi", "datadog-profiling", + "datadog-profiling-protobuf", "ddcommon", "ddcommon-ffi", "ddtelemetry-ffi", @@ -1685,8 +1687,11 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "libc", + "lz4_flex 0.11.3", + "prost", "serde_json", "symbolizer-ffi", + "tokio", "tokio-util", ] @@ -3493,6 +3498,15 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "manual_future" version = "0.1.1" diff --git a/datadog-profiling-ffi/Cargo.toml b/datadog-profiling-ffi/Cargo.toml index 275959d42c..b57eb26b51 100644 --- a/datadog-profiling-ffi/Cargo.toml +++ b/datadog-profiling-ffi/Cargo.toml @@ -35,6 +35,7 @@ build_common = { path = "../build-common" } [dependencies] anyhow = "1.0" +crossbeam-channel = "0.5.15" data-pipeline-ffi = { path = "../data-pipeline-ffi", default-features = false, optional = true } datadog-crashtracker-ffi = { path = "../datadog-crashtracker-ffi", default-features = false, optional = true} datadog-library-config-ffi = { path = "../datadog-library-config-ffi", default-features = false, optional = true } @@ -50,4 +51,10 @@ hyper = { version = "1.6", features = ["http1", "client"] } libc = "0.2" serde_json = { version = "1.0" } symbolizer-ffi = { path = "../symbolizer-ffi", optional = true, default-features = false } +tokio = { version = "1.36", features = ["sync", "rt"] } tokio-util = "0.7.1" + +[dev-dependencies] +lz4_flex = "0.11.3" +prost = "0.13.5" +datadog-profiling-protobuf = { path = "../datadog-profiling-protobuf" } diff --git a/datadog-profiling-ffi/src/lib.rs b/datadog-profiling-ffi/src/lib.rs index b070cce1a8..0391c97d43 100644 --- a/datadog-profiling-ffi/src/lib.rs +++ b/datadog-profiling-ffi/src/lib.rs @@ -11,7 +11,8 @@ pub use symbolizer_ffi::*; mod exporter; -mod profiles; +mod manager; +pub mod profiles; mod string_storage; // re-export crashtracker ffi @@ -37,3 +38,17 @@ pub use datadog_log_ffi::*; // re-export tracer metadata functions #[cfg(feature = "ddcommon-ffi")] pub use ddcommon_ffi::*; + +pub use manager::*; + +// Re-export for integration tests +pub use crate::manager::ffi_api::{ + ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_parent, ddog_prof_ProfilerManager_start, + ddog_prof_ProfilerManager_terminate, +}; +pub use crate::manager::{ManagedSampleCallbacks, ProfilerManagerConfig, SendSample}; +pub use crate::profiles::datatypes::{ + ddog_prof_Profile_new, Function, Location, ProfileNewResult, Sample, ValueType, +}; diff --git a/datadog-profiling-ffi/src/manager/client.rs b/datadog-profiling-ffi/src/manager/client.rs new file mode 100644 index 0000000000..465ac61f7c --- /dev/null +++ b/datadog-profiling-ffi/src/manager/client.rs @@ -0,0 +1,49 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; +use std::{ffi::c_void, sync::atomic::AtomicBool}; + +use crossbeam_channel::SendError; + +use super::ClientSampleChannels; +use super::SendSample; + +#[derive(Debug, Clone)] +pub struct ManagedProfilerClient { + channels: ClientSampleChannels, + is_shutdown: Arc, +} + +impl ManagedProfilerClient { + pub fn new(channels: ClientSampleChannels, is_shutdown: Arc) -> Self { + Self { + channels, + is_shutdown, + } + } + + /// # Safety + /// The caller must ensure that: + /// 1. The sample pointer is valid and points to a properly initialized sample + /// 2. The caller transfers ownership of the sample to this function + /// - The sample is not being used by any other thread + /// - The sample must not be accessed by the caller after this call + /// - The manager will either free the sample or recycle it back + /// 3. The sample will be properly cleaned up if it cannot be sent + pub unsafe fn send_sample(&self, sample: *mut c_void) -> Result<(), SendError> { + if self.is_shutdown.load(std::sync::atomic::Ordering::SeqCst) { + return Err(SendError(unsafe { SendSample::new(sample) })); + } + self.channels.send_sample(sample) + } + + pub fn try_recv_recycled( + &self, + ) -> Result<*mut std::ffi::c_void, crossbeam_channel::TryRecvError> { + if self.is_shutdown.load(std::sync::atomic::Ordering::SeqCst) { + return Err(crossbeam_channel::TryRecvError::Disconnected); + } + self.channels.try_recv_recycled() + } +} diff --git a/datadog-profiling-ffi/src/manager/ffi_api.rs b/datadog-profiling-ffi/src/manager/ffi_api.rs new file mode 100644 index 0000000000..2ad7e274b1 --- /dev/null +++ b/datadog-profiling-ffi/src/manager/ffi_api.rs @@ -0,0 +1,193 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::manager::{ + profiler_manager::{ + ManagedSampleCallbacks, ManagerCallbacks, ProfilerManager, ProfilerManagerConfig, + }, + ManagedProfilerClient, +}; +use crate::profiles::datatypes::{Profile, ProfilePtrExt}; +use crossbeam_channel::TryRecvError; +use datadog_profiling::internal; +use ddcommon_ffi::{ + wrap_with_ffi_result, wrap_with_void_ffi_result, Handle, Result as FFIResult, ToInner, + VoidResult, +}; +use function_name::named; +use std::ffi::c_void; +use tokio_util::sync::CancellationToken; + +/// # Safety +/// - The caller is responsible for eventually calling the appropriate shutdown and cleanup +/// functions. +/// - The sample_callbacks must remain valid for the lifetime of the profiler. +/// - This function is not thread-safe. +/// - This function takes ownership of the profile. The profile must not be used after this call. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_start( + profile: *mut Profile, + cpu_sampler_callback: extern "C" fn(*mut internal::Profile), + upload_callback: extern "C" fn(*mut Handle, &mut Option), + sample_callbacks: ManagedSampleCallbacks, + config: ProfilerManagerConfig, +) -> FFIResult> { + wrap_with_ffi_result!({ + let internal_profile = *profile.take()?; + let callbacks = ManagerCallbacks { + cpu_sampler_callback, + upload_callback, + sample_callbacks, + }; + let client = ProfilerManager::start(internal_profile, callbacks, config)?; + anyhow::Ok(Handle::from(client)) + }) +} + +/// # Safety +/// - The handle must have been returned by ddog_prof_ProfilerManager_start and not yet dropped. +/// - The caller must ensure that: +/// 1. The sample pointer is valid and points to a properly initialized sample +/// 2. The caller transfers ownership of the sample to this function +/// - The sample is not being used by any other thread +/// - The sample must not be accessed by the caller after this call +/// - The manager will either free the sample or recycle it back +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_enqueue_sample( + mut handle: *mut Handle, + sample_ptr: *mut c_void, +) -> VoidResult { + wrap_with_void_ffi_result!({ + handle + .to_inner_mut()? + .send_sample(sample_ptr) + .map_err(|e| anyhow::anyhow!("Failed to send sample: {:?}", e))?; + }) +} + +/// Attempts to receive a recycled sample from the profiler manager. +/// +/// This function will: +/// - Return a valid sample pointer if a recycled sample is available +/// - Return a null pointer if the queue is empty (this is a valid success case) +/// - Return an error if the channel is disconnected +/// +/// The caller should check if the returned pointer is null to determine if there were no samples +/// available. +/// +/// # Safety +/// - The handle must have been returned by ddog_prof_ProfilerManager_start and not yet dropped. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_try_recv_recycled( + mut handle: *mut Handle, +) -> FFIResult<*mut c_void> { + wrap_with_ffi_result!({ + match handle.to_inner_mut()?.try_recv_recycled() { + Ok(sample_ptr) => anyhow::Ok(sample_ptr), + Err(TryRecvError::Empty) => anyhow::Ok(std::ptr::null_mut()), + Err(TryRecvError::Disconnected) => Err(anyhow::anyhow!("Channel disconnected")), + } + }) +} + +/// Pauses the global profiler manager, shutting down the current instance and storing the profile. +/// The manager can be restarted later using restart functions. +/// +/// # Safety +/// - This function is thread-safe and can be called from any thread. +/// - The manager must be in running state. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_pause() -> VoidResult { + wrap_with_void_ffi_result!({ + ProfilerManager::pause().context("Failed to pause global manager")?; + }) +} + +/// Restarts the profiler manager in the parent process after a fork. +/// This preserves the profile data from before the pause. +/// +/// # Safety +/// - This function is thread-safe and can be called from any thread. +/// - The manager must be in paused state. +/// - This should be called in the parent process after a fork. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_restart_in_parent( +) -> FFIResult> { + wrap_with_ffi_result!({ + let client = + ProfilerManager::restart_in_parent().context("Failed to restart manager in parent")?; + anyhow::Ok(Handle::from(client)) + }) +} + +/// Restarts the profiler manager in the child process after a fork. +/// This discards the profile data from before the pause and starts fresh. +/// +/// # Safety +/// - This function is thread-safe and can be called from any thread. +/// - The manager must be in paused state. +/// - This should be called in the child process after a fork. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_restart_in_child( +) -> FFIResult> { + wrap_with_ffi_result!({ + let client = + ProfilerManager::restart_in_child().context("Failed to restart manager in child")?; + anyhow::Ok(Handle::from(client)) + }) +} + +/// Terminates the global profiler manager and returns the final profile. +/// This should be called when the profiler is no longer needed. +/// +/// # Safety +/// - This function is thread-safe and can be called from any thread. +/// - The manager must be in running or paused state. +/// - The returned profile handle must be properly managed by the caller. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_terminate( +) -> FFIResult> { + wrap_with_ffi_result!({ + let profile = ProfilerManager::terminate().context("Failed to terminate global manager")?; + anyhow::Ok(Handle::from(profile)) + }) +} + +/// Drops a profiler client handle. +/// This only drops the client handle and does not affect the global manager state. +/// +/// # Safety +/// - The handle must have been returned by ddog_prof_ProfilerManager_start and not yet dropped. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerClient_drop( + mut handle: *mut Handle, +) -> VoidResult { + wrap_with_void_ffi_result!({ + handle + .take() + .context("Failed to drop profiler client handle")?; + }) +} + +/// Resets the global profiler manager state to uninitialized. +/// This is intended for testing purposes only and should not be used in production. +/// +/// # Safety +/// - This function is thread-safe and can be called from any thread. +/// - This function will forcefully reset the state without proper cleanup. +/// - This should only be used in test environments. +#[no_mangle] +#[named] +pub unsafe extern "C" fn ddog_prof_ProfilerManager_reset_for_testing() -> VoidResult { + wrap_with_void_ffi_result!({ + ProfilerManager::reset_for_testing().map_err(|msg| anyhow::anyhow!(msg))?; + }) +} diff --git a/datadog-profiling-ffi/src/manager/ffi_utils.rs b/datadog-profiling-ffi/src/manager/ffi_utils.rs new file mode 100644 index 0000000000..d07b3c5a53 --- /dev/null +++ b/datadog-profiling-ffi/src/manager/ffi_utils.rs @@ -0,0 +1,38 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::num::NonZeroI64; + +use crate::profiles::datatypes::{ProfileResult, Sample}; +use anyhow::Context; +use datadog_profiling::internal; + +/// # Safety +/// The `profile` ptr must point to a valid internal::Profile object. +/// All pointers inside the `sample` need to be valid for the duration of this call. +/// This call is _NOT_ thread-safe. +#[must_use] +#[no_mangle] +pub unsafe extern "C" fn ddog_prof_Profile_add_internal( + profile: *mut internal::Profile, + sample: Sample, + timestamp: Option, +) -> ProfileResult { + (|| { + let profile = profile + .as_mut() + .ok_or_else(|| anyhow::anyhow!("profile pointer was null"))?; + let uses_string_ids = sample + .labels + .first() + .is_some_and(|label| label.key.is_empty() && label.key_id.value > 0); + + if uses_string_ids { + profile.add_string_id_sample(sample.into(), timestamp) + } else { + profile.add_sample(sample.try_into()?, timestamp) + } + })() + .context("ddog_prof_Profile_add_internal failed") + .into() +} diff --git a/datadog-profiling-ffi/src/manager/mod.rs b/datadog-profiling-ffi/src/manager/mod.rs new file mode 100644 index 0000000000..c51d454436 --- /dev/null +++ b/datadog-profiling-ffi/src/manager/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 +#![allow(unused_variables)] +#![allow(dead_code)] +#![allow(clippy::todo)] + +mod client; +pub mod ffi_api; +mod ffi_utils; +mod profiler_manager; +mod samples; +#[cfg(test)] +mod tests; + +pub use client::ManagedProfilerClient; +pub use profiler_manager::{ + ManagedProfilerController, ManagedSampleCallbacks, ProfilerManager, ProfilerManagerConfig, +}; +pub use samples::{ClientSampleChannels, SendSample}; + +// Re-export FFI functions for integration tests +pub use ffi_api::*; diff --git a/datadog-profiling-ffi/src/manager/profiler_manager.rs b/datadog-profiling-ffi/src/manager/profiler_manager.rs new file mode 100644 index 0000000000..70f90f73d3 --- /dev/null +++ b/datadog-profiling-ffi/src/manager/profiler_manager.rs @@ -0,0 +1,457 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![allow(clippy::unwrap_used)] + +use std::sync::atomic::AtomicBool; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; + +use anyhow::Result; +use crossbeam_channel::{select_biased, tick, Receiver, Sender}; +use datadog_profiling::internal; +use ddcommon_ffi::Handle; +use tokio_util::sync::CancellationToken; + +use super::client::ManagedProfilerClient; +use super::samples::{ClientSampleChannels, ManagerSampleChannels, SendSample}; +use crate::profiles::datatypes::Sample; + +/// Holds the callbacks needed to restart the profile manager +#[derive(Copy, Clone)] +pub struct ManagerCallbacks { + pub cpu_sampler_callback: extern "C" fn(*mut internal::Profile), + pub upload_callback: + extern "C" fn(*mut Handle, &mut Option), + pub sample_callbacks: ManagedSampleCallbacks, +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct ProfilerManagerConfig { + pub channel_depth: usize, + pub cpu_sampling_interval_ms: u64, + pub upload_interval_ms: u64, +} + +impl Default for ProfilerManagerConfig { + fn default() -> Self { + Self { + channel_depth: 10, + cpu_sampling_interval_ms: 100, // 100ms + upload_interval_ms: 60000, // 1 minute + } + } +} + +#[repr(C)] +#[derive(Copy, Clone)] +pub struct ManagedSampleCallbacks { + converter: extern "C" fn(&SendSample) -> Sample, + reset: extern "C" fn(&mut SendSample), + drop: extern "C" fn(SendSample), +} + +impl ManagedSampleCallbacks { + pub fn new( + converter: extern "C" fn(&SendSample) -> Sample, + reset: extern "C" fn(&mut SendSample), + drop: extern "C" fn(SendSample), + ) -> Self { + Self { + converter, + reset, + drop, + } + } +} + +/// Controller for managing the profiler manager lifecycle +pub struct ManagedProfilerController { + handle: JoinHandle<()>, + shutdown_sender: Sender<()>, + profile_result_receiver: Receiver, + is_shutdown: Arc, +} + +impl ManagedProfilerController { + pub fn new( + handle: JoinHandle<()>, + shutdown_sender: Sender<()>, + profile_result_receiver: Receiver, + is_shutdown: Arc, + ) -> Self { + Self { + handle, + shutdown_sender, + profile_result_receiver, + is_shutdown, + } + } + + pub fn shutdown(self) -> Result { + anyhow::ensure!( + !self.is_shutdown.load(std::sync::atomic::Ordering::SeqCst), + "Profiler manager is already shutdown" + ); + self.shutdown_sender.send(())?; + + // Wait for the profile result from the manager thread + let profile = self + .profile_result_receiver + .recv() + .map_err(|e| anyhow::anyhow!("Failed to receive profile result: {e}"))?; + + // Wait for the manager thread to finish + self.handle + .join() + .map_err(|e| anyhow::anyhow!("Failed to join manager thread: {:?}", e))?; + + Ok(profile) + } +} + +/// Global state for the profile manager during fork operations +enum ManagerState { + /// Manager has not been initialized yet + Uninitialized, + /// Manager is running with active profiler client and controller + Running { + client: ManagedProfilerClient, + controller: ManagedProfilerController, + config: ProfilerManagerConfig, + callbacks: ManagerCallbacks, + }, + /// Manager is paused (shutdown) with stored profile + Paused { + profile: Box, + config: ProfilerManagerConfig, + callbacks: ManagerCallbacks, + }, + /// Manager is in a temporarily invalid state during transitions + Invalid, +} + +// Global state for the profile manager +static MANAGER_STATE: Mutex = Mutex::new(ManagerState::Uninitialized); + +pub struct ProfilerManager { + channels: ManagerSampleChannels, + cpu_ticker: Receiver, + upload_ticker: Receiver, + shutdown_receiver: Receiver<()>, + profile: internal::Profile, + callbacks: ManagerCallbacks, + cancellation_token: CancellationToken, + upload_sender: Sender, + upload_thread: JoinHandle<()>, + is_shutdown: Arc, + profile_result_sender: Sender, +} + +impl ProfilerManager { + // --- Member functions (instance methods) --- + fn handle_sample( + &mut self, + raw_sample: Result, + ) -> Result<()> { + let mut sample = raw_sample?; + let converted_sample = (self.callbacks.sample_callbacks.converter)(&sample); + let add_result = self.profile.add_sample(converted_sample.try_into()?, None); + (self.callbacks.sample_callbacks.reset)(&mut sample); + self.channels + .recycled_samples_sender + .send(sample) + .map_or_else(|e| (self.callbacks.sample_callbacks.drop)(e.0), |_| ()); + add_result + } + + fn handle_cpu_tick(&mut self) { + (self.callbacks.cpu_sampler_callback)(&mut self.profile); + } + + fn handle_upload_tick(&mut self) -> Result<()> { + let old_profile = self.profile.reset_and_return_previous()?; + self.upload_sender + .send(old_profile) + .map_err(|e| anyhow::anyhow!("Failed to send profile for upload: {e}"))?; + Ok(()) + } + + /// # Safety + /// - The caller must ensure that the callbacks remain valid for the lifetime of the profiler. + /// - The callbacks must be thread-safe. + pub fn handle_shutdown(mut self) { + // Mark as shutdown + self.is_shutdown + .store(true, std::sync::atomic::Ordering::SeqCst); + + // Cancel any ongoing upload + self.cancellation_token.cancel(); + + // Drop the sender to signal the upload thread that no more messages will be sent + // This is necessary to allow the upload thread to exit its message processing loop + drop(self.upload_sender); + + // Process any remaining samples + while let Ok(sample) = self.channels.samples_receiver.try_recv() { + let converted_sample = (self.callbacks.sample_callbacks.converter)(&sample); + if let Ok(s) = converted_sample.try_into() { + let _ = self.profile.add_sample(s, None); + } + (self.callbacks.sample_callbacks.drop)(sample); + } + + // Drain recycled samples + while let Ok(sample) = self.channels.recycled_samples_receiver.try_recv() { + (self.callbacks.sample_callbacks.drop)(sample); + } + + // Wait for the upload thread to finish + if let Err(e) = self.upload_thread.join() { + eprintln!("Error joining upload thread: {e:?}"); + } + + // Send the profile through the channel + let _ = self.profile_result_sender.send(self.profile); + } + + /// # Safety + /// - The caller must ensure that the callbacks remain valid for the lifetime of the profiler. + /// - The callbacks must be thread-safe. + pub fn main(mut self) { + loop { + select_biased! { + // Prioritize shutdown signal to ensure quick response to shutdown requests + recv(self.shutdown_receiver) -> _ => { + self.handle_shutdown(); + return; + }, + recv(self.cpu_ticker) -> msg => { + self.handle_cpu_tick(); + }, + recv(self.channels.samples_receiver) -> raw_sample => { + let _ = self.handle_sample(raw_sample) + .map_err(|e| eprintln!("Failed to process sample: {e}")); + }, + recv(self.upload_ticker) -> msg => { + let _ = self.handle_upload_tick() + .map_err(|e| eprintln!("Failed to handle upload: {e}")); + }, + } + } + } +} + +impl ProfilerManager { + // --- Global functions (static methods) --- + /// Starts a new profile manager and stores the global state. + /// Returns the client for external use. + pub fn start( + profile: internal::Profile, + callbacks: ManagerCallbacks, + config: ProfilerManagerConfig, + ) -> Result { + let mut state = MANAGER_STATE.lock().map_err(|e| anyhow::anyhow!("{}", e))?; + + // Check if manager is already initialized + anyhow::ensure!( + matches!(&*state, ManagerState::Uninitialized), + "Manager is already initialized or in invalid state" + ); + + let (client, running_state) = Self::start_internal(profile, callbacks, config)?; + *state = running_state; + + Ok(client) + } + + /// Internal function that handles the actual startup logic. + /// This function does not acquire the lock and returns the state to be stored. + fn start_internal( + profile: internal::Profile, + callbacks: ManagerCallbacks, + config: ProfilerManagerConfig, + ) -> Result<(ManagedProfilerClient, ManagerState)> { + let (client_channels, manager_channels) = ClientSampleChannels::new(config.channel_depth); + let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); + let (upload_sender, upload_receiver) = crossbeam_channel::bounded(2); + let (profile_result_sender, profile_result_receiver) = crossbeam_channel::bounded(1); + + let cpu_ticker = tick(Duration::from_millis(config.cpu_sampling_interval_ms)); + let upload_ticker = tick(Duration::from_millis(config.upload_interval_ms)); + + // Create a single cancellation token for all uploads + let cancellation_token = CancellationToken::new(); + + // Create shared shutdown state + let is_shutdown = Arc::new(AtomicBool::new(false)); + + // Spawn the upload thread + let mut token = Some(cancellation_token.clone()); + let upload_thread = std::thread::spawn(move || { + while let Ok(profile) = upload_receiver.recv() { + let mut handle = Handle::from(profile); + (callbacks.upload_callback)(&mut handle, &mut token); + } + }); + + let manager = Self { + channels: manager_channels, + cpu_ticker, + upload_ticker, + shutdown_receiver, + profile, + callbacks, + cancellation_token, + upload_sender, + upload_thread, + is_shutdown: is_shutdown.clone(), + profile_result_sender, + }; + + let handle = std::thread::spawn(move || manager.main()); + + let client = ManagedProfilerClient::new(client_channels, is_shutdown.clone()); + + let controller = ManagedProfilerController::new( + handle, + shutdown_sender, + profile_result_receiver, + is_shutdown, + ); + + let running_state = ManagerState::Running { + client: client.clone(), + controller, + config, + callbacks, + }; + + Ok((client, running_state)) + } + + pub fn pause() -> Result<()> { + let mut state = MANAGER_STATE.lock().map_err(|e| anyhow::anyhow!("{}", e))?; + + // Extract the running state and replace with invalid during transition + let ManagerState::Running { + controller, + config, + callbacks, + .. + } = std::mem::replace(&mut *state, ManagerState::Invalid) + else { + // TODO: Consider cleanup when global state is unexpected (e.g., if state is Invalid or + // contains stale resources) + anyhow::bail!("Manager is not in running state"); + }; + + let profile = controller.shutdown()?; + + *state = ManagerState::Paused { + profile: Box::new(profile), + config, + callbacks, + }; + + Ok(()) + } + + pub fn restart_in_parent() -> Result { + let mut state = MANAGER_STATE.lock().map_err(|e| anyhow::anyhow!("{}", e))?; + + let ManagerState::Paused { + profile, + config, + callbacks, + } = std::mem::replace(&mut *state, ManagerState::Invalid) + else { + // TODO: Consider cleanup when global state is unexpected (e.g., if state is Invalid or + // contains stale resources) + anyhow::bail!("Manager is not in paused state"); + }; + + let (client, running_state) = Self::start_internal(*profile, callbacks, config)?; + *state = running_state; + + Ok(client) + } + + pub fn restart_in_child() -> Result { + let mut state = MANAGER_STATE.lock().map_err(|e| anyhow::anyhow!("{}", e))?; + + let ManagerState::Paused { + mut profile, + config, + callbacks, + } = std::mem::replace(&mut *state, ManagerState::Invalid) + else { + // TODO: Consider cleanup when global state is unexpected (e.g., if state is Invalid or + // contains stale resources) + anyhow::bail!("Manager is not in paused state"); + }; + + // Reset the profile, discarding the previous one + let _ = profile.reset_and_return_previous()?; + + let (client, running_state) = Self::start_internal(*profile, callbacks, config)?; + *state = running_state; + + Ok(client) + } + + /// Terminates the global profile manager and returns the final profile. + /// This should be called when the profiler is no longer needed. + pub fn terminate() -> Result { + let mut state = MANAGER_STATE.lock().map_err(|e| anyhow::anyhow!("{}", e))?; + + // Extract the profile and replace with invalid during transition + let profile = match std::mem::replace(&mut *state, ManagerState::Invalid) { + ManagerState::Running { controller, .. } => { + // Shutdown the controller and get the profile + controller.shutdown()? + } + ManagerState::Paused { profile, .. } => { + // Return the stored profile + *profile + } + _ => { + // TODO: Consider cleanup when global state is unexpected (e.g., if state is Invalid + // or contains stale resources) + anyhow::bail!("Manager is not in running or paused state") + } + }; + + // Set the final state to uninitialized + *state = ManagerState::Uninitialized; + + Ok(profile) + } + + /// Resets the global state to uninitialized. + /// This is useful for testing to ensure clean state between tests. + /// # Safety + /// This function should only be used in tests. It will forcefully reset the state + /// without proper cleanup, which could lead to resource leaks in production code. + /// If the lock cannot be acquired, this function will return an error (test-only behavior). + pub fn reset_for_testing() -> Result<(), String> { + match MANAGER_STATE.lock() { + Ok(mut state) => { + *state = ManagerState::Uninitialized; + Ok(()) + } + Err(e) => Err(format!("Failed to acquire state lock: {e}")), + } + } + + /// Checks if the manager is in an invalid state. + /// This can be used to detect when the manager is in a transitional state. + pub fn is_invalid() -> Result { + match MANAGER_STATE.lock() { + Ok(state) => Ok(matches!(&*state, ManagerState::Invalid)), + Err(e) => Err(format!("Failed to acquire state lock: {e}")), + } + } +} diff --git a/datadog-profiling-ffi/src/manager/samples.rs b/datadog-profiling-ffi/src/manager/samples.rs new file mode 100644 index 0000000000..44003ff33e --- /dev/null +++ b/datadog-profiling-ffi/src/manager/samples.rs @@ -0,0 +1,97 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::ffi::c_void; + +use crossbeam_channel::{Receiver, SendError, Sender, TryRecvError}; + +// TODO: this owns the memory. It should probably be a full wrapper, with a destructor. +#[repr(transparent)] +pub struct SendSample(*mut c_void); + +// SAFETY: This type is used to transfer ownership of a sample between threads via channels. +// The sample is only accessed by one thread at a time, and ownership is transferred along +// with the SendSample wrapper. The sample is either processed by the manager thread or +// recycled back to the original thread. +unsafe impl Send for SendSample {} + +impl SendSample { + /// # Safety + /// The caller must ensure that: + /// 1. The sample pointer is valid and points to a properly initialized sample + /// 2. The sample is not being used by any other thread + /// 3. The caller transfers ownership of the sample to this function + pub unsafe fn new(ptr: *mut c_void) -> Self { + Self(ptr) + } + + pub fn as_ptr(&self) -> *mut c_void { + self.0 + } +} + +pub struct ClientSampleChannels { + samples_sender: Sender, + recycled_samples_receiver: Receiver, +} + +impl std::fmt::Debug for ClientSampleChannels { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClientSampleChannels") + .field("samples_sender", &"Sender") + .field("recycled_samples_receiver", &"Receiver") + .finish() + } +} + +impl Clone for ClientSampleChannels { + fn clone(&self) -> Self { + Self { + samples_sender: self.samples_sender.clone(), + recycled_samples_receiver: self.recycled_samples_receiver.clone(), + } + } +} + +pub struct ManagerSampleChannels { + pub samples_receiver: Receiver, + pub recycled_samples_sender: Sender, + pub recycled_samples_receiver: Receiver, +} + +impl ClientSampleChannels { + pub fn new(channel_depth: usize) -> (Self, ManagerSampleChannels) { + let (samples_sender, samples_receiver) = crossbeam_channel::bounded(channel_depth); + let (recycled_samples_sender, recycled_samples_receiver) = + crossbeam_channel::bounded(channel_depth); + ( + Self { + samples_sender, + recycled_samples_receiver: recycled_samples_receiver.clone(), + }, + ManagerSampleChannels { + samples_receiver, + recycled_samples_sender, + recycled_samples_receiver, + }, + ) + } + + /// # Safety + /// The caller must ensure that: + /// 1. The sample pointer is valid and points to a properly initialized sample + /// 2. The caller transfers ownership of the sample to this function + /// - The sample is not being used by any other thread + /// - The sample must not be accessed by the caller after this call + /// - The sample will be properly cleaned up if it cannot be sent + /// 3. The sample will be properly cleaned up if it cannot be sent + pub unsafe fn send_sample(&self, sample: *mut c_void) -> Result<(), SendError> { + self.samples_sender.send(SendSample::new(sample)) + } + + pub fn try_recv_recycled(&self) -> Result<*mut c_void, TryRecvError> { + self.recycled_samples_receiver + .try_recv() + .map(|sample| sample.as_ptr()) + } +} diff --git a/datadog-profiling-ffi/src/manager/tests.rs b/datadog-profiling-ffi/src/manager/tests.rs new file mode 100644 index 0000000000..6ff8c75df7 --- /dev/null +++ b/datadog-profiling-ffi/src/manager/tests.rs @@ -0,0 +1,218 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::ffi::c_void; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use crate::profiles::datatypes::Sample; +use datadog_profiling::api::ValueType; +use datadog_profiling::internal::Profile; +use ddcommon_ffi::{Handle, ToInner}; +use tokio_util::sync::CancellationToken; + +use super::profiler_manager::{ + ManagedSampleCallbacks, ManagerCallbacks, ProfilerManager, ProfilerManagerConfig, +}; +use crate::manager::samples::SendSample; +use datadog_profiling_protobuf::prost_impls::Profile as ProstProfile; +use ddcommon_ffi::Slice; +use prost::Message; + +extern "C" fn test_cpu_sampler_callback(_profile: *mut Profile) {} + +static UPLOAD_COUNT: AtomicUsize = AtomicUsize::new(0); + +extern "C" fn test_upload_callback( + profile: *mut Handle, + _token: &mut Option, +) { + let upload_count = UPLOAD_COUNT.fetch_add(1, Ordering::SeqCst); + + // On the first upload (when upload_count is 0), verify the samples + if upload_count == 0 { + let profile = unsafe { *(*profile).take().unwrap() }; + verify_samples(profile); + } +} + +#[repr(C)] +struct TestSample<'a> { + values: [i64; 1], + locations: [crate::profiles::datatypes::Location<'a>; 1], +} + +fn create_test_sample(value: i64) -> TestSample<'static> { + let function = crate::profiles::datatypes::Function { + name: match value { + 42 => "function_1", + 43 => "function_2", + 44 => "function_3", + 45 => "function_4", + 46 => "function_5", + _ => "unknown_function", + } + .into(), + system_name: match value { + 42 => "function_1", + 43 => "function_2", + 44 => "function_3", + 45 => "function_4", + 46 => "function_5", + _ => "unknown_function", + } + .into(), + filename: "test.rs".into(), + ..Default::default() + }; + + TestSample { + values: [value], + locations: [crate::profiles::datatypes::Location { + function, + ..Default::default() + }], + } +} + +extern "C" fn test_converter(sample: &SendSample) -> Sample { + let test_sample = unsafe { &*(sample.as_ptr() as *const TestSample) }; + + Sample { + locations: Slice::from(&test_sample.locations[..]), + values: Slice::from(&test_sample.values[..]), + labels: Slice::empty(), + } +} + +extern "C" fn test_reset(sample: &mut SendSample) { + let test_sample = unsafe { &mut *(sample.as_ptr() as *mut TestSample) }; + test_sample.values[0] = 0; + test_sample.locations[0] = crate::profiles::datatypes::Location { + function: crate::profiles::datatypes::Function { + name: "".into(), + system_name: "".into(), + filename: "".into(), + ..Default::default() + }, + ..Default::default() + }; +} + +extern "C" fn test_drop(sample: SendSample) { + let test_sample = unsafe { Box::from_raw(sample.as_ptr() as *mut TestSample) }; + // Box will be dropped here, freeing the memory +} + +fn decode_pprof(encoded: &[u8]) -> ProstProfile { + let mut decoder = lz4_flex::frame::FrameDecoder::new(encoded); + let mut buf = Vec::new(); + use std::io::Read; + decoder.read_to_end(&mut buf).unwrap(); + ProstProfile::decode(buf.as_slice()).unwrap() +} + +fn roundtrip_to_pprof(profile: datadog_profiling::internal::Profile) -> ProstProfile { + let encoded = profile.serialize_into_compressed_pprof(None, None).unwrap(); + decode_pprof(&encoded.buffer) +} + +fn string_table_fetch(profile: &ProstProfile, id: i64) -> &str { + profile + .string_table + .get(id as usize) + .map(|s| s.as_str()) + .unwrap_or("") +} + +fn verify_samples(profile: datadog_profiling::internal::Profile) { + let pprof = roundtrip_to_pprof(profile); + println!("Number of samples in profile: {}", pprof.samples.len()); + println!( + "Sample values: {:?}", + pprof + .samples + .iter() + .map(|s| s.values[0]) + .collect::>() + ); + assert_eq!(pprof.samples.len(), 5); + + // Sort samples by their first value + let mut samples = pprof.samples.clone(); + samples.sort_by_key(|s| s.values[0]); + + // Check each sample's value and function name + for (i, sample) in samples.iter().enumerate() { + let value = 42 + i as i64; + assert_eq!(sample.values[0], value); + + // Get the function name from the location + let location_id = sample.location_ids[0]; + let location = pprof + .locations + .iter() + .find(|l| l.id == location_id) + .unwrap(); + let function_id = location.lines[0].function_id; + let function = pprof + .functions + .iter() + .find(|f| f.id == function_id) + .unwrap(); + let function_name = string_table_fetch(&pprof, function.name); + + let expected_function = match value { + 42 => "function_1", + 43 => "function_2", + 44 => "function_3", + 45 => "function_4", + 46 => "function_5", + _ => "unknown_function", + }; + assert_eq!(function_name, expected_function); + } +} + +#[test] +fn test_profiler_manager() { + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 100, // 100ms for faster testing + upload_interval_ms: 500, // 500ms for faster testing + }; + + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + let sample_types = [ValueType::new("samples", "count")]; + let profile = Profile::new(&sample_types, None); + let client = ProfilerManager::start( + profile, + ManagerCallbacks { + cpu_sampler_callback: test_cpu_sampler_callback, + upload_callback: test_upload_callback, + sample_callbacks, + }, + config, + ) + .unwrap(); + + // Send multiple samples + for i in 0..5 { + let test_sample = create_test_sample(42 + i as i64); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + unsafe { + client.send_sample(sample_ptr).unwrap(); + } + } + + // Give the manager thread time to process samples and trigger an upload + std::thread::sleep(std::time::Duration::from_millis(600)); + + // Verify samples were uploaded + assert_eq!(UPLOAD_COUNT.load(Ordering::SeqCst), 1); + + // Get the profile and verify it has no samples (they were consumed by the upload) + let profile = ProfilerManager::terminate().unwrap(); + let pprof = roundtrip_to_pprof(profile); + assert_eq!(pprof.samples.len(), 0); +} diff --git a/datadog-profiling-ffi/src/profiles/datatypes.rs b/datadog-profiling-ffi/src/profiles/datatypes.rs index 4956f1a34c..90e561a7e7 100644 --- a/datadog-profiling-ffi/src/profiles/datatypes.rs +++ b/datadog-profiling-ffi/src/profiles/datatypes.rs @@ -22,29 +22,52 @@ pub struct Profile { } impl Profile { - fn new(profile: internal::Profile) -> Self { + pub(crate) fn new(profile: internal::Profile) -> Self { Profile { inner: Box::into_raw(Box::new(profile)), } } +} - fn take(&mut self) -> Option> { - // Leaving a null will help with double-free issues that can - // arise in C. Of course, it's best to never get there in the - // first place! - let raw = std::mem::replace(&mut self.inner, std::ptr::null_mut()); +impl Drop for Profile { + fn drop(&mut self) { + // SAFETY: Profile's inner pointer is only set in new() and take(), and take() ensures + // the pointer is null after taking ownership. Since this is Drop, we know the Profile + // is being destroyed and won't be used again. + unsafe { drop(self.take()) } + } +} - if raw.is_null() { - None - } else { - Some(unsafe { Box::from_raw(raw) }) - } +impl ToInner for Profile { + unsafe fn to_inner_mut(&mut self) -> anyhow::Result<&mut internal::Profile> { + self.inner + .as_mut() + .context("inner pointer was null, indicates use after free") + } + + unsafe fn take(&mut self) -> anyhow::Result> { + let raw = std::mem::replace(&mut self.inner, std::ptr::null_mut()); + anyhow::ensure!( + !raw.is_null(), + "inner pointer was null, indicates use after free" + ); + Ok(Box::from_raw(raw)) } } -impl Drop for Profile { - fn drop(&mut self) { - drop(self.take()) +/// Extension trait for raw Profile pointers. +/// We need this trait because Rust's orphan rules prevent us from implementing methods directly +/// on raw pointers (*mut Profile). This trait provides a safe way to take ownership of a Profile +/// from a raw pointer while maintaining proper error handling. +pub trait ProfilePtrExt { + /// # Safety + /// The pointer must be non-null and point to a valid Profile that hasn't been dropped. + unsafe fn take(self) -> anyhow::Result>; +} + +impl ProfilePtrExt for *mut Profile { + unsafe fn take(self) -> anyhow::Result> { + self.as_mut().context("Null pointer")?.take() } } @@ -402,9 +425,17 @@ pub unsafe extern "C" fn ddog_prof_Profile_new( } /// Same as `ddog_profile_new` but also configures a `string_storage` for the profile. +/// +/// # Safety +/// - `sample_types` must be a valid slice of ValueType. +/// - `period`, if provided, must be a valid reference. +/// - `string_storage` must be a valid ManagedStringStorage. +/// - The caller is responsible for ensuring that all pointers remain valid for the duration of the +/// call. +/// +/// TODO: @ivoanjo Should this take a `*mut ManagedStringStorage` like Profile APIs do? #[no_mangle] #[must_use] -/// TODO: @ivoanjo Should this take a `*mut ManagedStringStorage` like Profile APIs do? pub unsafe extern "C" fn ddog_prof_Profile_with_string_storage( sample_types: Slice, period: Option<&Period>, @@ -759,6 +790,12 @@ pub unsafe extern "C" fn ddog_prof_Profile_serialize( .into() } +/// Returns a slice view of the given Vec. +/// +/// # Safety +/// - `vec` must be a valid reference to a ddcommon_ffi::Vec. +/// - The returned slice is only valid as long as the original Vec is valid and not mutated or +/// dropped. #[must_use] #[no_mangle] pub unsafe extern "C" fn ddog_Vec_U8_as_slice(vec: &ddcommon_ffi::Vec) -> Slice { diff --git a/datadog-profiling-ffi/src/profiles/mod.rs b/datadog-profiling-ffi/src/profiles/mod.rs index 86136a6fbf..b8fceee8d0 100644 --- a/datadog-profiling-ffi/src/profiles/mod.rs +++ b/datadog-profiling-ffi/src/profiles/mod.rs @@ -1,5 +1,5 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -mod datatypes; +pub mod datatypes; mod interning_api; diff --git a/datadog-profiling-ffi/tests/test_ffi_fork_child_only.rs b/datadog-profiling-ffi/tests/test_ffi_fork_child_only.rs new file mode 100644 index 0000000000..4f27a0cba0 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_fork_child_only.rs @@ -0,0 +1,185 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_child, ddog_prof_ProfilerManager_start, + ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, ProfileNewResult, + ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use std::ffi::c_void; +use test_utils::*; + +#[test] +fn test_ffi_fork_child_only() { + println!("[test] Starting fork child-only test"); + + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with short intervals for testing + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 100, // 100ms for faster testing + }; + + // Start the profiler manager + println!("[test] Starting profiler manager"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + // Send a sample before forking + println!("[test] Sending sample before fork"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully before fork"), + VoidResult::Err(e) => panic!("Failed to enqueue sample before fork: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Pause the profiler manager before forking + println!("[test] Pausing profiler manager before fork"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Fork the process + println!("[test] Forking process"); + match unsafe { libc::fork() } { + -1 => panic!("Failed to fork"), + 0 => { + // Child process - test restart_in_child + println!("[child] Child process started"); + + // Child should restart with fresh profile (discards previous data) + println!("[child] Restarting profiler manager in child"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_child() }; + let mut child_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager restarted successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to restart profiler manager in child: {e}") + } + }; + + // Send a sample in child process + println!("[child] Sending sample in child process"); + let child_sample = create_test_sample(100); + let child_sample_ptr = Box::into_raw(Box::new(child_sample)) as *mut c_void; + + let child_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample(&mut child_client_handle, child_sample_ptr) + }; + match child_enqueue_result { + VoidResult::Ok => println!("[child] Sample enqueued successfully in child"), + VoidResult::Err(e) => panic!("[child] Failed to enqueue sample in child: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Terminate the profiler manager in child + println!("[child] Terminating profiler manager in child"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager terminated successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to terminate profiler manager in child: {e}") + } + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[100]); + + // Drop the child client handle + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut child_client_handle) }; + match drop_result { + VoidResult::Ok => println!("[child] Child client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[child] Warning: failed to drop child client handle: {e}") + } + } + + println!("[child] Child process completed successfully"); + std::process::exit(0); + } + child_pid => { + // Parent process - just wait for child and clean up + println!("[parent] Parent process continuing, child PID: {child_pid}"); + + // Wait for child to complete + println!("[parent] Waiting for child process to complete"); + let mut status = 0; + let wait_result = unsafe { libc::waitpid(child_pid, &mut status, 0) }; + if wait_result == -1 { + panic!("[parent] Failed to wait for child process"); + } + + if libc::WIFEXITED(status) { + let exit_code = libc::WEXITSTATUS(status); + println!("[parent] Child process exited with code: {exit_code}"); + assert_eq!(exit_code, 0, "Child process should exit successfully"); + } else { + println!("[parent] Child process terminated by signal"); + } + + // Drop the original client handle + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[parent] Original client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop original client handle: {e}") + } + } + + println!("[parent] Parent process completed successfully"); + } + } +} diff --git a/datadog-profiling-ffi/tests/test_ffi_fork_data_preservation.rs b/datadog-profiling-ffi/tests/test_ffi_fork_data_preservation.rs new file mode 100644 index 0000000000..8091ca2e64 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_fork_data_preservation.rs @@ -0,0 +1,350 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_child, ddog_prof_ProfilerManager_restart_in_parent, + ddog_prof_ProfilerManager_start, ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, + ProfileNewResult, ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use std::ffi::c_void; +use std::sync::atomic::{AtomicUsize, Ordering}; +use test_utils::*; + +// Global counter to track uploads in different processes +static PARENT_UPLOAD_COUNT: AtomicUsize = AtomicUsize::new(0); +static CHILD_UPLOAD_COUNT: AtomicUsize = AtomicUsize::new(0); + +pub extern "C" fn parent_upload_callback( + _profile: *mut ddcommon_ffi::Handle, + _token: &mut std::option::Option, +) { + let upload_count = PARENT_UPLOAD_COUNT.fetch_add(1, Ordering::SeqCst); + println!("[parent_upload_callback] called, count: {upload_count}"); +} + +pub extern "C" fn child_upload_callback( + _profile: *mut ddcommon_ffi::Handle, + _token: &mut std::option::Option, +) { + let upload_count = CHILD_UPLOAD_COUNT.fetch_add(1, Ordering::SeqCst); + println!("[child_upload_callback] called, count: {upload_count}"); +} + +#[test] +fn test_ffi_fork_data_preservation() { + println!("[test] Starting fork data preservation test"); + + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload counts for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + PARENT_UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + CHILD_UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with very short intervals to trigger uploads quickly + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 10, // 10ms for very fast testing + upload_interval_ms: 100_000, // 100 seconds - prevent uploads + }; + + // Start the profiler manager + println!("[test] Starting profiler manager"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + // Send multiple samples before forking to accumulate data + println!("[test] Sending samples before fork"); + for i in 0..5 { + let test_sample = create_test_sample(42 + i); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + match enqueue_result { + VoidResult::Ok => println!("[test] Sample {i} enqueued successfully before fork"), + VoidResult::Err(e) => panic!("Failed to enqueue sample {i} before fork: {e}"), + } + } + + // Give the manager time to process and potentially upload + println!("[test] Waiting for processing before fork"); + std::thread::sleep(std::time::Duration::from_millis(100)); + + // Pause the profiler manager before forking + println!("[test] Pausing profiler manager before fork"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Fork the process + println!("[test] Forking process"); + match unsafe { libc::fork() } { + -1 => panic!("Failed to fork"), + 0 => { + // Child process - should restart with fresh profile (discards previous data) + println!("[child] Child process started"); + + // Child should restart with fresh profile (discards previous data) + println!("[child] Restarting profiler manager in child"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_child() }; + let mut child_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager restarted successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to restart profiler manager in child: {e}") + } + }; + + // Send a few samples in child process + println!("[child] Sending samples in child process"); + for i in 0..3 { + let child_sample = create_test_sample(100 + i); + let child_sample_ptr = Box::into_raw(Box::new(child_sample)) as *mut c_void; + + let child_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample( + &mut child_client_handle, + child_sample_ptr, + ) + }; + match child_enqueue_result { + VoidResult::Ok => { + println!("[child] Sample {i} enqueued successfully in child") + } + VoidResult::Err(e) => { + panic!("[child] Failed to enqueue sample {i} in child: {e}") + } + } + } + + // Give the manager time to process and potentially upload + println!("[child] Waiting for processing in child"); + std::thread::sleep(std::time::Duration::from_millis(100)); + + // Terminate the profiler manager in child (added back) + println!("[child] Terminating profiler manager in child"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut _final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager terminated successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to terminate profiler manager in child: {e}") + } + }; + // Extract the profile and assert expected values + let profile_result = unsafe { _final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[100, 101, 102]); + + // Drop the child client handle + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut child_client_handle) }; + match drop_result { + VoidResult::Ok => println!("[child] Child client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[child] Warning: failed to drop child client handle: {e}") + } + } + + println!("[child] Child process completed successfully"); + std::process::exit(0); + } + child_pid => { + // Parent process - should restart preserving profile data + println!("[parent] Parent process continuing, child PID: {child_pid}"); + + // Parent should restart preserving profile data + println!("[parent] Restarting profiler manager in parent"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_parent() }; + let mut parent_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager restarted successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to restart profiler manager in parent: {e}") + } + }; + + // Send a few more samples in parent process + println!("[parent] Sending samples in parent process"); + for i in 0..3 { + let parent_sample = create_test_sample(200 + i); + let parent_sample_ptr = Box::into_raw(Box::new(parent_sample)) as *mut c_void; + + let parent_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample( + &mut parent_client_handle, + parent_sample_ptr, + ) + }; + match parent_enqueue_result { + VoidResult::Ok => { + println!("[parent] Sample {i} enqueued successfully in parent"); + // Add debugging: try to print the profile contents after enqueue + // (This would require an FFI call to extract the profile, which we don't + // have, so just print a marker) + println!("[parent] (debug) Enqueued sample value {}", 200 + i); + } + VoidResult::Err(e) => { + panic!("[parent] Failed to enqueue sample {i} in parent: {e}") + } + } + } + + // Give the manager time to process and potentially upload + println!("[parent] Waiting for processing in parent"); + std::thread::sleep(std::time::Duration::from_millis(100)); + + // Print a marker before terminate + println!("[parent] (debug) About to terminate, expecting to see post-fork samples in profile"); + + // Wait for child to complete + println!("[parent] Waiting for child process to complete"); + let mut status = 0; + let wait_result = unsafe { libc::waitpid(child_pid, &mut status, 0) }; + if wait_result == -1 { + panic!("[parent] Failed to wait for child process"); + } + + if libc::WIFEXITED(status) { + let exit_code = libc::WEXITSTATUS(status); + println!("[parent] Child process exited with code: {exit_code}"); + assert_eq!(exit_code, 0, "Child process should exit successfully"); + } else { + println!("[parent] Child process terminated by signal {status}"); + } + + println!( + "[parent] Child process completed, parent profile state should still be intact" + ); + + // Terminate the profiler manager in parent + println!("[parent] About to terminate profiler manager in parent"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager terminated successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to terminate profiler manager in parent: {e}") + } + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + let pprof = roundtrip_to_pprof(profile_result); + println!("[debug] Profile contains {} samples", pprof.samples.len()); + for (i, sample) in pprof.samples.iter().enumerate() { + println!("[debug] Sample {}: values = {:?}", i, sample.values); + } + + // Check pre-fork values + let mut found = [false; 5]; + for sample in &pprof.samples { + for (i, &expected) in [42, 43, 44, 45, 46].iter().enumerate() { + if sample.values.contains(&expected) { + found[i] = true; + } + } + } + for (i, &was_found) in found.iter().enumerate() { + assert!( + was_found, + "Expected pre-fork sample value {} not found in profile", + [42, 43, 44, 45, 46][i] + ); + } + + // Check for merged post-fork sample + let mut found_merged = false; + for sample in &pprof.samples { + if sample.values.contains(&603) { + // Check function name + if let Some(loc_id) = sample.location_ids.first() { + let loc_obj = pprof.locations.iter().find(|l| l.id == *loc_id); + if let Some(loc_obj) = loc_obj { + let fn_id = loc_obj.lines.first().map(|l| l.function_id); + if let Some(fn_id) = fn_id { + let fn_obj = pprof.functions.iter().find(|f| f.id == fn_id); + if let Some(fn_obj) = fn_obj { + // fn_obj.name is an index into the string table + let name_idx = fn_obj.name as usize; + if let Some(name) = pprof.string_table.get(name_idx) { + if name == "unknown_function" { + found_merged = true; + } + } + } + } + } + } + } + } + assert!(found_merged, "Expected merged post-fork sample value 603 with function name 'unknown_function' not found in profile"); + + // Drop the client handles + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[parent] Original client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop original client handle: {e}") + } + } + + let drop_result2 = unsafe { ddog_prof_ProfilerClient_drop(&mut parent_client_handle) }; + match drop_result2 { + VoidResult::Ok => println!("[parent] Parent client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop parent client handle: {e}") + } + } + + // Note: We don't require uploads in this test since we're using a long upload interval + // to prevent premature uploads that could interfere with data preservation testing. + // The test has already verified that samples are correctly preserved across fork + // boundaries. + let total_uploads = UPLOAD_COUNT.load(Ordering::SeqCst); + println!("[parent] Total uploads across all processes: {total_uploads}"); + + println!("[parent] Parent process completed successfully"); + } + } +} diff --git a/datadog-profiling-ffi/tests/test_ffi_fork_lifecycle.rs b/datadog-profiling-ffi/tests/test_ffi_fork_lifecycle.rs new file mode 100644 index 0000000000..6f4a98d793 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_fork_lifecycle.rs @@ -0,0 +1,243 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_child, ddog_prof_ProfilerManager_restart_in_parent, + ddog_prof_ProfilerManager_start, ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, + ProfileNewResult, ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use std::ffi::c_void; +use test_utils::*; + +#[test] +fn test_ffi_fork_lifecycle() { + println!("[test] Starting fork lifecycle test"); + + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with long intervals to prevent uploads during test + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 10000, // 10 seconds - prevent uploads during test + }; + + // Start the profiler manager + println!("[test] Starting profiler manager"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + // Send a sample before forking + println!("[test] Sending sample before fork"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully before fork"), + VoidResult::Err(e) => panic!("Failed to enqueue sample before fork: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Pause the profiler manager before forking + println!("[test] Pausing profiler manager before fork"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Fork the process + println!("[test] Forking process"); + match unsafe { libc::fork() } { + -1 => panic!("Failed to fork"), + 0 => { + // Child process + println!("[child] Child process started"); + + // Child should restart with fresh profile (discards previous data) + println!("[child] Restarting profiler manager in child"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_child() }; + let mut child_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager restarted successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to restart profiler manager in child: {e}") + } + }; + + // Send a sample in child process + println!("[child] Sending sample in child process"); + let child_sample = create_test_sample(100); + let child_sample_ptr = Box::into_raw(Box::new(child_sample)) as *mut c_void; + + let child_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample(&mut child_client_handle, child_sample_ptr) + }; + match child_enqueue_result { + VoidResult::Ok => println!("[child] Sample enqueued successfully in child"), + VoidResult::Err(e) => panic!("[child] Failed to enqueue sample in child: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Terminate the profiler manager in child + println!("[child] Terminating profiler manager in child"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager terminated successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to terminate profiler manager in child: {e}") + } + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[100]); + + // Drop the child client handle + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut child_client_handle) }; + match drop_result { + VoidResult::Ok => println!("[child] Child client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[child] Warning: failed to drop child client handle: {e}") + } + } + + println!("[child] Child process completed successfully"); + std::process::exit(0); + } + child_pid => { + // Parent process + println!("[parent] Parent process continuing, child PID: {child_pid}"); + + // Parent should restart preserving profile data + println!("[parent] Restarting profiler manager in parent"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_parent() }; + let mut parent_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager restarted successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to restart profiler manager in parent: {e}") + } + }; + + // Send a sample in parent process + println!("[parent] Sending sample in parent process"); + let parent_sample = create_test_sample(200); + let parent_sample_ptr = Box::into_raw(Box::new(parent_sample)) as *mut c_void; + + let parent_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample( + &mut parent_client_handle, + parent_sample_ptr, + ) + }; + match parent_enqueue_result { + VoidResult::Ok => println!("[parent] Sample enqueued successfully in parent"), + VoidResult::Err(e) => panic!("[parent] Failed to enqueue sample in parent: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Wait for child to complete + println!("[parent] Waiting for child process to complete"); + let mut status = 0; + let wait_result = unsafe { libc::waitpid(child_pid, &mut status, 0) }; + if wait_result == -1 { + panic!("[parent] Failed to wait for child process"); + } + + if libc::WIFEXITED(status) { + let exit_code = libc::WEXITSTATUS(status); + println!("[parent] Child process exited with code: {exit_code}"); + assert_eq!(exit_code, 0, "Child process should exit successfully"); + } else { + println!("[parent] Child process terminated by signal"); + } + + // Terminate the profiler manager in parent + println!("[parent] Terminating profiler manager in parent"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager terminated successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to terminate profiler manager in parent: {e}") + } + }; + + // Check that the expected samples are present in the final profile + // Parent should have both pre-fork (42) and post-fork (200) samples + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[42, 200]); + + // Drop the client handles + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[parent] Original client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop original client handle: {e}") + } + } + + let drop_result2 = unsafe { ddog_prof_ProfilerClient_drop(&mut parent_client_handle) }; + match drop_result2 { + VoidResult::Ok => println!("[parent] Parent client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop parent client handle: {e}") + } + } + + println!("[parent] Parent process completed successfully"); + } + } +} diff --git a/datadog-profiling-ffi/tests/test_ffi_fork_parent_child.rs b/datadog-profiling-ffi/tests/test_ffi_fork_parent_child.rs new file mode 100644 index 0000000000..2ecf38c6ac --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_fork_parent_child.rs @@ -0,0 +1,243 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_child, ddog_prof_ProfilerManager_restart_in_parent, + ddog_prof_ProfilerManager_start, ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, + ProfileNewResult, ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use std::ffi::c_void; + +use test_utils::*; + +#[test] +fn test_ffi_fork_parent_child() { + println!("[test] Starting fork parent/child test"); + + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with long upload interval to prevent premature uploads during test + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 10000, // 10 seconds - very long to prevent uploads during test + }; + + // Start the profiler manager + println!("[test] Starting profiler manager"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + // Send a sample before forking + println!("[test] Sending sample before fork"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully before fork"), + VoidResult::Err(e) => panic!("Failed to enqueue sample before fork: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Pause the profiler manager before forking + println!("[test] Pausing profiler manager before fork"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Fork the process + println!("[test] Forking process"); + match unsafe { libc::fork() } { + -1 => panic!("Failed to fork"), + 0 => { + // Child process + println!("[child] Child process started"); + + // Child should restart with fresh profile (discards previous data) + println!("[child] Restarting profiler manager in child"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_child() }; + let mut child_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager restarted successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to restart profiler manager in child: {e}") + } + }; + + // Send a sample in child process + println!("[child] Sending sample in child process"); + let child_sample = create_test_sample(100); + let child_sample_ptr = Box::into_raw(Box::new(child_sample)) as *mut c_void; + + let child_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample(&mut child_client_handle, child_sample_ptr) + }; + match child_enqueue_result { + VoidResult::Ok => println!("[child] Sample enqueued successfully in child"), + VoidResult::Err(e) => panic!("[child] Failed to enqueue sample in child: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Terminate the profiler manager in child + println!("[child] Terminating profiler manager in child"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[child] Profiler manager terminated successfully in child"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[child] Failed to terminate profiler manager in child: {e}") + } + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[100]); + + // Drop the child client handle + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut child_client_handle) }; + match drop_result { + VoidResult::Ok => println!("[child] Child client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[child] Warning: failed to drop child client handle: {e}") + } + } + + println!("[child] Child process completed successfully"); + std::process::exit(0); + } + child_pid => { + // Parent process + println!("[parent] Parent process continuing, child PID: {child_pid}"); + + // Parent should restart preserving profile data + println!("[parent] Restarting profiler manager in parent"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_parent() }; + let mut parent_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager restarted successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to restart profiler manager in parent: {e}") + } + }; + + // Send another sample in parent process + println!("[parent] Sending sample in parent process"); + let parent_sample = create_test_sample(200); + let parent_sample_ptr = Box::into_raw(Box::new(parent_sample)) as *mut c_void; + + let parent_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample( + &mut parent_client_handle, + parent_sample_ptr, + ) + }; + match parent_enqueue_result { + VoidResult::Ok => println!("[parent] Sample enqueued successfully in parent"), + VoidResult::Err(e) => panic!("[parent] Failed to enqueue sample in parent: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Wait for child to complete + println!("[parent] Waiting for child process to complete"); + let mut status = 0; + let wait_result = unsafe { libc::waitpid(child_pid, &mut status, 0) }; + if wait_result == -1 { + panic!("[parent] Failed to wait for child process"); + } + + if libc::WIFEXITED(status) { + let exit_code = libc::WEXITSTATUS(status); + println!("[parent] Child process exited with code: {exit_code}"); + assert_eq!(exit_code, 0, "Child process should exit successfully"); + } else { + println!("[parent] Child process terminated by signal"); + } + + // Terminate the profiler manager in parent + println!("[parent] Terminating profiler manager in parent"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager terminated successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to terminate profiler manager in parent: {e}") + } + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[42, 200]); + + // Drop the client handles + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[parent] Original client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop original client handle: {e}") + } + } + + let drop_result2 = unsafe { ddog_prof_ProfilerClient_drop(&mut parent_client_handle) }; + match drop_result2 { + VoidResult::Ok => println!("[parent] Parent client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop parent client handle: {e}") + } + } + + println!("[parent] Parent process completed successfully"); + } + } +} diff --git a/datadog-profiling-ffi/tests/test_ffi_fork_parent_only.rs b/datadog-profiling-ffi/tests/test_ffi_fork_parent_only.rs new file mode 100644 index 0000000000..5f5ec2936d --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_fork_parent_only.rs @@ -0,0 +1,185 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_parent, ddog_prof_ProfilerManager_start, + ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, ProfileNewResult, + ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use std::ffi::c_void; +use test_utils::*; + +#[test] +fn test_ffi_fork_parent_only() { + println!("[test] Starting fork parent-only test"); + + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with short intervals for testing + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 100, // 100ms for faster testing + }; + + // Start the profiler manager + println!("[test] Starting profiler manager"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + // Send a sample before forking + println!("[test] Sending sample before fork"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully before fork"), + VoidResult::Err(e) => panic!("Failed to enqueue sample before fork: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Pause the profiler manager before forking + println!("[test] Pausing profiler manager before fork"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Fork the process + println!("[test] Forking process"); + match unsafe { libc::fork() } { + -1 => panic!("Failed to fork"), + 0 => { + // Child process - just exit immediately + println!("[child] Child process started, exiting immediately"); + std::process::exit(0); + } + child_pid => { + // Parent process - test restart_in_parent + println!("[parent] Parent process continuing, child PID: {child_pid}"); + + // Parent should restart preserving profile data + println!("[parent] Restarting profiler manager in parent"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_parent() }; + let mut parent_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager restarted successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to restart profiler manager in parent: {e}") + } + }; + + // Send another sample in parent process + println!("[parent] Sending sample in parent process"); + let parent_sample = create_test_sample(200); + let parent_sample_ptr = Box::into_raw(Box::new(parent_sample)) as *mut c_void; + + let parent_enqueue_result = unsafe { + ddog_prof_ProfilerManager_enqueue_sample( + &mut parent_client_handle, + parent_sample_ptr, + ) + }; + match parent_enqueue_result { + VoidResult::Ok => println!("[parent] Sample enqueued successfully in parent"), + VoidResult::Err(e) => panic!("[parent] Failed to enqueue sample in parent: {e}"), + } + + // Give the manager time to process + std::thread::sleep(std::time::Duration::from_millis(50)); + + // Wait for child to complete + println!("[parent] Waiting for child process to complete"); + let mut status = 0; + let wait_result = unsafe { libc::waitpid(child_pid, &mut status, 0) }; + if wait_result == -1 { + panic!("[parent] Failed to wait for child process"); + } + + if libc::WIFEXITED(status) { + let exit_code = libc::WEXITSTATUS(status); + println!("[parent] Child process exited with code: {exit_code}"); + assert_eq!(exit_code, 0, "Child process should exit successfully"); + } else { + println!("[parent] Child process terminated by signal {status}"); + } + + // Terminate the profiler manager in parent + println!("[parent] Terminating profiler manager in parent"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[parent] Profiler manager terminated successfully in parent"); + handle + } + ddcommon_ffi::Result::Err(e) => { + panic!("[parent] Failed to terminate profiler manager in parent: {e}") + } + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[42, 200]); + + // Drop the client handles + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[parent] Original client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop original client handle: {e}") + } + } + + let drop_result2 = unsafe { ddog_prof_ProfilerClient_drop(&mut parent_client_handle) }; + match drop_result2 { + VoidResult::Ok => println!("[parent] Parent client handle dropped successfully"), + VoidResult::Err(e) => { + println!("[parent] Warning: failed to drop parent client handle: {e}") + } + } + + println!("[parent] Parent process completed successfully"); + } + } +} diff --git a/datadog-profiling-ffi/tests/test_ffi_lifecycle_basic.rs b/datadog-profiling-ffi/tests/test_ffi_lifecycle_basic.rs new file mode 100644 index 0000000000..34ba000843 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_lifecycle_basic.rs @@ -0,0 +1,120 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_pause, + ddog_prof_ProfilerManager_reset_for_testing, ddog_prof_ProfilerManager_restart_in_parent, + ddog_prof_ProfilerManager_start, ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, + ProfileNewResult, ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use test_utils::*; + +#[test] +fn test_ffi_lifecycle_basic() { + println!("[test] Starting basic lifecycle test"); + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + println!("[test] Profile created"); + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + println!("[test] Creating sample callbacks"); + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with very long intervals to avoid timer issues + println!("[test] Creating config"); + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 10000, // 10 seconds - very long + upload_interval_ms: 10000, // 10 seconds - very long + }; + + // Start the profiler manager using FFI + println!("[test] Calling ddog_prof_ProfilerManager_start"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + println!("[test] ddog_prof_ProfilerManager_start returned"); + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + println!("[test] Profiler manager started successfully"); + + // Pause immediately without sending any samples + println!("[test] Calling ddog_prof_ProfilerManager_pause"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + println!("[test] ddog_prof_ProfilerManager_pause returned"); + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Restart the profiler manager in parent (preserves profile data) + println!("[test] Calling ddog_prof_ProfilerManager_restart_in_parent"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_parent() }; + println!("[test] ddog_prof_ProfilerManager_restart_in_parent returned"); + let mut new_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[test] Profiler manager restarted successfully"); + handle + } + ddcommon_ffi::Result::Err(e) => panic!("Failed to restart profiler manager: {e}"), + }; + + // Terminate the profiler manager immediately + println!("[test] Calling ddog_prof_ProfilerManager_terminate"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[test] Profiler manager terminated successfully"); + handle + } + ddcommon_ffi::Result::Err(e) => panic!("Failed to terminate profiler manager: {e}"), + }; + + // Check that the profile is empty (no samples) + let profile_result = unsafe { final_profile_handle.take() }; + let pprof = roundtrip_to_pprof(profile_result); + assert_eq!(pprof.samples.len(), 0, "Profile should have no samples"); + + // Drop the client handles + println!("[test] Dropping first client handle"); + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[test] First client handle dropped successfully"), + VoidResult::Err(e) => println!("Warning: failed to drop first client handle: {e}"), + } + + println!("[test] Dropping second client handle"); + let drop_result2 = unsafe { ddog_prof_ProfilerClient_drop(&mut new_client_handle) }; + match drop_result2 { + VoidResult::Ok => println!("[test] Second client handle dropped successfully"), + VoidResult::Err(e) => println!("Warning: failed to drop second client handle: {e}"), + } + + println!("[test] Basic lifecycle test completed successfully"); +} diff --git a/datadog-profiling-ffi/tests/test_ffi_lifecycle_with_data.rs b/datadog-profiling-ffi/tests/test_ffi_lifecycle_with_data.rs new file mode 100644 index 0000000000..1fa82e7c29 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_lifecycle_with_data.rs @@ -0,0 +1,140 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_restart_in_parent, ddog_prof_ProfilerManager_start, + ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, ProfileNewResult, + ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use ddcommon_ffi::ToInner; +use std::ffi::c_void; +use test_utils::*; + +#[test] +fn test_ffi_lifecycle_with_data() { + println!("[test] Starting lifecycle test with data processing"); + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + println!("[test] Profile created"); + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + println!("[test] Creating sample callbacks"); + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with very short intervals for testing + println!("[test] Creating config"); + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 100, // 100ms for faster testing + }; + + // Start the profiler manager using FFI + println!("[test] Calling ddog_prof_ProfilerManager_start"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + println!("[test] ddog_prof_ProfilerManager_start returned"); + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + println!("[test] Profiler manager started successfully"); + + // Send a sample using FFI + println!("[test] Sending sample"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + println!("[test] ddog_prof_ProfilerManager_enqueue_sample returned"); + + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully"), + VoidResult::Err(e) => panic!("Failed to enqueue sample: {e}"), + } + + // Give the manager a very short time to process + println!("[test] Sleeping briefly"); + std::thread::sleep(std::time::Duration::from_millis(50)); + println!("[test] Woke up"); + + // Pause the profiler manager using FFI + println!("[test] Calling ddog_prof_ProfilerManager_pause"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + println!("[test] ddog_prof_ProfilerManager_pause returned"); + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Restart the profiler manager in parent (preserves profile data) + println!("[test] Calling ddog_prof_ProfilerManager_restart_in_parent"); + let restart_result = unsafe { ddog_prof_ProfilerManager_restart_in_parent() }; + println!("[test] ddog_prof_ProfilerManager_restart_in_parent returned"); + let mut new_client_handle = match restart_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[test] Profiler manager restarted successfully"); + handle + } + ddcommon_ffi::Result::Err(e) => panic!("Failed to restart profiler manager: {e}"), + }; + + // Terminate the profiler manager immediately + println!("[test] Calling ddog_prof_ProfilerManager_terminate"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + let mut final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[test] Profiler manager terminated successfully"); + handle + } + ddcommon_ffi::Result::Err(e) => panic!("Failed to terminate profiler manager: {e}"), + }; + + // Check that the expected sample is present in the final profile + let profile_result = unsafe { final_profile_handle.take() }; + assert_profile_has_sample_values(profile_result, &[42]); + + // Drop the client handles + println!("[test] Dropping first client handle"); + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[test] First client handle dropped successfully"), + VoidResult::Err(e) => println!("Warning: failed to drop first client handle: {e}"), + } + + println!("[test] Dropping second client handle"); + let drop_result2 = unsafe { ddog_prof_ProfilerClient_drop(&mut new_client_handle) }; + match drop_result2 { + VoidResult::Ok => println!("[test] Second client handle dropped successfully"), + VoidResult::Err(e) => println!("Warning: failed to drop second client handle: {e}"), + } + + println!("[test] Lifecycle test with data processing completed successfully"); +} diff --git a/datadog-profiling-ffi/tests/test_ffi_start_pause_terminate.rs b/datadog-profiling-ffi/tests/test_ffi_start_pause_terminate.rs new file mode 100644 index 0000000000..9c8d45cb80 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_start_pause_terminate.rs @@ -0,0 +1,116 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_pause, ddog_prof_ProfilerManager_reset_for_testing, + ddog_prof_ProfilerManager_start, ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, + ProfileNewResult, ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use std::ffi::c_void; +use test_utils::*; + +#[test] +fn test_ffi_start_pause_terminate() { + println!("[test] Starting start/pause/terminate test"); + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + println!("[test] Profile created"); + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + println!("[test] Creating sample callbacks"); + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with very short intervals for testing + println!("[test] Creating config"); + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 100, // 100ms for faster testing + }; + + // Start the profiler manager using FFI + println!("[test] Calling ddog_prof_ProfilerManager_start"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + println!("[test] ddog_prof_ProfilerManager_start returned"); + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + println!("[test] Profiler manager started successfully"); + + // Send a sample using FFI + println!("[test] Sending sample"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + println!("[test] ddog_prof_ProfilerManager_enqueue_sample returned"); + + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully"), + VoidResult::Err(e) => panic!("Failed to enqueue sample: {e}"), + } + + // Give the manager a very short time to process + println!("[test] Sleeping briefly"); + std::thread::sleep(std::time::Duration::from_millis(50)); + println!("[test] Woke up"); + + // Pause the profiler manager using FFI + println!("[test] Calling ddog_prof_ProfilerManager_pause"); + let pause_result = unsafe { ddog_prof_ProfilerManager_pause() }; + println!("[test] ddog_prof_ProfilerManager_pause returned"); + match pause_result { + VoidResult::Ok => println!("[test] Profiler manager paused successfully"), + VoidResult::Err(e) => panic!("Failed to pause profiler manager: {e}"), + } + + // Terminate the profiler manager immediately + println!("[test] Calling ddog_prof_ProfilerManager_terminate"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + println!("[test] ddog_prof_ProfilerManager_terminate returned"); + let _final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[test] Profiler manager terminated successfully"); + handle + } + ddcommon_ffi::Result::Err(e) => panic!("Failed to terminate profiler manager: {e}"), + }; + + // Drop the client handle + println!("[test] Dropping client handle"); + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[test] Client handle dropped successfully"), + VoidResult::Err(e) => println!("Warning: failed to drop client handle: {e}"), + } + + println!("[test] Start/pause/terminate test completed successfully"); +} diff --git a/datadog-profiling-ffi/tests/test_ffi_start_terminate.rs b/datadog-profiling-ffi/tests/test_ffi_start_terminate.rs new file mode 100644 index 0000000000..0827346458 --- /dev/null +++ b/datadog-profiling-ffi/tests/test_ffi_start_terminate.rs @@ -0,0 +1,107 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#[cfg(unix)] +mod test_utils; +use datadog_profiling_ffi::{ + ddog_prof_Profile_new, ddog_prof_ProfilerClient_drop, ddog_prof_ProfilerManager_enqueue_sample, + ddog_prof_ProfilerManager_reset_for_testing, ddog_prof_ProfilerManager_start, + ddog_prof_ProfilerManager_terminate, ManagedSampleCallbacks, ProfileNewResult, + ProfilerManagerConfig, Slice, ValueType, VoidResult, +}; +use std::ffi::c_void; +use test_utils::*; + +#[test] +fn test_ffi_start_terminate() { + println!("[test] Starting simple start/terminate test"); + // Reset global state for this test + unsafe { ddog_prof_ProfilerManager_reset_for_testing() }.unwrap(); + // Reset upload count for this test + UPLOAD_COUNT.store(0, std::sync::atomic::Ordering::SeqCst); + + // Create a profile + println!("[test] Creating profile"); + let sample_types = [ValueType::new("samples", "count")]; + let profile_result = unsafe { ddog_prof_Profile_new(Slice::from(&sample_types[..]), None) }; + println!("[test] Profile created"); + let mut profile = match profile_result { + ProfileNewResult::Ok(p) => p, + ProfileNewResult::Err(e) => { + panic!("Failed to create profile: {e}") + } + }; + + // Create sample callbacks + println!("[test] Creating sample callbacks"); + let sample_callbacks = ManagedSampleCallbacks::new(test_converter, test_reset, test_drop); + + // Create config with very short intervals for testing + println!("[test] Creating config"); + let config = ProfilerManagerConfig { + channel_depth: 10, + cpu_sampling_interval_ms: 50, // 50ms for faster testing + upload_interval_ms: 100, // 100ms for faster testing + }; + + // Start the profiler manager using FFI + println!("[test] Calling ddog_prof_ProfilerManager_start"); + let client_result = unsafe { + ddog_prof_ProfilerManager_start( + &mut profile, + test_cpu_sampler_callback, + test_upload_callback, + sample_callbacks, + config, + ) + }; + println!("[test] ddog_prof_ProfilerManager_start returned"); + + let mut client_handle = match client_result { + ddcommon_ffi::Result::Ok(handle) => handle, + ddcommon_ffi::Result::Err(e) => panic!("Failed to start profiler manager: {e}"), + }; + + println!("[test] Profiler manager started successfully"); + + // Send a sample using FFI + println!("[test] Sending sample"); + let test_sample = create_test_sample(42); + let sample_ptr = Box::into_raw(Box::new(test_sample)) as *mut c_void; + + let enqueue_result = + unsafe { ddog_prof_ProfilerManager_enqueue_sample(&mut client_handle, sample_ptr) }; + println!("[test] ddog_prof_ProfilerManager_enqueue_sample returned"); + + match enqueue_result { + VoidResult::Ok => println!("[test] Sample enqueued successfully"), + VoidResult::Err(e) => panic!("Failed to enqueue sample: {e}"), + } + + // Give the manager a very short time to process + println!("[test] Sleeping briefly"); + std::thread::sleep(std::time::Duration::from_millis(50)); + println!("[test] Woke up"); + + // Terminate the profiler manager immediately + println!("[test] Calling ddog_prof_ProfilerManager_terminate"); + let terminate_result = unsafe { ddog_prof_ProfilerManager_terminate() }; + println!("[test] ddog_prof_ProfilerManager_terminate returned"); + let _final_profile_handle = match terminate_result { + ddcommon_ffi::Result::Ok(handle) => { + println!("[test] Profiler manager terminated successfully"); + handle + } + ddcommon_ffi::Result::Err(e) => panic!("Failed to terminate profiler manager: {e}"), + }; + + // Drop the client handle + println!("[test] Dropping client handle"); + let drop_result = unsafe { ddog_prof_ProfilerClient_drop(&mut client_handle) }; + match drop_result { + VoidResult::Ok => println!("[test] Client handle dropped successfully"), + VoidResult::Err(e) => println!("Warning: failed to drop client handle: {e}"), + } + + println!("[test] Simple start/terminate test completed successfully"); +} diff --git a/datadog-profiling-ffi/tests/test_utils.rs b/datadog-profiling-ffi/tests/test_utils.rs new file mode 100644 index 0000000000..4a8907016e --- /dev/null +++ b/datadog-profiling-ffi/tests/test_utils.rs @@ -0,0 +1,140 @@ +// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![allow(dead_code)] + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use datadog_profiling::internal::Profile; +use datadog_profiling_ffi::*; +use datadog_profiling_protobuf::prost_impls::Profile as ProstProfile; +use ddcommon_ffi::{Handle, Slice}; +use prost::Message; +use tokio_util::sync::CancellationToken; + +pub extern "C" fn test_cpu_sampler_callback(_profile: *mut Profile) {} + +pub static UPLOAD_COUNT: AtomicUsize = AtomicUsize::new(0); + +pub extern "C" fn test_upload_callback( + _profile: *mut Handle, + _token: &mut std::option::Option, +) { + let upload_count = UPLOAD_COUNT.fetch_add(1, Ordering::SeqCst); + println!("[upload_callback] called, count: {upload_count}"); +} + +#[repr(C)] +pub struct TestSample<'a> { + pub values: [i64; 1], + pub locations: [profiles::datatypes::Location<'a>; 1], +} + +#[allow(dead_code)] +pub fn create_test_sample(value: i64) -> TestSample<'static> { + let function = profiles::datatypes::Function { + name: match value { + 42 => "function_1", + 43 => "function_2", + 44 => "function_3", + 45 => "function_4", + 46 => "function_5", + _ => "unknown_function", + } + .into(), + system_name: match value { + 42 => "function_1", + 43 => "function_2", + 44 => "function_3", + 45 => "function_4", + 46 => "function_5", + _ => "unknown_function", + } + .into(), + filename: "test.rs".into(), + ..Default::default() + }; + + TestSample { + values: [value], + locations: [profiles::datatypes::Location { + function, + ..Default::default() + }], + } +} + +pub extern "C" fn test_converter(sample: &SendSample) -> profiles::datatypes::Sample { + let test_sample = unsafe { &*(sample.as_ptr() as *const TestSample) }; + + profiles::datatypes::Sample { + locations: Slice::from(&test_sample.locations[..]), + values: Slice::from(&test_sample.values[..]), + labels: Slice::empty(), + } +} + +pub extern "C" fn test_reset(sample: &mut SendSample) { + let test_sample = unsafe { &mut *(sample.as_ptr() as *mut TestSample) }; + test_sample.values[0] = 0; + test_sample.locations[0] = profiles::datatypes::Location { + function: profiles::datatypes::Function { + name: "".into(), + system_name: "".into(), + filename: "".into(), + ..Default::default() + }, + ..Default::default() + }; +} + +pub extern "C" fn test_drop(sample: SendSample) { + let _test_sample = unsafe { Box::from_raw(sample.as_ptr() as *mut TestSample) }; + // Box will be dropped here, freeing the memory +} + +// --- Helpers for profile sample checking --- + +pub fn decode_pprof(encoded: &[u8]) -> ProstProfile { + let mut decoder = lz4_flex::frame::FrameDecoder::new(encoded); + let mut buf = std::vec::Vec::new(); + use std::io::Read; + decoder.read_to_end(&mut buf).unwrap(); + ProstProfile::decode(buf.as_slice()).unwrap() +} + +pub fn roundtrip_to_pprof( + profile: std::result::Result, anyhow::Error>, +) -> ProstProfile { + let encoded = (*profile.expect("Failed to extract profile")) + .serialize_into_compressed_pprof(None, None) + .unwrap(); + decode_pprof(&encoded.buffer) +} + +pub fn assert_profile_has_sample_values( + profile: std::result::Result, anyhow::Error>, + expected_values: &[i64], +) { + let pprof = roundtrip_to_pprof(profile); + println!("[debug] Profile contains {} samples", pprof.samples.len()); + for (i, sample) in pprof.samples.iter().enumerate() { + println!("[debug] Sample {}: values = {:?}", i, sample.values); + } + + let mut found = vec![false; expected_values.len()]; + for sample in &pprof.samples { + for (i, &expected) in expected_values.iter().enumerate() { + if sample.values.contains(&expected) { + found[i] = true; + } + } + } + for (i, &was_found) in found.iter().enumerate() { + assert!( + was_found, + "Expected sample value {} not found in profile", + expected_values[i] + ); + } +} diff --git a/ddcommon-ffi/src/handle.rs b/ddcommon-ffi/src/handle.rs index 25a19b59b0..2740fa889c 100644 --- a/ddcommon-ffi/src/handle.rs +++ b/ddcommon-ffi/src/handle.rs @@ -8,6 +8,7 @@ use anyhow::Context; /// Represents an object that should only be referred to by its handle. /// Do not access its member for any reason, only use the C API functions on this struct. #[repr(C)] +#[derive(Debug)] pub struct Handle { // This may be null, but if not it will point to a valid . inner: *mut T,