1- use futures:: TryFuture ;
2- use linkerd_stack:: { NewService , Proxy } ;
1+ use linkerd_stack:: { layer, NewService , Proxy } ;
32use pin_project:: pin_project;
4- use std:: future:: Future ;
5- use std:: pin:: Pin ;
63use std:: task:: { Context , Poll } ;
74use tracing:: instrument:: { Instrument as _, Instrumented } ;
85use tracing:: { trace, Span } ;
@@ -12,133 +9,105 @@ pub trait GetSpan<T> {
129 fn get_span ( & self , target : & T ) -> tracing:: Span ;
1310}
1411
15- /// A middleware that instruments tracing for stacks.
1612#[ derive( Clone , Debug ) ]
17- pub struct InstrumentMakeLayer < G > {
13+ pub struct NewInstrumentLayer < G > {
1814 get_span : G ,
1915}
2016
2117/// Instruments a `MakeService` or `NewService` stack.
2218#[ derive( Clone , Debug ) ]
23- pub struct InstrumentMake < G , M > {
19+ pub struct NewInstrument < G , N > {
2420 get_span : G ,
25- make : M ,
21+ inner : N ,
2622}
2723
28- /// Instruments a service produced by `InstrumentMake `.
24+ /// Instruments a service produced by `NewInstrument `.
2925#[ pin_project]
30- #[ derive( Clone , Debug ) ]
31- pub struct Instrument < S > {
32- span : Span ,
26+ #[ derive( Debug ) ]
27+ pub struct Instrument < T , G , S > {
28+ target : T ,
29+ /// When this is a `Service` (and not a `Proxy`), we consider the `poll_ready`
30+ /// calls that drive the service to readiness and the `call` future that
31+ /// consumes that readiness to be part of one logical span (so, for example,
32+ /// we track time waiting for readiness as part of the request span's idle
33+ /// time).
34+ ///
35+ /// Therefore, we hang onto one instance of the span that's created when we
36+ /// are first polled after having been called, and take that span instance
37+ /// in `call`.
38+ current_span : Option < Span > ,
39+ get_span : G ,
3340 #[ pin]
3441 inner : S ,
3542}
3643
37- // === impl InstrumentMakeLayer ===
44+ // === impl NewInstrumentLayer ===
3845
39- impl < G > InstrumentMakeLayer < G > {
46+ impl < G > NewInstrumentLayer < G > {
4047 pub fn new ( get_span : G ) -> Self {
4148 Self { get_span }
4249 }
4350}
4451
45- impl InstrumentMakeLayer < ( ) > {
52+ impl NewInstrumentLayer < ( ) > {
4653 pub fn from_target ( ) -> Self {
4754 Self :: new ( ( ) )
4855 }
4956}
5057
51- impl < G : Clone , M > tower :: layer:: Layer < M > for InstrumentMakeLayer < G > {
52- type Service = InstrumentMake < G , M > ;
58+ impl < G : Clone , N > layer:: Layer < N > for NewInstrumentLayer < G > {
59+ type Service = NewInstrument < G , N > ;
5360
54- fn layer ( & self , make : M ) -> Self :: Service {
55- Self :: Service {
56- make ,
61+ fn layer ( & self , inner : N ) -> Self :: Service {
62+ NewInstrument {
63+ inner ,
5764 get_span : self . get_span . clone ( ) ,
5865 }
5966 }
6067}
6168
62- // === impl InstrumentMake ===
63-
64- impl < T , G , N > NewService < T > for InstrumentMake < G , N >
65- where
66- G : GetSpan < T > ,
67- N : NewService < T > ,
68- {
69- type Service = Instrument < N :: Service > ;
69+ // === impl NewInstrument ===
7070
71- fn new_service ( & mut self , target : T ) -> Self :: Service {
72- let span = self . get_span . get_span ( & target) ;
73- let inner = span. in_scope ( move || {
74- trace ! ( "new" ) ;
75- self . make . new_service ( target)
76- } ) ;
77- Instrument { inner, span }
71+ impl < G : Clone , N > NewInstrument < G , N > {
72+ pub fn layer ( get_span : G ) -> NewInstrumentLayer < G > {
73+ NewInstrumentLayer :: new ( get_span)
7874 }
7975}
8076
81- impl < T , G , M > tower :: Service < T > for InstrumentMake < G , M >
77+ impl < T , G , N > NewService < T > for NewInstrument < G , N >
8278where
83- G : GetSpan < T > ,
84- M : tower:: Service < T > ,
79+ T : Clone ,
80+ G : GetSpan < T > + Clone ,
81+ N : NewService < T > ,
8582{
86- type Response = Instrument < M :: Response > ;
87- type Error = M :: Error ;
88- type Future = Instrument < M :: Future > ;
83+ type Service = Instrument < T , G , N :: Service > ;
8984
90- fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
91- let ready = self . make . poll_ready ( cx) ;
92- match ready {
93- Poll :: Pending => trace ! ( ready = false , "make" ) ,
94- Poll :: Ready ( ref res) => trace ! ( ready = true , ok = res. is_ok( ) , "make" ) ,
85+ fn new_service ( & mut self , target : T ) -> Self :: Service {
86+ let _span = self . get_span . get_span ( & target) . entered ( ) ;
87+ trace ! ( "new" ) ;
88+ let inner = self . inner . new_service ( target. clone ( ) ) ;
89+ Instrument {
90+ inner,
91+ target,
92+ current_span : None ,
93+ get_span : self . get_span . clone ( ) ,
9594 }
96- ready
97- }
98-
99- fn call ( & mut self , target : T ) -> Self :: Future {
100- let span = self . get_span . get_span ( & target) ;
101- let inner = span. in_scope ( || {
102- trace ! ( "make" ) ;
103- self . make . call ( target)
104- } ) ;
105- Instrument { inner, span }
10695 }
10796}
10897
10998// === impl Instrument ===
11099
111- impl < F > Future for Instrument < F >
112- where
113- F : TryFuture ,
114- {
115- type Output = Result < Instrument < F :: Ok > , F :: Error > ;
116-
117- fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
118- let this = self . project ( ) ;
119- let _enter = this. span . enter ( ) ;
120-
121- trace ! ( "making" ) ;
122- match this. inner . try_poll ( cx) ? {
123- Poll :: Pending => {
124- trace ! ( ready = false ) ;
125- Poll :: Pending
126- }
127- Poll :: Ready ( inner) => {
128- trace ! ( ready = true ) ;
129- let svc = Instrument {
130- inner,
131- span : this. span . clone ( ) ,
132- } ;
133- Poll :: Ready ( Ok ( svc) )
134- }
135- }
100+ impl < T , G : GetSpan < T > , S > Instrument < T , G , S > {
101+ #[ inline]
102+ fn get_span ( & self ) -> Span {
103+ self . get_span . get_span ( & self . target )
136104 }
137105}
138106
139- impl < Req , S , P > Proxy < Req , S > for Instrument < P >
107+ impl < Req , S , T , G , P > Proxy < Req , S > for Instrument < T , G , P >
140108where
141109 Req : std:: fmt:: Debug ,
110+ G : GetSpan < T > ,
142111 P : Proxy < Req , S > ,
143112 S : tower:: Service < P :: Request > ,
144113{
@@ -148,23 +117,32 @@ where
148117 type Future = Instrumented < P :: Future > ;
149118
150119 fn proxy ( & self , svc : & mut S , request : Req ) -> Self :: Future {
151- let _enter = self . span . enter ( ) ;
120+ let span = self . get_span ( ) . entered ( ) ;
152121 trace ! ( ?request, "proxy" ) ;
153- self . inner . proxy ( svc, request) . instrument ( self . span . clone ( ) )
122+ self . inner . proxy ( svc, request) . instrument ( span. exit ( ) )
154123 }
155124}
156125
157- impl < Req , S > tower:: Service < Req > for Instrument < S >
126+ impl < Req , T , G , S > tower:: Service < Req > for Instrument < T , G , S >
158127where
159128 Req : std:: fmt:: Debug ,
129+ G : GetSpan < T > ,
160130 S : tower:: Service < Req > ,
161131{
162132 type Response = S :: Response ;
163133 type Error = S :: Error ;
164134 type Future = Instrumented < S :: Future > ;
165135
166136 fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
167- let _enter = self . span . enter ( ) ;
137+ // These need to be borrowed individually, or else the
138+ // `get_or_insert_with` closure will borrow *all* of `self` while
139+ // `self.current_span` is borrowed mutably... T_T
140+ let get_span = & self . get_span ;
141+ let target = & self . target ;
142+ let _enter = self
143+ . current_span
144+ . get_or_insert_with ( || get_span. get_span ( target) )
145+ . enter ( ) ;
168146
169147 let ready = self . inner . poll_ready ( cx) ;
170148 match ready {
@@ -175,10 +153,40 @@ where
175153 }
176154
177155 fn call ( & mut self , request : Req ) -> Self :: Future {
178- let _enter = self . span . enter ( ) ;
156+ let span = self
157+ . current_span
158+ . take ( )
159+ // NOTE(eliza): if `current_span` is `None` here, we were called
160+ // before being driven to readiness, which is invalid --- we're
161+ // permitted to panic here, so we could unwrap this. But, it's not
162+ // important...we can just make a new span, so I thought it was
163+ // better to err on the side of not panicking.
164+ . unwrap_or_else ( || self . get_span ( ) )
165+ . entered ( ) ;
179166
180167 trace ! ( ?request, "service" ) ;
181- self . inner . call ( request) . instrument ( self . span . clone ( ) )
168+ self . inner . call ( request) . instrument ( span. exit ( ) )
169+ }
170+ }
171+
172+ impl < T , G , S > Clone for Instrument < T , G , S >
173+ where
174+ T : Clone ,
175+ G : Clone ,
176+ S : Clone ,
177+ {
178+ fn clone ( & self ) -> Self {
179+ // Manually implement `Clone` so that each clone of an instrumented
180+ // service has its own "current span" state, since each clone of the
181+ // inner service will have its own independent readiness state.
182+ Self {
183+ target : self . target . clone ( ) ,
184+ inner : self . inner . clone ( ) ,
185+ get_span : self . get_span . clone ( ) ,
186+ // If this is a `Service`, the clone will construct its own span
187+ // when it's first driven to readiness.
188+ current_span : None ,
189+ }
182190 }
183191}
184192
@@ -198,9 +206,3 @@ impl<T: GetSpan<()>> GetSpan<T> for () {
198206 t. get_span ( & ( ) )
199207 }
200208}
201-
202- impl < T > GetSpan < T > for tracing:: Span {
203- fn get_span ( & self , _: & T ) -> tracing:: Span {
204- self . clone ( )
205- }
206- }
0 commit comments