Skip to content

Commit 2ed49d7

Browse files
committed
Cleanup async
1 parent 5157bdd commit 2ed49d7

File tree

3 files changed

+60
-32
lines changed

3 files changed

+60
-32
lines changed

libdd-profiling-ffi/src/exporter.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,13 @@ unsafe fn parse_json(
213213
/// * `optional_info_json` - System info as JSON string.
214214
/// * `cancel` - Optional cancellation token.
215215
///
216+
/// # Thread Affinity
217+
///
218+
/// **Important**: This function uses a cached single-threaded runtime with thread affinity.
219+
/// For best results, all calls on the same exporter instance should be made from the same
220+
/// thread. If you need to use the exporter from multiple threads, create a separate
221+
/// exporter instance per thread.
222+
///
216223
/// # Safety
217224
/// All non-null arguments MUST have been created by APIs in this module.
218225
#[no_mangle]

libdd-profiling/src/cxx.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,13 @@ pub mod ffi {
169169
/// * `info` - System/environment info as JSON string (e.g., `{"os": "linux", "arch":
170170
/// "x86_64"}`) See Datadog-internal "RFC: Pprof System Info Support" Pass empty string ""
171171
/// if not needed
172+
///
173+
/// # Thread Affinity
174+
///
175+
/// **Important**: This method uses a cached single-threaded runtime with thread affinity.
176+
/// For best results, all calls on the same exporter instance should be made from the same
177+
/// thread. If you need to use the exporter from multiple threads, create a separate
178+
/// exporter instance per thread.
172179
#[allow(clippy::too_many_arguments)]
173180
fn send_profile(
174181
self: &mut ProfileExporter,
@@ -198,6 +205,13 @@ pub mod ffi {
198205
/// "x86_64"}`) See Datadog-internal "RFC: Pprof System Info Support" Pass empty string ""
199206
/// if not needed
200207
/// * `cancel` - Cancellation token to cancel the send operation
208+
///
209+
/// # Thread Affinity
210+
///
211+
/// **Important**: This method uses a cached single-threaded runtime with thread affinity.
212+
/// For best results, all calls on the same exporter instance should be made from the same
213+
/// thread. If you need to use the exporter from multiple threads, create a separate
214+
/// exporter instance per thread.
201215
#[allow(clippy::too_many_arguments)]
202216
fn send_profile_with_cancellation(
203217
self: &mut ProfileExporter,

libdd-profiling/src/exporter/profile_exporter.rs

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use anyhow::Context;
2929
use libdd_common::tag::Tag;
3030
use libdd_common::{azure_app_services, tag, Endpoint};
3131
use serde_json::json;
32-
use std::{future, io::Write};
32+
use std::io::Write;
3333
use tokio::runtime::Runtime;
3434
use tokio_util::sync::CancellationToken;
3535

@@ -57,6 +57,13 @@ impl ProfileExporter {
5757
/// The default configuration automatically retries safe errors and low-level protocol NACKs.
5858
/// For custom retry policies, users can configure the reqwest client before creating the
5959
/// exporter.
60+
///
61+
/// # Thread Safety
62+
///
63+
/// The exporter can be used from any thread, but if using `send_blocking()`, the exporter
64+
/// should remain on the same thread for all blocking calls. See [`send_blocking`] for details.
65+
///
66+
/// [`send_blocking`]: ProfileExporter::send_blocking
6067
pub fn new(
6168
profiling_library_name: &str,
6269
profiling_library_version: &str,
@@ -176,6 +183,23 @@ impl ProfileExporter {
176183
})
177184
}
178185

186+
/// Synchronously sends a profile to the configured endpoint.
187+
///
188+
/// This is a blocking wrapper around the async [`send`] method. It lazily creates and caches
189+
/// a single-threaded tokio runtime on first use.
190+
///
191+
/// # Thread Affinity
192+
///
193+
/// **Important**: The cached runtime uses `new_current_thread()`, which has thread affinity.
194+
/// For best results, all calls to `send_blocking()` on the same exporter instance should be
195+
/// made from the same thread. Moving the exporter across threads between blocking calls may
196+
/// cause issues.
197+
///
198+
/// If you need to use the exporter from multiple threads, consider either:
199+
/// - Creating a separate exporter instance per thread
200+
/// - Using the async [`send`] method directly from within a tokio runtime
201+
///
202+
/// [`send`]: ProfileExporter::send
179203
#[allow(clippy::too_many_arguments)]
180204
pub fn send_blocking(
181205
&mut self,
@@ -233,25 +257,20 @@ impl ProfileExporter {
233257

234258
let form = self.build_multipart_form(event, profile, additional_files)?;
235259

236-
// Build request
237-
let request = self
260+
let request_builder = self
238261
.client
239262
.post(&self.request_url)
240263
.headers(self.headers.clone())
241-
.multipart(form)
242-
.build()?;
243-
244-
// Send request with cancellation support
245-
tokio::select! {
246-
_ = async {
247-
match cancel {
248-
Some(token) => token.cancelled().await,
249-
None => future::pending().await,
250-
}
251-
} => Err(anyhow::anyhow!("Operation cancelled by user")),
252-
result = self.client.execute(request) => {
253-
Ok(result?.status())
264+
.multipart(form);
265+
266+
// Send request with optional cancellation support
267+
if let Some(token) = cancel {
268+
tokio::select! {
269+
_ = token.cancelled() => anyhow::bail!("Operation cancelled by user"),
270+
result = request_builder.send() => Ok(result?.status()),
254271
}
272+
} else {
273+
Ok(request_builder.send().await?.status())
255274
}
256275
}
257276

@@ -290,7 +309,7 @@ impl ProfileExporter {
290309
let mut internal = internal_metadata.unwrap_or_else(|| json!({}));
291310
internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));
292311

293-
let mut event = json!({
312+
json!({
294313
"attachments": attachments,
295314
"tags_profiler": tags_profiler,
296315
"start": chrono::DateTime::<chrono::Utc>::from(profile.start)
@@ -299,23 +318,11 @@ impl ProfileExporter {
299318
.format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
300319
"family": self.family,
301320
"version": "4",
302-
"endpoint_counts": if profile.endpoints_stats.is_empty() {
303-
None
304-
} else {
305-
Some(&profile.endpoints_stats)
306-
},
321+
"endpoint_counts": (!profile.endpoints_stats.is_empty()).then_some(&profile.endpoints_stats),
322+
"process_tags": process_tags.filter(|s| !s.is_empty()),
307323
"internal": internal,
308324
"info": info.unwrap_or_else(|| json!({})),
309-
});
310-
311-
// Add process_tags if provided
312-
if let Some(tags) = process_tags {
313-
if !tags.is_empty() {
314-
event["process_tags"] = json!(tags);
315-
}
316-
}
317-
318-
event
325+
})
319326
}
320327

321328
fn build_multipart_form(

0 commit comments

Comments
 (0)