Skip to content

Commit 0b609cf

Browse files
dd-octo-sts[bot]brettlangdonsamschlegelchristophe-papazian
authored
fix(grpc): leaking spans when using gRPC future interface [backport 3.16] (#14900)
Backport 75a8246 from #14875 to 3.16. We've tracked down some issues related to broken span hierarchy down to the way that the gRPC instrumentation deals with entering spans, which causes the grpc span to leak outside it's call when using the .future() interface. This PR changes the instrumentation to explicitly only enter the span when calling the continuations instead of just entering immediately. ## Description <!-- Provide an overview of the change and motivation for the change --> ## Testing I've added tests, but haven't run them for a while and working on getting it running locally. Will update if I run into any issues ## Risks <!-- Note any risks associated with this change, or "None" if no risks --> ## Additional Notes Co-authored-by: Sam Schlegel <[email protected]> Signed-off-by: Sam Schlegel <[email protected]> Co-authored-by: Brett Langdon <[email protected]> Co-authored-by: Sam Schlegel <[email protected]> Co-authored-by: Christophe Papazian <[email protected]>
1 parent ddf29d2 commit 0b609cf

File tree

3 files changed

+110
-27
lines changed

3 files changed

+110
-27
lines changed

ddtrace/contrib/internal/grpc/client_interceptor.py

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import collections
2+
from contextlib import contextmanager
23

34
import grpc
45
import wrapt
@@ -20,6 +21,8 @@
2021
from ddtrace.internal.schema import schematize_url_operation
2122
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
2223
from ddtrace.propagation.http import HTTPPropagator
24+
from ddtrace.trace import Span
25+
from ddtrace.trace import Tracer
2326

2427

2528
log = get_logger(__name__)
@@ -120,10 +123,21 @@ def _handle_error(span, response_error, status_code):
120123
status_code = str(response_error.code())
121124

122125

126+
@contextmanager
127+
def _activated_span(tracer: Tracer, span: Span):
128+
prev_span = tracer.context_provider.active()
129+
tracer.context_provider.activate(span)
130+
try:
131+
yield
132+
finally:
133+
tracer.context_provider.activate(prev_span)
134+
135+
123136
class _WrappedResponseCallFuture(wrapt.ObjectProxy):
124-
def __init__(self, wrapped, span):
137+
def __init__(self, wrapped, span, tracer):
125138
super(_WrappedResponseCallFuture, self).__init__(wrapped)
126139
self._span = span
140+
self._tracer = tracer
127141
# Registers callback on the _MultiThreadedRendezvous future to finish
128142
# span in case StopIteration is never raised but RPC is terminated
129143
_handle_response(self._span, self.__wrapped__)
@@ -140,7 +154,8 @@ def _next(self):
140154
# https://github.com/googleapis/python-api-core/blob/35e87e0aca52167029784379ca84e979098e1d6c/google/api_core/grpc_helpers.py#L84
141155
# https://github.com/GoogleCloudPlatform/grpc-gcp-python/blob/5a2cd9807bbaf1b85402a2a364775e5b65853df6/src/grpc_gcp/_channel.py#L102
142156
try:
143-
return next(self.__wrapped__)
157+
with _activated_span(self._tracer, self._span):
158+
return next(self.__wrapped__)
144159
except StopIteration:
145160
# Callback will handle span finishing
146161
raise
@@ -178,13 +193,17 @@ def __init__(self, pin, host, port):
178193
self._port = port
179194

180195
def _intercept_client_call(self, method_kind, client_call_details):
181-
tracer = self._pin.tracer
196+
tracer: Tracer = self._pin.tracer
182197

183-
span = tracer.trace(
198+
# Instead of using .trace, create the span and activate it at points where we call the continuations
199+
# This avoids the issue of spans being leaked when using the .future interface.
200+
parent = tracer.context_provider.active()
201+
span = tracer.start_span(
184202
schematize_url_operation("grpc", protocol="grpc", direction=SpanDirection.OUTBOUND),
185203
span_type=SpanTypes.GRPC,
186204
service=trace_utils.ext_service(self._pin, config.grpc),
187205
resource=client_call_details.method,
206+
child_of=parent,
188207
)
189208

190209
span.set_tag_str(COMPONENT, config.grpc.integration_name)
@@ -206,7 +225,8 @@ def _intercept_client_call(self, method_kind, client_call_details):
206225
# propagate distributed tracing headers if available
207226
headers = {}
208227
if config.grpc.distributed_tracing_enabled:
209-
HTTPPropagator.inject(span.context, headers)
228+
# NOTE: We need to pass the span to the HTTPPropagator since it isn't active at this point
229+
HTTPPropagator.inject(span.context, headers, span)
210230

211231
metadata = []
212232
if client_call_details.metadata is not None:
@@ -227,15 +247,16 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
227247
constants.GRPC_METHOD_KIND_UNARY,
228248
client_call_details,
229249
)
230-
try:
231-
response = continuation(client_call_details, request)
232-
_handle_response(span, response)
233-
except grpc.RpcError as rpc_error:
234-
# DEV: grpcio<1.18.0 grpc.RpcError is raised rather than returned as response
235-
# https://github.com/grpc/grpc/commit/8199aff7a66460fbc4e9a82ade2e95ef076fd8f9
236-
# handle as a response
237-
_handle_response(span, rpc_error)
238-
raise
250+
with _activated_span(self._pin.tracer, span):
251+
try:
252+
response = continuation(client_call_details, request)
253+
_handle_response(span, response)
254+
except grpc.RpcError as rpc_error:
255+
# DEV: grpcio<1.18.0 grpc.RpcError is raised rather than returned as response
256+
# https://github.com/grpc/grpc/commit/8199aff7a66460fbc4e9a82ade2e95ef076fd8f9
257+
# handle as a response
258+
_handle_response(span, rpc_error)
259+
raise
239260

240261
return response
241262

@@ -244,24 +265,26 @@ def intercept_unary_stream(self, continuation, client_call_details, request):
244265
constants.GRPC_METHOD_KIND_SERVER_STREAMING,
245266
client_call_details,
246267
)
247-
response_iterator = continuation(client_call_details, request)
248-
response_iterator = _WrappedResponseCallFuture(response_iterator, span)
268+
with _activated_span(self._pin.tracer, span):
269+
response_iterator = continuation(client_call_details, request)
270+
response_iterator = _WrappedResponseCallFuture(response_iterator, span, self._pin.tracer)
249271
return response_iterator
250272

251273
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
252274
span, client_call_details = self._intercept_client_call(
253275
constants.GRPC_METHOD_KIND_CLIENT_STREAMING,
254276
client_call_details,
255277
)
256-
try:
257-
response = continuation(client_call_details, request_iterator)
258-
_handle_response(span, response)
259-
except grpc.RpcError as rpc_error:
260-
# DEV: grpcio<1.18.0 grpc.RpcError is raised rather than returned as response
261-
# https://github.com/grpc/grpc/commit/8199aff7a66460fbc4e9a82ade2e95ef076fd8f9
262-
# handle as a response
263-
_handle_response(span, rpc_error)
264-
raise
278+
with _activated_span(self._pin.tracer, span):
279+
try:
280+
response = continuation(client_call_details, request_iterator)
281+
_handle_response(span, response)
282+
except grpc.RpcError as rpc_error:
283+
# DEV: grpcio<1.18.0 grpc.RpcError is raised rather than returned as response
284+
# https://github.com/grpc/grpc/commit/8199aff7a66460fbc4e9a82ade2e95ef076fd8f9
285+
# handle as a response
286+
_handle_response(span, rpc_error)
287+
raise
265288

266289
return response
267290

@@ -270,6 +293,7 @@ def intercept_stream_stream(self, continuation, client_call_details, request_ite
270293
constants.GRPC_METHOD_KIND_BIDI_STREAMING,
271294
client_call_details,
272295
)
273-
response_iterator = continuation(client_call_details, request_iterator)
274-
response_iterator = _WrappedResponseCallFuture(response_iterator, span)
296+
with _activated_span(self._pin.tracer, span):
297+
response_iterator = continuation(client_call_details, request_iterator)
298+
response_iterator = _WrappedResponseCallFuture(response_iterator, span, self._pin.tracer)
275299
return response_iterator
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
grpc: This fix resolves an issue where the internal span was left active in the caller when using the future interface.

tests/contrib/grpc/test_grpc.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,43 @@ def test_schematized_operation_name_v1(self):
612612
assert client_span.name == "grpc.client.request", "Expected 'grpc.client.request', got %s" % client_span.name
613613
assert server_span.name == "grpc.server.request", "Expected 'grpc.server.request', got %s" % server_span.name
614614

615+
def test_span_parent_is_maintained(self):
616+
with self.tracer.trace("root") as _:
617+
with grpc.insecure_channel("localhost:%d" % (_GRPC_PORT)) as channel:
618+
stub = HelloStub(channel)
619+
_ = stub.SayHello(HelloRequest(name="test"))
620+
621+
spans = self.get_spans_with_sync_and_assert(size=3)
622+
root = spans[0]
623+
client = spans[1]
624+
server = spans[2]
625+
assert root.span_id == client.parent_id
626+
assert client.span_id == server.parent_id
627+
628+
def test_active_span_doesnt_leak_with_future(self):
629+
with grpc.insecure_channel("localhost:%d" % (_GRPC_PORT)) as channel:
630+
stub = HelloStub(channel)
631+
future = stub.SayHello.future(HelloRequest(name="test"))
632+
assert self.tracer.current_span() is None
633+
# wait so that we don't cancel the request
634+
future.result()
635+
636+
self.get_spans_with_sync_and_assert(size=2)
637+
638+
def test_span_active_in_custom_interceptor(self):
639+
# add an interceptor that raises a custom exception and check error tags
640+
# are added to spans
641+
interceptor = _SpanActivationClientInterceptor(self.tracer)
642+
with grpc.insecure_channel("localhost:%d" % (_GRPC_PORT)) as channel:
643+
intercept_channel = grpc.intercept_channel(channel, interceptor)
644+
stub = HelloStub(intercept_channel)
645+
stub.SayHello(HelloRequest(name="test"))
646+
647+
spans = self.get_spans_with_sync_and_assert(size=2)
648+
client_span = spans[0]
649+
650+
assert client_span.get_tag("custom_interceptor_worked")
651+
615652

616653
class _CustomException(Exception):
617654
pass
@@ -686,3 +723,21 @@ def handler(request, context):
686723
channel.unary_unary("/pkg.Servicer/Handler")(b"request")
687724
finally:
688725
server.stop(None)
726+
727+
728+
class _SpanActivationClientInterceptor(grpc.UnaryUnaryClientInterceptor):
729+
def __init__(self, tracer) -> None:
730+
super().__init__()
731+
self.tracer = tracer
732+
733+
def _intercept_call(self, continuation, client_call_details, request_or_iterator):
734+
# allow computation to complete
735+
span = self.tracer.current_span()
736+
assert span is not None
737+
738+
span.set_tag("custom_interceptor_worked")
739+
740+
return continuation(client_call_details, request_or_iterator)
741+
742+
def intercept_unary_unary(self, continuation, client_call_details, request):
743+
return self._intercept_call(continuation, client_call_details, request)

0 commit comments

Comments
 (0)