Skip to content

Commit 1ecd2fb

Browse files
committed
validate context behaviour across async boundaries
1 parent 408ff96 commit 1ecd2fb

File tree

2 files changed

+60
-7
lines changed

2 files changed

+60
-7
lines changed

opentelemetry/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ internal-logs = ["tracing"]
4343
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["spec_unstable_logs_enabled"]} # for documentation tests
4444
criterion = { workspace = true }
4545
rand = { workspace = true, features = ["os_rng", "thread_rng"] }
46+
tokio = { version = "1.0", features = ["full"] }
47+
futures = "0.3"
4648

4749
[[bench]]
4850
name = "metrics"

opentelemetry/src/context.rs

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,8 @@ impl Default for ContextStack {
520520
#[cfg(test)]
521521
mod tests {
522522
use super::*;
523+
use std::time::Duration;
524+
use tokio::time::sleep;
523525

524526
#[derive(Debug, PartialEq)]
525527
struct ValueA(u64);
@@ -588,7 +590,7 @@ mod tests {
588590

589591
assert!(Context::map_current(|cx| {
590592
assert_eq!(cx.get(), Some(&ValueA(1)));
591-
assert_eq!(cx.get::<ValueB>(), None);
593+
assert_eq!(cx.get(), Some(&ValueB(42)));
592594
true
593595
}));
594596
}
@@ -661,7 +663,7 @@ mod tests {
661663
// Push a new context and see that it works
662664
let cx_guard = Context::current().with_value(ValueA(2)).attach();
663665
assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS - 1);
664-
assert_eq!(Context::current().get(), Some(&ValueA(2)));
666+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(2)));
665667
assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2)));
666668
guards.push(cx_guard);
667669
// Let's overflow the stack a couple of times again
@@ -674,8 +676,6 @@ mod tests {
674676
}
675677
}
676678

677-
678-
679679
/// Tests that a new ContextStack is created with the correct initial capacity.
680680
#[test]
681681
fn test_initial_capacity() {
@@ -684,7 +684,6 @@ mod tests {
684684
}
685685

686686
/// Tests that map_current_cx correctly accesses the current context.
687-
688687
#[test]
689688
fn test_map_current_cx() {
690689
let mut stack = ContextStack::default();
@@ -699,7 +698,6 @@ mod tests {
699698
}
700699

701700
/// Tests popping contexts in non-sequential order.
702-
703701
#[test]
704702
fn test_pop_id_out_of_order() {
705703
let mut stack = ContextStack::default();
@@ -779,5 +777,58 @@ mod tests {
779777
Some(&ValueA((max_pos - 2) as u64))
780778
);
781779
}
782-
780+
781+
/// Tests that:
782+
/// 1. Parent context values are properly propagated to async operations
783+
/// 2. Values added during async operations are captured by telemetry but don't affect parent
784+
#[tokio::test]
785+
async fn test_async_context_propagation() {
786+
// Set up initial context with ValueA
787+
let parent_cx = Context::new().with_value(ValueA(42));
788+
789+
// Create and run async operation with the parent context
790+
async {
791+
// Verify we can see the parent context's value
792+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)),
793+
"Parent context value should be available in async operation");
794+
795+
// Create new context with both values
796+
let cx_with_both = Context::current().with_value(ValueB(24));
797+
798+
// Run nested async operation with both values
799+
async {
800+
// Verify both values are available
801+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)),
802+
"Parent value should still be available after adding new value");
803+
assert_eq!(Context::current().get::<ValueB>(), Some(&ValueB(24)),
804+
"New value should be available in async operation");
805+
806+
// Do some async work to simulate real-world scenario
807+
sleep(Duration::from_millis(10)).await;
808+
809+
// Values should still be available after async work
810+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)),
811+
"Parent value should persist across await points");
812+
assert_eq!(Context::current().get::<ValueB>(), Some(&ValueB(24)),
813+
"New value should persist across await points");
814+
}
815+
.with_context(cx_with_both)
816+
.await;
817+
}
818+
.with_context(parent_cx.clone()) // Propagate the parent context to the async operation
819+
.await;
820+
821+
// After async operation completes:
822+
// 1. Parent context should be unchanged
823+
assert_eq!(parent_cx.get::<ValueA>(), Some(&ValueA(42)),
824+
"Parent context should be unchanged");
825+
assert_eq!(parent_cx.get::<ValueB>(), None,
826+
"Parent context should not see values added in async operation");
827+
828+
// 2. Current context should be back to default
829+
assert_eq!(Context::current().get::<ValueA>(), None,
830+
"Current context should be back to default");
831+
assert_eq!(Context::current().get::<ValueB>(), None,
832+
"Current context should not have async operation's values");
833+
}
783834
}

0 commit comments

Comments
 (0)