Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 139 additions & 31 deletions opentelemetry/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,18 @@ thread_local! {
/// assert_eq!(current.get::<ValueB>(), None);
/// ```
#[derive(Clone, Default)]
#[cfg_attr(target_pointer_width = "64", repr(align(16)))]
#[cfg_attr(target_pointer_width = "32", repr(align(8)))]
pub struct Context {
pub(crate) inner: Option<Arc<InnerContext>>,
flags: ContextFlags,
}

#[derive(Default)]
pub(crate) struct InnerContext {
#[cfg(feature = "trace")]
pub(crate) span: Option<Arc<SynchronizedSpan>>,
entries: Option<Arc<EntryMap>>,
suppress_telemetry: bool,
}

type EntryMap = HashMap<TypeId, Arc<dyn Any + Sync + Send>, BuildHasherDefault<IdHasher>>;
Expand Down Expand Up @@ -198,7 +205,9 @@ impl Context {
/// assert_eq!(cx.get::<MyUser>(), None);
/// ```
pub fn get<T: 'static>(&self) -> Option<&T> {
self.entries
self.inner
.as_ref()?
.entries
.as_ref()?
.get(&TypeId::of::<T>())?
.downcast_ref()
Expand Down Expand Up @@ -232,20 +241,29 @@ impl Context {
/// assert_eq!(cx_with_a_and_b.get::<ValueB>(), Some(&ValueB(42)));
/// ```
pub fn with_value<T: 'static + Send + Sync>(&self, value: T) -> Self {
let entries = if let Some(current_entries) = &self.entries {
let mut inner_entries = (**current_entries).clone();
inner_entries.insert(TypeId::of::<T>(), Arc::new(value));
Some(Arc::new(inner_entries))
} else {
fn new_entries<T: 'static + Send + Sync>(value: T) -> Option<Arc<EntryMap>> {
let mut entries = EntryMap::default();
entries.insert(TypeId::of::<T>(), Arc::new(value));
Some(Arc::new(entries))
}
let (entries, span) = if let Some(inner) = &self.inner {
if let Some(current_entries) = &inner.entries {
let mut inner_entries = (**current_entries).clone();
inner_entries.insert(TypeId::of::<T>(), Arc::new(value));
(Some(Arc::new(inner_entries)), &inner.span)
} else {
(new_entries(value), &inner.span)
}
} else {
(new_entries(value), &None)
};
Context {
entries,
#[cfg(feature = "trace")]
span: self.span.clone(),
suppress_telemetry: self.suppress_telemetry,
inner: Some(Arc::new(InnerContext {
entries,
#[cfg(feature = "trace")]
span: span.clone(),
})),
flags: self.flags,
}
}

Expand Down Expand Up @@ -335,16 +353,14 @@ impl Context {
/// Returns whether telemetry is suppressed in this context.
#[inline]
pub fn is_telemetry_suppressed(&self) -> bool {
self.suppress_telemetry
self.flags.is_telemetry_suppressed()
}

/// Returns a new context with telemetry suppression enabled.
pub fn with_telemetry_suppressed(&self) -> Self {
Context {
entries: self.entries.clone(),
#[cfg(feature = "trace")]
span: self.span.clone(),
suppress_telemetry: true,
inner: self.inner.clone(),
flags: self.flags.with_telemetry_suppressed(),
}
}

Expand Down Expand Up @@ -409,48 +425,140 @@ impl Context {
}

#[cfg(feature = "trace")]
pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
Self::map_current(|cx| Context {
span: Some(Arc::new(value)),
entries: cx.entries.clone(),
suppress_telemetry: cx.suppress_telemetry,
})
pub(crate) fn current_with_synchronized_span(span: Option<Arc<SynchronizedSpan>>) -> Self {
Self::map_current(|cx| cx.with_synchronized_span(span))
}

#[cfg(feature = "trace")]
pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
Context {
span: Some(Arc::new(value)),
entries: self.entries.clone(),
suppress_telemetry: self.suppress_telemetry,
pub(crate) fn with_synchronized_span(&self, span: Option<Arc<SynchronizedSpan>>) -> Self {
let active = span.is_some();
if let Some(inner) = &self.inner {
if span.is_none() && inner.span.is_none() {
self.clone()
} else {
Context {
inner: Some(Arc::new(InnerContext {
span,
entries: inner.entries.clone(),
})),
flags: self.flags.with_active_span(active),
}
}
} else {
if span.is_none() {
self.clone()
} else {
Context {
inner: Some(Arc::new(InnerContext {
span,
entries: None,
})),
flags: self.flags.with_active_span(active),
}
}
}
}

#[cfg(feature = "trace")]
#[inline(always)]
pub(crate) const fn has_active_span_flag(&self) -> bool {
self.flags.is_span_active()
}
}

impl fmt::Debug for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut dbg = f.debug_struct("Context");

#[cfg(feature = "trace")]
let mut entries = self.entries.as_ref().map_or(0, |e| e.len());
let mut entries = self
.inner
.as_ref()
.map_or(0, |i| i.entries.as_ref().map_or(0, |e| e.len()));
#[cfg(feature = "trace")]
{
if let Some(span) = &self.span {
if let Some(Some(span)) = self.inner.as_ref().map(|i| i.span.as_ref()) {
dbg.field("span", &span.span_context());
entries += 1;
} else {
dbg.field("span", &"None");
}
}
#[cfg(not(feature = "trace"))]
let entries = self.entries.as_ref().map_or(0, |e| e.len());
let entries = self
.inner
.as_ref()
.map_or(0, |i| i.entries.as_ref().map_or(0, |e| e.len()));

dbg.field("entries count", &entries)
.field("suppress_telemetry", &self.suppress_telemetry)
.field("flags", &self.flags)
.finish()
}
}

/// Bit flags for context state.
#[derive(Clone, Copy, Default)]
struct ContextFlags(u16);

impl ContextFlags {
const SUPPRESS_TELEMETRY: u16 = 1 << 0;
#[cfg(feature = "trace")]
const ACTIVE_SPAN: u16 = 1 << 1;

/// Returns true if telemetry suppression is enabled.
#[inline(always)]
const fn is_telemetry_suppressed(self) -> bool {
(self.0 & Self::SUPPRESS_TELEMETRY) != 0
}

/// Returns a new ContextFlags with telemetry suppression enabled.
#[inline(always)]
const fn with_telemetry_suppressed(self) -> Self {
ContextFlags(self.0 | Self::SUPPRESS_TELEMETRY)
}

/// Returns true if the active span flag is set.
#[cfg(feature = "trace")]
#[inline(always)]
const fn is_span_active(self) -> bool {
(self.0 & Self::ACTIVE_SPAN) != 0
}

/// Returns a new ContextFlags with the active span flag set.
#[cfg(feature = "trace")]
#[inline(always)]
const fn with_active_span(self, active: bool) -> Self {
if active {
ContextFlags(self.0 | Self::ACTIVE_SPAN)
} else {
ContextFlags(self.0 & !Self::ACTIVE_SPAN)
}
}
}

impl fmt::Debug for ContextFlags {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(feature = "trace")]
let mut comma = false;
f.write_str("ContextFlags(")?;
if self.is_telemetry_suppressed() {
f.write_str("TELEMETRY_SUPPRESSED")?;
#[cfg(feature = "trace")]
{
comma = true;
}
}
#[cfg(feature = "trace")]
if self.is_span_active() {
if comma {
f.write_str(", ")?;
}
f.write_str("ACTIVE_SPAN")?;
}
f.write_str(")")
}
}

/// A guard that resets the current context to the prior context when dropped.
#[derive(Debug)]
pub struct ContextGuard {
Expand Down
47 changes: 29 additions & 18 deletions opentelemetry/src/trace/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use crate::{
trace::{Span, SpanContext, Status},
Context, ContextGuard, KeyValue,
};
use std::{borrow::Cow, error::Error, sync::Mutex};
use std::{
borrow::Cow,
error::Error,
sync::{Arc, Mutex},
};

// Re-export for compatability. This used to be contained here.
pub use crate::context::{FutureExt, WithContext};
Expand All @@ -30,22 +34,29 @@ impl SynchronizedSpan {
pub(crate) fn span_context(&self) -> &SpanContext {
&self.span_context
}
}

impl From<SpanContext> for SynchronizedSpan {
fn from(value: SpanContext) -> Self {
Self {
span_context: value,
inner: None,
fn maybe_from_span<T>(value: T) -> Option<Arc<SynchronizedSpan>>
where
T: Span + Send + Sync + 'static,
{
if !value.span_context().is_valid() {
None
} else {
Some(Arc::new(SynchronizedSpan {
span_context: value.span_context().clone(),
inner: Some(Mutex::new(global::BoxedSpan::new(value))),
}))
}
}
}

impl<T: Span + Send + Sync + 'static> From<T> for SynchronizedSpan {
fn from(value: T) -> Self {
Self {
span_context: value.span_context().clone(),
inner: Some(Mutex::new(global::BoxedSpan::new(value))),
fn maybe_from_span_context(value: SpanContext) -> Option<Arc<SynchronizedSpan>> {
if !value.is_valid() {
None
} else {
Some(Arc::new(SynchronizedSpan {
span_context: value,
inner: None,
}))
}
}
}
Expand Down Expand Up @@ -300,27 +311,27 @@ pub trait TraceContextExt {

impl TraceContextExt for Context {
fn current_with_span<T: crate::trace::Span + Send + Sync + 'static>(span: T) -> Self {
Context::current_with_synchronized_span(span.into())
Context::current_with_synchronized_span(SynchronizedSpan::maybe_from_span(span))
}

fn with_span<T: crate::trace::Span + Send + Sync + 'static>(&self, span: T) -> Self {
self.with_synchronized_span(span.into())
self.with_synchronized_span(SynchronizedSpan::maybe_from_span(span))
}

fn span(&self) -> SpanRef<'_> {
if let Some(span) = self.span.as_ref() {
if let Some(Some(span)) = self.inner.as_ref().map(|i| i.span.as_ref()) {
SpanRef(span)
} else {
SpanRef(&NOOP_SPAN)
}
}

fn has_active_span(&self) -> bool {
self.span.is_some()
self.has_active_span_flag()
}

fn with_remote_span_context(&self, span_context: crate::trace::SpanContext) -> Self {
self.with_synchronized_span(span_context.into())
self.with_synchronized_span(SynchronizedSpan::maybe_from_span_context(span_context))
}
}

Expand Down
19 changes: 14 additions & 5 deletions opentelemetry/src/trace/span_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,29 @@ mod tests {
let cx = Context::current();
assert_eq!(
format!("{cx:?}"),
"Context { span: \"None\", entries count: 0, suppress_telemetry: false }"
"Context { span: \"None\", entries count: 0, flags: ContextFlags() }"
);
let cx = Context::current().with_remote_span_context(SpanContext::NONE);
let remote_span_cx = SpanContext::new(
0x123456789abcdef0123456789abcdef0.into(),
0x876543210fedcba0.into(),
TraceFlags::NOT_SAMPLED,
false,
TraceState::NONE,
);
let cx = Context::current()
.with_remote_span_context(remote_span_cx)
.with_telemetry_suppressed();
assert_eq!(
format!("{cx:?}"),
"Context { \
span: SpanContext { \
trace_id: 00000000000000000000000000000000, \
span_id: 0000000000000000, \
trace_id: 123456789abcdef0123456789abcdef0, \
span_id: 876543210fedcba0, \
trace_flags: TraceFlags(0), \
is_remote: false, \
trace_state: TraceState(None) \
}, \
entries count: 1, suppress_telemetry: false \
entries count: 1, flags: ContextFlags(TELEMETRY_SUPPRESSED, ACTIVE_SPAN) \
}"
);
}
Expand Down
Loading