Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ internal-logs = ["tracing"]
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["spec_unstable_logs_enabled"]} # for documentation tests
criterion = { workspace = true }
rand = { workspace = true, features = ["os_rng", "thread_rng"] }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"

[[bench]]
name = "metrics"
Expand Down
199 changes: 194 additions & 5 deletions opentelemetry/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@
// The empty context is always at the bottom of the [`ContextStack`]
// and cannot be popped, and the overflow position is invalid, so do
// nothing.
otel_warn!(
name: "Context.PopInvalidPosition",
position = pos,
message = if pos == ContextStack::BASE_POS {
"Attempted to pop the base context which is not allowed"

Check warning on line 467 in opentelemetry/src/context.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context.rs#L465-L467

Added lines #L465 - L467 were not covered by tests
} else {
"Attempted to pop the overflow position which is not allowed"

Check warning on line 469 in opentelemetry/src/context.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context.rs#L469

Added line #L469 was not covered by tests
}
);
return;
}
let len: u16 = self.stack.len() as u16;
Expand All @@ -479,6 +488,12 @@
// This is an out of order pop.
if pos >= len {
// This is an invalid id, ignore it.
otel_warn!(
name: "Context.PopOutOfBounds",
position = pos,
stack_length = len,

Check warning on line 494 in opentelemetry/src/context.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context.rs#L493-L494

Added lines #L493 - L494 were not covered by tests
message = "Attempted to pop beyond the end of the context stack"
);
return;
}
// Clear out the entry at the given id.
Expand All @@ -505,6 +520,8 @@
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug, PartialEq)]
struct ValueA(u64);
Expand Down Expand Up @@ -659,14 +676,186 @@
}
}

/// Tests that a new ContextStack is created with the correct initial capacity.
#[test]
fn test_initial_capacity() {
let stack = ContextStack::default();
assert_eq!(stack.stack.capacity(), ContextStack::INITIAL_CAPACITY);
}

/// Tests that map_current_cx correctly accesses the current context.
#[test]
fn context_stack_pop_id() {
// This is to get full line coverage of the `pop_id` function.
// In real life the `Drop`` implementation of `ContextGuard` ensures that
// the ids are valid and inside the bounds.
fn test_map_current_cx() {
let mut stack = ContextStack::default();
let test_value = ValueA(42);
stack.current_cx = Context::new().with_value(test_value);

let result = stack.map_current_cx(|cx| {
assert_eq!(cx.get::<ValueA>(), Some(&ValueA(42)));
true
});
assert!(result);
}

/// Tests popping contexts in non-sequential order.
#[test]
fn test_pop_id_out_of_order() {
let mut stack = ContextStack::default();

// Push three contexts
let cx1 = Context::new().with_value(ValueA(1));
let cx2 = Context::new().with_value(ValueA(2));
let cx3 = Context::new().with_value(ValueA(3));

let id1 = stack.push(cx1);
let id2 = stack.push(cx2);
let id3 = stack.push(cx3);

// Pop middle context first - should not affect current context
stack.pop_id(id2);
assert_eq!(stack.current_cx.get::<ValueA>(), Some(&ValueA(3)));
assert_eq!(stack.stack.len(), 3); // Length unchanged for middle pops

// Pop last context - should restore previous valid context
stack.pop_id(id3);
assert_eq!(stack.current_cx.get::<ValueA>(), Some(&ValueA(1)));
assert_eq!(stack.stack.len(), 1);

// Pop first context - should restore to empty state
stack.pop_id(id1);
assert_eq!(stack.current_cx.get::<ValueA>(), None);
assert_eq!(stack.stack.len(), 0);
}

/// Tests edge cases in context stack operations. IRL these should log
/// warnings, and definitely not panic.
#[test]
fn test_pop_id_edge_cases() {
let mut stack = ContextStack::default();

// Test popping BASE_POS - should be no-op
stack.pop_id(ContextStack::BASE_POS);
assert_eq!(stack.stack.len(), 0);

// Test popping MAX_POS - should be no-op
stack.pop_id(ContextStack::MAX_POS);
stack.pop_id(4711);
assert_eq!(stack.stack.len(), 0);

// Test popping invalid position - should be no-op
stack.pop_id(1000);
assert_eq!(stack.stack.len(), 0);

// Test popping from empty stack - should be safe
stack.pop_id(1);
assert_eq!(stack.stack.len(), 0);
}

/// Tests stack behavior when reaching maximum capacity.
/// Once we push beyond this point, we should end up with a context
/// that points _somewhere_, but mutating it should not affect the current
/// active context.
#[test]
fn test_push_overflow() {
let mut stack = ContextStack::default();
let max_pos = ContextStack::MAX_POS as usize;

// Fill stack up to max position
for i in 0..max_pos {
let cx = Context::new().with_value(ValueA(i as u64));
let id = stack.push(cx);
assert_eq!(id, (i + 1) as u16);
}

// Try to push beyond capacity
let cx = Context::new().with_value(ValueA(max_pos as u64));
let id = stack.push(cx);
assert_eq!(id, ContextStack::MAX_POS);

// Verify current context remains unchanged after overflow
assert_eq!(
stack.current_cx.get::<ValueA>(),
Some(&ValueA((max_pos - 2) as u64))
);
}

/// Tests that:
/// 1. Parent context values are properly propagated to async operations
/// 2. Values added during async operations do not affect parent context
#[tokio::test]
async fn test_async_context_propagation() {
// Set up initial context with ValueA
let parent_cx = Context::new().with_value(ValueA(42));

// Create and run async operation with the parent context
async {
// Verify we can see the parent context's value
assert_eq!(
Context::current().get::<ValueA>(),
Some(&ValueA(42)),
"Parent context value should be available in async operation"
);

// Create new context with both values
let cx_with_both = Context::current().with_value(ValueB(24));

// Run nested async operation with both values
async {
// Verify both values are available
assert_eq!(
Context::current().get::<ValueA>(),
Some(&ValueA(42)),
"Parent value should still be available after adding new value"
);
assert_eq!(
Context::current().get::<ValueB>(),
Some(&ValueB(24)),
"New value should be available in async operation"
);

// Do some async work to simulate real-world scenario
sleep(Duration::from_millis(10)).await;

// Values should still be available after async work
assert_eq!(
Context::current().get::<ValueA>(),
Some(&ValueA(42)),
"Parent value should persist across await points"
);
assert_eq!(
Context::current().get::<ValueB>(),
Some(&ValueB(24)),
"New value should persist across await points"
);
}
.with_context(cx_with_both)
.await;
}
.with_context(parent_cx.clone()) // Propagate the parent context to the async operation
.await;

// After async operation completes:
// 1. Parent context should be unchanged
assert_eq!(
parent_cx.get::<ValueA>(),
Some(&ValueA(42)),
"Parent context should be unchanged"
);
assert_eq!(
parent_cx.get::<ValueB>(),
None,
"Parent context should not see values added in async operation"
);

// 2. Current context should be back to default
assert_eq!(
Context::current().get::<ValueA>(),
None,
"Current context should be back to default"
);
assert_eq!(
Context::current().get::<ValueB>(),
None,
"Current context should not have async operation's values"
);
}
}