Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 101 additions & 11 deletions foundations-macros/src/span_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ struct Options {

#[darling(default = "Options::default_async_local")]
async_local: bool,

#[darling(default = "Options::default_time_histogram")]
with_time_histogram: Option<Expr>,
}

impl Options {
Expand All @@ -54,6 +57,10 @@ impl Options {
fn default_async_local() -> bool {
false
}

fn default_time_histogram() -> Option<Expr> {
None
}
}

struct Args {
Expand Down Expand Up @@ -113,10 +120,19 @@ fn expand_from_parsed(args: Args, item_fn: ItemFn) -> TokenStream2 {
let span_name = args.span_name.as_tokens();
let crate_path = &args.options.crate_path;

quote!(
let __span = #crate_path::telemetry::tracing::span(#span_name);
#block
)
if let Some(timer) = args.options.with_time_histogram {
quote!(
let __span = #crate_path::telemetry::tracing::span(#span_name)
.into_context()
.with_time_histogram(#timer);
#block
)
} else {
quote!(
let __span = #crate_path::telemetry::tracing::span(#span_name);
#block
)
}
}),
};

Expand Down Expand Up @@ -182,13 +198,22 @@ fn wrap_with_span(args: &Args, block: TokenStream2) -> TokenStream2 {

let span_name = args.span_name.as_tokens();
let crate_path = &args.options.crate_path;

quote!(
#crate_path::telemetry::tracing::span(#span_name)
.into_context()
.#apply_fn(#block)
.await
)
if let Some(timer) = &args.options.with_time_histogram {
quote!(
#crate_path::telemetry::tracing::span(#span_name)
.into_context()
.with_time_histogram(#timer)
.#apply_fn(#block)
.await
)
} else {
quote!(
#crate_path::telemetry::tracing::span(#span_name)
.into_context()
.#apply_fn(#block)
.await
)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -482,4 +507,69 @@ mod tests {

assert_eq!(actual, expected);
}

#[test]
fn span_sync_fn_with_metric_tracker() {
let args = parse_attr! {
#[span_fn("sync_span", with_time_histogram = ::foo::bar::metrics::time_histogram(label = false))]
};

let item_fn = parse_quote! {
fn do_sync() -> io::Result<String> {
do_something_else();

Ok("foo".into())
}
};

let actual = expand_from_parsed(args, item_fn).to_string();

let expected = code_str! {
fn do_sync<>() -> io::Result<String> {
let __span = ::foundations::telemetry::tracing::span("sync_span")
.into_context()
.with_time_histogram(::foo::bar::metrics::time_histogram(label = false));
{
do_something_else();

Ok("foo".into())
}
}
};

assert_eq!(actual, expected);
}

#[test]
fn span_async_fn_with_metric_tracker() {
let args = parse_attr! {
#[span_fn("async_span", with_time_histogram = ::foo::bar::metrics::time_histogram(label = false))]
};

let item_fn = parse_quote! {
async fn do_async() -> io::Result<String> {
do_something_else().await;

Ok("foo".into())
}
};

let actual = expand_from_parsed(args, item_fn).to_string();

let expected = code_str! {
async fn do_async<>() -> io::Result<String> {
::foundations::telemetry::tracing::span("async_span")
.into_context()
.with_time_histogram(::foo::bar::metrics::time_histogram(label = false))
.apply(async move {{
do_something_else().await;

Ok("foo".into())
}})
.await
}
};

assert_eq!(actual, expected);
}
}
8 changes: 2 additions & 6 deletions foundations/src/telemetry/memory_profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,8 @@ fn collect_heap_profile() -> Result<String> {
let mut out_file_path_c_str = CString::new(out_file_path)?.into_bytes_with_nul();
let out_file_path_ptr = out_file_path_c_str.as_mut_ptr() as *mut c_char;

control::write(control::PROF_DUMP, out_file_path_ptr).map_err(|e| {
format!(
"failed to dump jemalloc heap profile to {:?}: {}",
out_file_path, e
)
})?;
control::write(control::PROF_DUMP, out_file_path_ptr)
.map_err(|e| format!("failed to dump jemalloc heap profile to {out_file_path:?}: {e}",))?;

let mut profile = Vec::new();

Expand Down
52 changes: 52 additions & 0 deletions foundations/src/telemetry/telemetry_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ feature_use!(cfg(feature = "tracing"), {
use super::tracing::internal::Tracer;
use super::tracing::testing::{current_test_tracer, TestTracerScope};
});

feature_use!(cfg(feature = "metrics"), {
use prometools::histogram::TimeHistogram;
});
});

#[cfg(feature = "testing")]
Expand Down Expand Up @@ -226,6 +230,54 @@ impl TelemetryContext {
TestTelemetryContext::new()
}

/// Track the duration of the current `span` in this `TelemetryContext`. The span's duration
/// will be inserted into `histogram`, regardless of whether `span` was sampled or not.
///
/// # Examples
/// ```
/// use foundations::telemetry::TelemetryContext;
/// use foundations::telemetry::metrics::{metrics, HistogramBuilder, TimeHistogram};
/// use foundations::telemetry::tracing::{self, test_trace};
///
/// #[metrics]
/// mod test_metrics {
/// #[ctor = HistogramBuilder { buckets: &[1E-4, 2E-4] }]
/// pub fn histogram(label: bool) -> TimeHistogram;
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // Test context is used for demonstration purposes to show the resulting traces.
/// let ctx = TelemetryContext::test();
/// let hist = test_metrics::histogram(true);
/// let clone = hist.clone();
///
/// {
/// let _scope = ctx.scope();
/// let _root = tracing::span("root");
/// let telemetry_ctx = TelemetryContext::current().with_time_histogram(clone);
///
/// let handle = std::thread::spawn(move || {
/// let _scope = telemetry_ctx.scope();
/// let _child = tracing::span("child");
/// });
///
/// handle.join();
/// }
///
/// let snapshot = hist.snapshot();
/// assert_eq!(snapshot.count(), 1);
/// }
/// ```
#[cfg(all(feature = "metrics", feature = "tracing"))]
pub fn with_time_histogram(mut self, histogram: TimeHistogram) -> Self {
if let Some(inner) = self.span.as_mut() {
inner.track_with_time_histogram(histogram);
}

self
}

/// Wraps a future with the telemetry context.
///
/// [`TelemetryScope`] can't be used across `await` points to propagate the telemetry context,
Expand Down
37 changes: 33 additions & 4 deletions foundations/src/telemetry/tracing/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use cf_rustracing::sampler::BoxSampler;
use cf_rustracing::tag::Tag;
use cf_rustracing_jaeger::span::{Span, SpanContext, SpanContextState};
use parking_lot::RwLock;
use prometools::histogram::{HistogramTimer, TimeHistogram};
use rand::{self, Rng};
use std::borrow::Cow;
use std::error::Error;
Expand Down Expand Up @@ -54,14 +55,31 @@ impl From<SharedSpanHandle> for Arc<RwLock<Span>> {
}
}

#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct SharedSpan {
// NOTE: we intentionally use a lock without poisoning here to not
// panic the threads if they just share telemetry with failed thread.
pub(crate) inner: SharedSpanHandle,
// NOTE: store sampling flag separately, so we don't need to acquire lock
// every time we need to check the flag.
is_sampled: bool,
timer: Option<Arc<HistogramTimer>>,
}

impl SharedSpan {
#[cfg(all(feature = "metrics", feature = "tracing"))]
pub(crate) fn track_with_time_histogram(&mut self, timer: TimeHistogram) {
let _ = self.timer.insert(Arc::new(timer.start_timer()));
}
}

impl std::fmt::Debug for SharedSpan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SharedSpan")
.field("inner", &self.inner)
.field("is_sampled", &self.is_sampled)
.finish()
}
}

impl From<Span> for SharedSpan {
Expand All @@ -71,6 +89,7 @@ impl From<Span> for SharedSpan {
Self {
inner: SharedSpanHandle::new(inner),
is_sampled,
timer: None,
}
}
}
Expand All @@ -83,12 +102,22 @@ pub fn write_current_span(write_fn: impl FnOnce(&mut Span)) {
}
}

pub(crate) fn create_span(name: impl Into<Cow<'static, str>>) -> SharedSpan {
match current_span() {
pub(crate) fn create_span(
name: impl Into<Cow<'static, str>>,
histogram: Option<TimeHistogram>,
) -> SharedSpan {
let span = match current_span() {
Some(parent) => parent.inner.read().child(name, |o| o.start()),
None => start_trace(name, Default::default()),
};

let timer = histogram.map(|t| Arc::new(t.start_timer()));

SharedSpan {
is_sampled: span.is_sampled(),
inner: SharedSpanHandle::new(span),
timer,
}
.into()
}

pub(crate) fn current_span() -> Option<SharedSpan> {
Expand Down
3 changes: 1 addition & 2 deletions foundations/src/telemetry/tracing/live/event_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ impl TraceLogBuilder {

fn finalize(mut self, end_timestamp: u64) -> String {
self.out.push_str(&format!(
"{{\"pid\":1,\"name\":\"Trace dump requested\",\"ph\":\"i\",\"ts\":{},\"s\":\"g\"}}",
end_timestamp,
"{{\"pid\":1,\"name\":\"Trace dump requested\",\"ph\":\"i\",\"ts\":{end_timestamp},\"s\":\"g\"}}",
));

self.out.push(']');
Expand Down
Loading
Loading