Skip to content

Commit a979c73

Browse files
fix(grpc): fix segfault with grpc.aio streaming responses [backport 1.20] (#9276)
Backport 5897cab from #9233 to 1.20. This PR fixes a few issues with the grpc aio integration. Most notably, the integration was causing segfaults when wrapping async stream responses, most likely since these spans were never being finished. This issue was uncovered when customers upgraded their google-api-core dependencies to 2.17.0; with this upgrade, the package changed many grpc calls to use async streaming. In addition to fixing the segfault, this PR also fixes the Pin object to be correctly placed on the grpcio module. Fixes #9139 ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. ## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Emmett Butler <[email protected]>
1 parent 7a0e539 commit a979c73

File tree

9 files changed

+425
-25
lines changed

9 files changed

+425
-25
lines changed

ddtrace/contrib/grpc/aio_client_interceptor.py

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
from ...ext import SpanKind
2828
from ...ext import SpanTypes
2929
from ...internal.compat import to_unicode
30+
from ...internal.logger import get_logger
3031
from ...propagation.http import HTTPPropagator
3132
from ..grpc import constants
3233
from ..grpc import utils
3334

3435

36+
log = get_logger(__name__)
37+
38+
3539
def create_aio_client_interceptors(pin, host, port):
3640
# type: (Pin, str, int) -> Tuple[aio.ClientInterceptor, ...]
3741
return (
@@ -42,7 +46,17 @@ def create_aio_client_interceptors(pin, host, port):
4246
)
4347

4448

45-
def _done_callback(span, code, details):
49+
def _handle_add_callback(call, callback):
50+
try:
51+
call.add_done_callback(callback)
52+
except NotImplementedError:
53+
# add_done_callback is not implemented in UnaryUnaryCallResponse
54+
# https://github.com/grpc/grpc/blob/c54c69dcdd483eba78ed8dbc98c60a8c2d069758/src/python/grpcio/grpc/aio/_interceptor.py#L1058
55+
# If callback is not called, we need to finish the span here
56+
callback(call)
57+
58+
59+
def _done_callback_unary(span, code, details):
4660
# type: (Span, grpc.StatusCode, str) -> Callable[[aio.Call], None]
4761
def func(call):
4862
# type: (aio.Call) -> None
@@ -51,15 +65,45 @@ def func(call):
5165

5266
# Handle server-side error in unary response RPCs
5367
if code != grpc.StatusCode.OK:
54-
_handle_error(span, call, code, details)
68+
_handle_error(span, code, details)
69+
finally:
70+
span.finish()
71+
72+
return func
73+
74+
75+
def _done_callback_stream(span):
76+
# type: (Span) -> Callable[[aio.Call], None]
77+
def func(call):
78+
# type: (aio.Call) -> None
79+
try:
80+
if call.done():
81+
# check to ensure code and details are not already set, in which case this span
82+
# is an error span and already has all error tags from `_handle_cancelled_error`
83+
code_tag = span.get_tag(constants.GRPC_STATUS_CODE_KEY)
84+
details_tag = span.get_tag(ERROR_MSG)
85+
if not code_tag or not details_tag:
86+
# we need to call __repr__ as we cannot call code() or details() since they are both async
87+
code, details = utils._parse_rpc_repr_string(call.__repr__(), grpc)
88+
89+
span.set_tag_str(constants.GRPC_STATUS_CODE_KEY, to_unicode(code))
90+
91+
# Handle server-side error in unary response RPCs
92+
if code != grpc.StatusCode.OK:
93+
_handle_error(span, code, details)
94+
else:
95+
log.warning("Grpc call has not completed, unable to set status code and details on span.")
96+
except ValueError:
97+
# ValueError is thrown from _parse_rpc_repr_string
98+
log.warning("Unable to parse async grpc string for status code and details.")
5599
finally:
56100
span.finish()
57101

58102
return func
59103

60104

61-
def _handle_error(span, call, code, details):
62-
# type: (Span, aio.Call, grpc.StatusCode, str) -> None
105+
def _handle_error(span, code, details):
106+
# type: (Span, grpc.StatusCode, str) -> None
63107
span.error = 1
64108
span.set_tag_str(ERROR_MSG, details)
65109
span.set_tag_str(ERROR_TYPE, to_unicode(code))
@@ -152,13 +196,13 @@ async def _wrap_stream_response(
152196
):
153197
# type: (...) -> ResponseIterableType
154198
try:
199+
_handle_add_callback(call, _done_callback_stream(span))
155200
async for response in call:
156201
yield response
157-
code = await call.code()
158-
details = await call.details()
159-
# NOTE: The callback is registered after the iteration is done,
160-
# otherwise `call.code()` and `call.details()` block indefinitely.
161-
call.add_done_callback(_done_callback(span, code, details))
202+
except StopAsyncIteration:
203+
# Callback will handle span finishing
204+
_handle_cancelled_error(call, span)
205+
raise
162206
except aio.AioRpcError as rpc_error:
163207
# NOTE: We can also handle the error in done callbacks,
164208
# but reuse this error handling function used in unary response RPCs.
@@ -184,7 +228,7 @@ async def _wrap_unary_response(
184228
# NOTE: As both `code` and `details` are available after the RPC is done (= we get `call` object),
185229
# and we can't call awaitable functions inside the non-async callback,
186230
# there is no other way but to register the callback here.
187-
call.add_done_callback(_done_callback(span, code, details))
231+
_handle_add_callback(call, _done_callback_unary(span, code, details))
188232
return call
189233
except aio.AioRpcError as rpc_error:
190234
# NOTE: `AioRpcError` is raised in `await continuation(...)`

ddtrace/contrib/grpc/patch.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def _unpatch_aio_server():
196196
def _client_channel_interceptor(wrapped, instance, args, kwargs):
197197
channel = wrapped(*args, **kwargs)
198198

199-
pin = Pin.get_from(channel)
199+
pin = Pin.get_from(constants.GRPC_PIN_MODULE_CLIENT)
200200
if not pin or not pin.enabled():
201201
return channel
202202

@@ -207,11 +207,10 @@ def _client_channel_interceptor(wrapped, instance, args, kwargs):
207207

208208

209209
def _aio_client_channel_interceptor(wrapped, instance, args, kwargs):
210-
channel = wrapped(*args, **kwargs)
210+
pin = Pin.get_from(GRPC_AIO_PIN_MODULE_CLIENT)
211211

212-
pin = Pin.get_from(channel)
213212
if not pin or not pin.enabled():
214-
return channel
213+
return wrapped(*args, **kwargs)
215214

216215
(host, port) = utils._parse_target_from_args(args, kwargs)
217216

ddtrace/contrib/grpc/utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import re
23

34
from ddtrace.internal.compat import parse
45

@@ -74,3 +75,30 @@ def _parse_target_from_args(args, kwargs):
7475
return hostname, port
7576
except ValueError:
7677
log.warning("Malformed target '%s'.", target)
78+
79+
80+
def _parse_rpc_repr_string(rpc_string, module):
81+
# Define the regular expression patterns to extract status and details
82+
status_pattern = r"status\s*=\s*StatusCode\.(\w+)"
83+
details_pattern = r'details\s*=\s*"([^"]*)"'
84+
85+
# Search for the status and details in the input string
86+
status_match = re.search(status_pattern, rpc_string)
87+
details_match = re.search(details_pattern, rpc_string)
88+
89+
if not status_match or not details_match:
90+
raise ValueError("Unable to parse grpc status or details repr string")
91+
92+
# Extract the status and details from the matches
93+
status_str = status_match.group(1)
94+
details = details_match.group(1)
95+
96+
# Convert the status string to a grpc.StatusCode object
97+
try:
98+
code = module.StatusCode[status_str]
99+
except KeyError:
100+
code = None
101+
raise ValueError("Invalid grpc status code: " + status_str)
102+
103+
# Return the status code and details
104+
return code, details

docs/spelling_wordlist.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ MySQL
1515
OpenTracing
1616
Runtimes
1717
SpanContext
18+
aio
1819
aiobotocore
1920
aiohttp
2021
aiomysql
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
fix(grpc): This change fixes a bug in the grpc.aio support specific to streaming responses.

tests/contrib/grpc_aio/hellostreamingworld_pb2.py

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# isort: off
2+
from typing import ClassVar as _ClassVar
3+
from typing import Optional as _Optional
4+
5+
from ddtrace.internal.compat import PYTHON_VERSION_INFO
6+
7+
if PYTHON_VERSION_INFO > (3, 7):
8+
from google.protobuf import descriptor as _descriptor
9+
from google.protobuf import message as _message
10+
11+
DESCRIPTOR: _descriptor.FileDescriptor
12+
class HelloReply(_message.Message):
13+
__slots__ = ["message"]
14+
MESSAGE_FIELD_NUMBER: _ClassVar[int]
15+
message: str
16+
def __init__(self, message: _Optional[str] = ...) -> None: ...
17+
class HelloRequest(_message.Message):
18+
__slots__ = ["name", "num_greetings"]
19+
NAME_FIELD_NUMBER: _ClassVar[int]
20+
NUM_GREETINGS_FIELD_NUMBER: _ClassVar[int]
21+
name: str
22+
num_greetings: str
23+
def __init__(self, name: _Optional[str] = ..., num_greetings: _Optional[str] = ...) -> None: ...
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
2+
"""Client and server classes corresponding to protobuf-defined services."""
3+
import grpc
4+
5+
from ddtrace.internal.compat import PYTHON_VERSION_INFO
6+
7+
8+
if PYTHON_VERSION_INFO > (3, 7):
9+
from tests.contrib.grpc_aio import hellostreamingworld_pb2 as hellostreamingworld__pb2
10+
11+
class MultiGreeterStub(object):
12+
"""The greeting service definition."""
13+
14+
def __init__(self, channel):
15+
"""Constructor.
16+
17+
Args:
18+
channel: A grpc.Channel.
19+
"""
20+
self.sayHello = channel.unary_stream(
21+
"/hellostreamingworld.MultiGreeter/sayHello",
22+
request_serializer=hellostreamingworld__pb2.HelloRequest.SerializeToString,
23+
response_deserializer=hellostreamingworld__pb2.HelloReply.FromString,
24+
)
25+
26+
class MultiGreeterServicer(object):
27+
"""The greeting service definition."""
28+
29+
def sayHello(self, request, context):
30+
"""Sends multiple greetings"""
31+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
32+
context.set_details("Method not implemented!")
33+
raise NotImplementedError("Method not implemented!")
34+
35+
def add_MultiGreeterServicer_to_server(servicer, server):
36+
rpc_method_handlers = {
37+
"sayHello": grpc.unary_stream_rpc_method_handler(
38+
servicer.sayHello,
39+
request_deserializer=hellostreamingworld__pb2.HelloRequest.FromString,
40+
response_serializer=hellostreamingworld__pb2.HelloReply.SerializeToString,
41+
),
42+
}
43+
generic_handler = grpc.method_handlers_generic_handler("hellostreamingworld.MultiGreeter", rpc_method_handlers)
44+
server.add_generic_rpc_handlers((generic_handler,))
45+
46+
# This class is part of an EXPERIMENTAL API.
47+
class MultiGreeter(object):
48+
"""The greeting service definition."""
49+
50+
@staticmethod
51+
def sayHello(
52+
request,
53+
target,
54+
options=(),
55+
channel_credentials=None,
56+
call_credentials=None,
57+
insecure=False,
58+
compression=None,
59+
wait_for_ready=None,
60+
timeout=None,
61+
metadata=None,
62+
):
63+
return grpc.experimental.unary_stream(
64+
request,
65+
target,
66+
"/hellostreamingworld.MultiGreeter/sayHello",
67+
hellostreamingworld__pb2.HelloRequest.SerializeToString,
68+
hellostreamingworld__pb2.HelloReply.FromString,
69+
options,
70+
channel_credentials,
71+
insecure,
72+
call_credentials,
73+
compression,
74+
wait_for_ready,
75+
timeout,
76+
metadata,
77+
)

0 commit comments

Comments
 (0)