Skip to content

Commit 716cee2

Browse files
committed
move FutureExt over too
1 parent f781de0 commit 716cee2

File tree

4 files changed

+82
-81
lines changed

4 files changed

+82
-81
lines changed

opentelemetry/src/context/future_ext.rs

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,84 @@
1+
use std::pin::Pin;
2+
use std::task::Poll;
3+
use futures_core::Stream;
4+
use futures_sink::Sink;
5+
use pin_project_lite::pin_project;
16
use crate::Context;
2-
use crate::trace::WithContext;
7+
use std::task::Context as TaskContext;
8+
impl<T: Sized> FutureExt for T {}
9+
10+
impl<T: std::future::Future> std::future::Future for WithContext<T> {
11+
type Output = T::Output;
12+
13+
fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
14+
let this = self.project();
15+
let _guard = this.otel_cx.clone().attach();
16+
17+
this.inner.poll(task_cx)
18+
}
19+
}
20+
21+
impl<T: Stream> Stream for WithContext<T> {
22+
type Item = T::Item;
23+
24+
fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
25+
let this = self.project();
26+
let _guard = this.otel_cx.clone().attach();
27+
T::poll_next(this.inner, task_cx)
28+
}
29+
}
30+
31+
pin_project! {
32+
/// A future, stream, or sink that has an associated context.
33+
#[derive(Clone, Debug)]
34+
pub struct WithContext<T> {
35+
#[pin]
36+
inner: T,
37+
otel_cx: Context,
38+
}
39+
}
40+
41+
impl<I, T: Sink<I>> Sink<I> for WithContext<T>
42+
where
43+
T: Sink<I>,
44+
{
45+
type Error = T::Error;
46+
47+
fn poll_ready(
48+
self: Pin<&mut Self>,
49+
task_cx: &mut TaskContext<'_>,
50+
) -> Poll<Result<(), Self::Error>> {
51+
let this = self.project();
52+
let _guard = this.otel_cx.clone().attach();
53+
T::poll_ready(this.inner, task_cx)
54+
}
55+
56+
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
57+
let this = self.project();
58+
let _guard = this.otel_cx.clone().attach();
59+
T::start_send(this.inner, item)
60+
}
61+
62+
fn poll_flush(
63+
self: Pin<&mut Self>,
64+
task_cx: &mut TaskContext<'_>,
65+
) -> Poll<Result<(), Self::Error>> {
66+
let this = self.project();
67+
let _guard = this.otel_cx.clone().attach();
68+
T::poll_flush(this.inner, task_cx)
69+
}
70+
71+
fn poll_close(
72+
self: Pin<&mut Self>,
73+
task_cx: &mut TaskContext<'_>,
74+
) -> Poll<Result<(), Self::Error>> {
75+
let this = self.project();
76+
let _enter = this.otel_cx.clone().attach();
77+
T::poll_close(this.inner, task_cx)
78+
}
79+
}
80+
81+
382

483
/// Extension trait allowing futures, streams, and sinks to be traced with a span.
584
pub trait FutureExt: Sized {

opentelemetry/src/context/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ mod future_ext;
33

44
pub use context_store::Context;
55
pub use context_store::ContextGuard;
6-
pub use future_ext::FutureExt;
6+
pub use future_ext::FutureExt;

opentelemetry/src/trace/context.rs

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,10 @@ use crate::{
44
trace::{Span, SpanContext, Status},
55
Context, ContextGuard, KeyValue,
66
};
7-
use futures_core::stream::Stream;
8-
use futures_sink::Sink;
9-
use pin_project_lite::pin_project;
107
use std::{
118
borrow::Cow,
129
error::Error,
13-
pin::Pin,
1410
sync::Mutex,
15-
task::{Context as TaskContext, Poll},
1611
};
1712

1813
// Re-export for compatability. This used to be contained here.
@@ -375,76 +370,3 @@ where
375370
Context::map_current(|cx| f(cx.span()))
376371
}
377372

378-
pin_project! {
379-
/// A future, stream, or sink that has an associated context.
380-
#[derive(Clone, Debug)]
381-
pub struct WithContext<T> {
382-
#[pin]
383-
pub(crate) inner: T,
384-
pub(crate) otel_cx: Context,
385-
}
386-
}
387-
388-
impl<T: Sized> FutureExt for T {}
389-
390-
impl<T: std::future::Future> std::future::Future for WithContext<T> {
391-
type Output = T::Output;
392-
393-
fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
394-
let this = self.project();
395-
let _guard = this.otel_cx.clone().attach();
396-
397-
this.inner.poll(task_cx)
398-
}
399-
}
400-
401-
impl<T: Stream> Stream for WithContext<T> {
402-
type Item = T::Item;
403-
404-
fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
405-
let this = self.project();
406-
let _guard = this.otel_cx.clone().attach();
407-
T::poll_next(this.inner, task_cx)
408-
}
409-
}
410-
411-
impl<I, T: Sink<I>> Sink<I> for WithContext<T>
412-
where
413-
T: Sink<I>,
414-
{
415-
type Error = T::Error;
416-
417-
fn poll_ready(
418-
self: Pin<&mut Self>,
419-
task_cx: &mut TaskContext<'_>,
420-
) -> Poll<Result<(), Self::Error>> {
421-
let this = self.project();
422-
let _guard = this.otel_cx.clone().attach();
423-
T::poll_ready(this.inner, task_cx)
424-
}
425-
426-
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
427-
let this = self.project();
428-
let _guard = this.otel_cx.clone().attach();
429-
T::start_send(this.inner, item)
430-
}
431-
432-
fn poll_flush(
433-
self: Pin<&mut Self>,
434-
task_cx: &mut TaskContext<'_>,
435-
) -> Poll<Result<(), Self::Error>> {
436-
let this = self.project();
437-
let _guard = this.otel_cx.clone().attach();
438-
T::poll_flush(this.inner, task_cx)
439-
}
440-
441-
fn poll_close(
442-
self: Pin<&mut Self>,
443-
task_cx: &mut TaskContext<'_>,
444-
) -> Poll<Result<(), Self::Error>> {
445-
let this = self.project();
446-
let _enter = this.otel_cx.clone().attach();
447-
T::poll_close(this.inner, task_cx)
448-
}
449-
}
450-

opentelemetry/src/trace/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ mod tracer_provider;
178178

179179
pub use self::{
180180
context::{
181-
get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext,
181+
get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt
182182
},
183183
span::{Span, SpanKind, Status},
184184
span_context::{SpanContext, TraceState},

0 commit comments

Comments
 (0)