diff --git a/CHANGELOG.md b/CHANGELOG.md index ba513aebfa..58afe7e8dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572)) +- grpc instrumentation uses official grpc.*ClientInterceptor-interfaces + ([#1583](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1583)) +- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. + ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572)) - `opentelemetry-instrumentation-celery` Record exceptions as events on the span. ([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573)) - Add metric instrumentation for urllib diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 25010e147b..d5d4a97334 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -59,6 +59,18 @@ def run(): logging.basicConfig() run() +You can also add the interceptors manually, rather than using +:py:class:`~opentelemetry.instrumentation.grpc.GrpcInstrumentorClient`: + +.. code-block:: python + + from opentelemetry.instrumentation.grpc import client_interceptors + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:50051"), + *client_interceptors() + ) + Usage Server ------------ .. code-block:: python @@ -285,7 +297,6 @@ async def serve(): negate, service_name, ) -from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.package import _instruments from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -469,11 +480,10 @@ def _uninstrument(self, **kwargs): def wrapper_fn(self, original_func, instance, args, kwargs): channel = original_func(*args, **kwargs) tracer_provider = kwargs.get("tracer_provider") - return intercept_channel( + return grpc.intercept_channel( channel, - client_interceptor( - tracer_provider=tracer_provider, - filter_=self._filter, + *client_interceptors( + tracer_provider=tracer_provider, filter_=self._filter ), ) @@ -541,8 +551,8 @@ def _uninstrument(self, **kwargs): grpc.aio.secure_channel = self._original_secure -def client_interceptor(tracer_provider=None, filter_=None): - """Create a gRPC client channel interceptor. +def client_interceptors(tracer_provider=None, filter_=None): + """Create gRPC client channel interceptors. Args: tracer: The tracer to use to create client-side spans. @@ -552,13 +562,18 @@ def client_interceptor(tracer_provider=None, filter_=None): all requests. Returns: - An invocation-side interceptor object. + A list of invocation-side interceptor objects. """ from . import _client tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _client.OpenTelemetryClientInterceptor(tracer, filter_=filter_) + return [ + _client.UnaryUnaryClientInterceptor(tracer, filter_=filter_), + _client.UnaryStreamClientInterceptor(tracer, filter_=filter_), + _client.StreamUnaryClientInterceptor(tracer, filter_=filter_), + _client.StreamStreamClientInterceptor(tracer, filter_=filter_), + ] def server_interceptor(tracer_provider=None, filter_=None): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index c7630bfe9f..1b38e61c55 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -18,14 +18,11 @@ import grpc from grpc.aio import ClientCallDetails -from opentelemetry import context -from opentelemetry.instrumentation.grpc._client import ( - OpenTelemetryClientInterceptor, - _carrier_setter, -) +from opentelemetry import context, trace +from opentelemetry.instrumentation.grpc._client import _carrier_setter from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagate import inject -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv.trace import RpcSystemValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode @@ -49,7 +46,12 @@ def callback(call): return callback -class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor): +class _BaseAioClientInterceptor: + + def __init__(self, tracer, filter_=None): + self._tracer = tracer + self._filter = filter_ + @staticmethod def propagate_trace_in_details(client_call_details): metadata = client_call_details.metadata @@ -99,6 +101,22 @@ def _start_interceptor_span(self, method): set_status_on_exception=False, ) + def _start_span(self, method, **kwargs): + service, meth = method.lstrip("/").split("/", 1) + attributes = { + SpanAttributes.RPC_SYSTEM: RpcSystemValues.GRPC.value, + SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: meth, + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], + } + + return self._tracer.start_as_current_span( + name=method, + kind=trace.SpanKind.CLIENT, + attributes=attributes, + **kwargs, + ) + async def _wrap_unary_response(self, continuation, span): try: call = await continuation() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 55a46d4a49..9f1cb32516 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -20,17 +20,17 @@ """Implementation of the invocation-side open-telemetry interceptor.""" from collections import OrderedDict +import functools from typing import MutableMapping import grpc +from grpc._interceptor import _ClientCallDetails from opentelemetry import context, trace -from opentelemetry.instrumentation.grpc import grpcext -from opentelemetry.instrumentation.grpc._utilities import RpcInfo from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagate import inject from opentelemetry.propagators.textmap import Setter -from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.semconv.trace import RpcSystemValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode @@ -46,33 +46,127 @@ def set(self, carrier: MutableMapping[str, str], key: str, value: str): _carrier_setter = _CarrierSetter() -def _make_future_done_callback(span, rpc_info): +def _unary_done_callback(span): + def callback(response_future): with trace.use_span(span, end_on_exit=True): code = response_future.code() if code != grpc.StatusCode.OK: - rpc_info.error = code - return - response = response_future.result() - rpc_info.response = response + details = response_future.details() + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}: {details}", + ) + ) + + try: + span.record_exception(response_future.exception()) + except grpc.FutureCancelledError: + pass return callback -class OpenTelemetryClientInterceptor( - grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor -): +class _BaseClientInterceptor: + """Base for client interceptors. + + Supplies convenient functions which are required by all four client + interceptors. + """ + def __init__(self, tracer, filter_=None): + """Initializes the base for client interceptors. + + Args: + tracer: The tracer to use for tracing. + filter_: An optional filter to filter specific requests to be + instrumented. + """ self._tracer = tracer self._filter = filter_ + @staticmethod + def propagate_trace_in_details(client_call_details): + """Propagates the trace into the metadata of the call. + + Args: + client_call_details: The original + :py:class:`~grpc.ClientCallDetails`, describing the outgoing + RPC. + + Returns: + An adapted version of the original + :py:class:`~grpc.ClientCallDetails`, describing the outgoing RPC, + whereby the metadata contains the trace ID. + """ + metadata = client_call_details.metadata + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + inject(mutable_metadata, setter=_carrier_setter) + metadata = tuple(mutable_metadata.items()) + + return _ClientCallDetails( + client_call_details.method, + client_call_details.timeout, + metadata, + # credentials, wait_for_ready, and compression, depending on + # grpc-version + *client_call_details[3:] + ) + + @staticmethod + def add_error_details_to_span( + span: trace.Span, + exc: Exception, + ) -> None: + """Adds error and details to an active span. + + Args: + span: The active span. + exc: The exception to get code and details from. + """ + if isinstance(exc, grpc.RpcError): + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + exc.code().value[0], + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + span.record_exception(exc) + def _start_span(self, method, **kwargs): + """Context manager for creating a new span and set it as the current + span in the tracer's context. + + Exiting the context manager will call the span's end method, as well as + return the current span to its previous value by returning to the + previous context. + + Args: + method: The method name of the RPC. + **kwargs: Further keyword arguments, passed through to + :py:meth:`~opentelemetry.trace.Tracer.start_as_current_span`. + + Yields: + The newly-created span. + """ service, meth = method.lstrip("/").split("/", 1) attributes = { - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], - SpanAttributes.RPC_METHOD: meth, + SpanAttributes.RPC_SYSTEM: RpcSystemValues.GRPC.value, SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: meth, + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], } return self._tracer.start_as_current_span( @@ -82,123 +176,213 @@ def _start_span(self, method, **kwargs): **kwargs, ) - # pylint:disable=no-self-use - def _trace_result(self, span, rpc_info, result): + def _wrap_unary_response(self, span, continuation): + """Wraps a unary-response-RPC to record a possible exception. + + Args: + span: The active span. + continuation: A callable which is created by: + + .. code-block:: python + + functools.partial( + continuation, client_call_details, request_or_iterator + ) + + Returns: + The response if the RPC is called synchonously, or the + :py:class:`~grpc.Future` if the RPC is called asnchronously. + """ + response_future = None + try: + response_future = continuation() + except Exception as exc: + self.add_error_details_to_span(span, exc) + raise exc + finally: + if not response_future: + span.end() + # If the RPC is called asynchronously, add a callback to end the span # when the future is done, else end the span immediately - if isinstance(result, grpc.Future): - result.add_done_callback( - _make_future_done_callback(span, rpc_info) - ) - return result - response = result - # Handle the case when the RPC is initiated via the with_call - # method and the result is a tuple with the first element as the - # response. - # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call - if isinstance(result, tuple): - response = result[0] - rpc_info.response = response + if isinstance( response_future, grpc.Future): + response_future.add_done_callback(_unary_done_callback(span)) + return response_future + span.end() - return result + return response_future - def _intercept(self, request, metadata, client_info, invoker): - if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return invoker(request, metadata) + def _wrap_stream_response(self, span, call): + """Wraps a stream-response-RPC to record a possible exception. + + Args: + span: The active span. + call: The response iterator which is created by: + + .. code-block:: python + + continuation(client_call_details, request_or_iterator) + + Returns: + The response iterator. + """ + try: + yield from call + except Exception as exc: + self.add_error_details_to_span(span, exc) + raise exc + finally: + span.end() + + def tracing_skipped( + self, + client_call_details: grpc.ClientCallDetails + ) -> bool: + """Returns whether a call is supposed to be skipped for tracing. + + Args: + client_call_details: A :py:class:`~grpc.ClientCallDetails`-object, + describing the outgoing RPC. + + Returns: + True if: + + - no filter is set, + - the :py:class:`~grpc.ClientCallDetails` matches a set filter, + - the instrumentation is suppressed, + + False otherwise. + """ + return ( + context.get_value(_SUPPRESS_INSTRUMENTATION_KEY) + or not self.rpc_matches_filters(client_call_details) + ) + + def rpc_matches_filters( + self, + client_call_details: grpc.ClientCallDetails + ) -> bool: + """Returns whether the :py:class:`~grpc.ClientCallDetails` matches a + set `filter_`. + + Args: + client_call_details: A :py:class:`~grpc.ClientCallDetails`-object, + describing the outgoing RPC. + + Returns: + True if no filter is set or the :py:class:`~grpc.ClientCallDetails` + matches a set filter, False otherwise. + """ + return self._filter is None or self._filter(client_call_details) + + +class UnaryUnaryClientInterceptor( + grpc.UnaryUnaryClientInterceptor, + _BaseClientInterceptor, +): + + def intercept_unary_unary( + self, + continuation, + client_call_details, + request + ): + if self.tracing_skipped(client_call_details): + return continuation(client_call_details, request) - if not metadata: - mutable_metadata = OrderedDict() - else: - mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, + client_call_details.method, end_on_exit=False, record_exception=False, set_status_on_exception=False, ) as span: - result = None - try: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, - request=request, - ) + new_details = self.propagate_trace_in_details(client_call_details) - result = invoker(request, metadata) - except Exception as exc: - if isinstance(exc, grpc.RpcError): - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, - exc.code().value[0], - ) - span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{type(exc).__name__}: {exc}", - ) - ) - span.record_exception(exc) - raise exc - finally: - if not result: - span.end() - return self._trace_result(span, rpc_info, result) - - def intercept_unary(self, request, metadata, client_info, invoker): - if self._filter is not None and not self._filter(client_info): - return invoker(request, metadata) - return self._intercept(request, metadata, client_info, invoker) - - # For RPCs that stream responses, the result can be a generator. To record - # the span across the generated responses and detect any errors, we wrap - # the result in a new generator that yields the response values. - def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker + continuation_with_args = functools.partial( + continuation, new_details, request + ) + + return self._wrap_unary_response(span, continuation_with_args) + + +class StreamUnaryClientInterceptor( + grpc.StreamUnaryClientInterceptor, + _BaseClientInterceptor, +): + + def intercept_stream_unary( + self, + continuation, + client_call_details, + request_iterator ): - if not metadata: - mutable_metadata = OrderedDict() - else: - mutable_metadata = OrderedDict(metadata) + if self.tracing_skipped(client_call_details): + return continuation(client_call_details, request_iterator) + + with self._start_span( + client_call_details.method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) - with self._start_span(client_info.full_method) as span: - inject(mutable_metadata, setter=_carrier_setter) - metadata = tuple(mutable_metadata.items()) - rpc_info = RpcInfo( - full_method=client_info.full_method, - metadata=metadata, - timeout=client_info.timeout, + continuation_with_args = functools.partial( + continuation, new_details, request_iterator ) + return self._wrap_unary_response(span, continuation_with_args) - if client_info.is_client_stream: - rpc_info.request = request_or_iterator - try: - yield from invoker(request_or_iterator, metadata) - except grpc.RpcError as err: - span.set_status(Status(StatusCode.ERROR)) - span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] - ) - raise err +class UnaryStreamClientInterceptor( + grpc.UnaryStreamClientInterceptor, + _BaseClientInterceptor, +): - def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + def intercept_unary_stream( + self, + continuation, + client_call_details, + request ): - if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return invoker(request_or_iterator, metadata) + if self.tracing_skipped(client_call_details): + return continuation(client_call_details, request) - if self._filter is not None and not self._filter(client_info): - return invoker(request_or_iterator, metadata) + with self._start_span( + client_call_details.method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + new_details = self.propagate_trace_in_details(client_call_details) - if client_info.is_server_stream: - return self._intercept_server_stream( - request_or_iterator, metadata, client_info, invoker - ) + resp = continuation(new_details, request) - return self._intercept( - request_or_iterator, metadata, client_info, invoker - ) + return self._wrap_stream_response(span, resp) + + +class StreamStreamClientInterceptor( + grpc.StreamStreamClientInterceptor, + _BaseClientInterceptor, +): + + def intercept_stream_stream( + self, + continuation, + client_call_details, + request_iterator + ): + if self.tracing_skipped(client_call_details): + return continuation(client_call_details, request_iterator) + + with self._start_span( + client_call_details.method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, + ) as span: + + new_details = self.propagate_trace_in_details(client_call_details) + + resp = continuation(new_details, request_iterator) + + return self._wrap_stream_response(span, resp) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py deleted file mode 100644 index b6ff7d311a..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Internal utilities.""" - - -class RpcInfo: - def __init__( - self, - full_method=None, - metadata=None, - timeout=None, - request=None, - response=None, - error=None, - ): - self.full_method = full_method - self.metadata = metadata - self.timeout = timeout - self.request = request - self.response = response - self.error = error diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py deleted file mode 100644 index d5e2549bab..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/__init__.py +++ /dev/null @@ -1,125 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint:disable=import-outside-toplevel -# pylint:disable=import-self -# pylint:disable=no-name-in-module - -import abc - - -class UnaryClientInfo(abc.ABC): - """Consists of various information about a unary RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ - - -class StreamClientInfo(abc.ABC): - """Consists of various information about a stream RPC on the - invocation-side. - - Attributes: - full_method: A string of the full RPC method, i.e., - /package.service/method. - is_client_stream: Indicates whether the RPC is client-streaming. - is_server_stream: Indicates whether the RPC is server-streaming. - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled, or None if this method should block until - the computation is terminated or is cancelled no matter how long that - takes. - """ - - -class UnaryClientInterceptor(abc.ABC): - """Affords intercepting unary-unary RPCs on the invocation-side.""" - - @abc.abstractmethod - def intercept_unary(self, request, metadata, client_info, invoker): - """Intercepts unary-unary RPCs on the invocation-side. - - Args: - request: The request value for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - client_info: A UnaryClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(request, metadata). - """ - raise NotImplementedError() - - -class StreamClientInterceptor(abc.ABC): - """Affords intercepting stream RPCs on the invocation-side.""" - - @abc.abstractmethod - def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker - ): - """Intercepts stream RPCs on the invocation-side. - - Args: - request_or_iterator: The request value for the RPC if - `client_info.is_client_stream` is `false`; otherwise, an iterator of - request values. - metadata: Optional :term:`metadata` to be transmitted to the service-side - of the RPC. - client_info: A StreamClientInfo containing various information about - the RPC. - invoker: The handler to complete the RPC on the client. It is the - interceptor's responsibility to call it. - - Returns: - The result from calling invoker(metadata). - """ - raise NotImplementedError() - - -def intercept_channel(channel, *interceptors): - """Creates an intercepted channel. - - Args: - channel: A Channel. - interceptors: Zero or more UnaryClientInterceptors or - StreamClientInterceptors - - Returns: - A Channel. - - Raises: - TypeError: If an interceptor derives from neither UnaryClientInterceptor - nor StreamClientInterceptor. - """ - from . import _interceptor - - return _interceptor.intercept_channel(channel, *interceptors) - - -__all__ = ( - "UnaryClientInterceptor", - "StreamClientInfo", - "StreamClientInterceptor", - "intercept_channel", -) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py deleted file mode 100644 index 53ee46a20d..0000000000 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/grpcext/_interceptor.py +++ /dev/null @@ -1,350 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint:disable=relative-beyond-top-level -# pylint:disable=no-member - -"""Implementation of gRPC Python interceptors.""" - - -import collections - -import grpc - -from opentelemetry.instrumentation.grpc import grpcext - - -class _UnaryClientInfo( - collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) -): - pass - - -class _StreamClientInfo( - collections.namedtuple( - "_StreamClientInfo", - ("full_method", "is_client_stream", "is_server_stream", "timeout"), - ) -): - pass - - -class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - def with_call( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable.with_call( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - def future( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable.future( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _UnaryClientInfo(self._method, timeout) - return self._interceptor.intercept_unary( - request, metadata, client_info, invoker - ) - - -class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request, metadata): - return self._base_callable( - request, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, False, True, timeout) - return self._interceptor.intercept_stream( - request, metadata, client_info, invoker - ) - - -class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - def with_call( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable.with_call( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - def future( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable.future( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, False, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - -class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable): - def __init__(self, method, base_callable, interceptor): - self._method = method - self._base_callable = base_callable - self._interceptor = interceptor - - def __call__( - self, - request_iterator, - timeout=None, - metadata=None, - credentials=None, - wait_for_ready=None, - compression=None, - ): - def invoker(request_iterator, metadata): - return self._base_callable( - request_iterator, - timeout, - metadata, - credentials, - wait_for_ready, - compression, - ) - - client_info = _StreamClientInfo(self._method, True, True, timeout) - return self._interceptor.intercept_stream( - request_iterator, metadata, client_info, invoker - ) - - -class _InterceptorChannel(grpc.Channel): - def __init__(self, channel, interceptor): - self._channel = channel - self._interceptor = interceptor - - def subscribe(self, *args, **kwargs): - self._channel.subscribe(*args, **kwargs) - - def unsubscribe(self, *args, **kwargs): - self._channel.unsubscribe(*args, **kwargs) - - def unary_unary( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.unary_unary( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.UnaryClientInterceptor): - return _InterceptorUnaryUnaryMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def unary_stream( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.unary_stream( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorUnaryStreamMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def stream_unary( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.stream_unary( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorStreamUnaryMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def stream_stream( - self, method, request_serializer=None, response_deserializer=None - ): - base_callable = self._channel.stream_stream( - method, request_serializer, response_deserializer - ) - if isinstance(self._interceptor, grpcext.StreamClientInterceptor): - return _InterceptorStreamStreamMultiCallable( - method, base_callable, self._interceptor - ) - return base_callable - - def close(self): - if not hasattr(self._channel, "close"): - raise RuntimeError( - "close() is not supported with the installed version of grpcio" - ) - self._channel.close() - - def __enter__(self): - """Enters the runtime context related to the channel object.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Exits the runtime context related to the channel object.""" - self.close() - - -def intercept_channel(channel, *interceptors): - result = channel - for interceptor in interceptors: - if not isinstance( - interceptor, grpcext.UnaryClientInterceptor - ) and not isinstance(interceptor, grpcext.StreamClientInterceptor): - raise TypeError( - "interceptor must be either a " - "grpcext.UnaryClientInterceptor or a " - "grpcext.StreamClientInterceptor" - ) - result = _InterceptorChannel(result, interceptor) - return result diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 810ee930dd..75384ae83f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -20,12 +20,7 @@ import opentelemetry.instrumentation.grpc from opentelemetry import context, trace from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient -from opentelemetry.instrumentation.grpc._client import ( - OpenTelemetryClientInterceptor, -) -from opentelemetry.instrumentation.grpc.grpcext._interceptor import ( - _UnaryClientInfo, -) +from opentelemetry.instrumentation.grpc._client import UnaryUnaryClientInterceptor from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagate import get_global_textmap, set_global_textmap from opentelemetry.semconv.trace import SpanAttributes @@ -84,6 +79,16 @@ def _intercept_call( return continuation(client_call_details, request_or_iterator) +class RecordingInterceptor(grpc.UnaryUnaryClientInterceptor): + recorded_details = None + + def intercept_unary_unary( + self, continuation, client_call_details, request + ): + self.recorded_details = client_call_details + return continuation(client_call_details, request) + + class TestClientProto(TestBase): def setUp(self): super().setUp() @@ -277,30 +282,25 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), + *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 2 + assert metadata[0][0] == "mock-traceid" + assert metadata[0][1] == "0" + assert metadata[1][0] == "mock-spanid" + assert metadata[1][1] == "0" finally: set_global_textmap(previous_propagator) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index a15268464b..8b118b94a5 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -24,10 +24,8 @@ from opentelemetry import context, trace from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, filters from opentelemetry.instrumentation.grpc._client import ( - OpenTelemetryClientInterceptor, -) -from opentelemetry.instrumentation.grpc.grpcext._interceptor import ( - _UnaryClientInfo, + UnaryUnaryClientInterceptor, StreamUnaryClientInterceptor, + UnaryStreamClientInterceptor, StreamStreamClientInterceptor ) from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagate import get_global_textmap, set_global_textmap @@ -43,6 +41,7 @@ simple_method_future, ) from ._server import create_test_server +from . test_client_interceptor import RecordingInterceptor from .protobuf.test_server_pb2 import Request @@ -202,30 +201,25 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), + *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 2 + assert metadata[0][0] == "mock-traceid" + assert metadata[0][1] == "0" + assert metadata[1][0] == "mock-spanid" + assert metadata[1][1] == "0" finally: set_global_textmap(previous_propagator) @@ -346,30 +340,25 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), + *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 2 + assert metadata[0][0] == "mock-traceid" + assert metadata[0][1] == "0" + assert metadata[1][0] == "mock-spanid" + assert metadata[1][1] == "0" finally: set_global_textmap(previous_propagator) @@ -609,30 +598,25 @@ def test_client_interceptor_trace_context_propagation( previous_propagator = get_global_textmap() try: set_global_textmap(MockTextMapPropagator()) - interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer()) - - carrier = tuple() - - def invoker(request, metadata): - nonlocal carrier - carrier = metadata - return {} - - request = Request(client_id=1, request_data="data") - interceptor.intercept_unary( - request, - {}, - _UnaryClientInfo( - full_method="/GRPCTestServer/SimpleMethod", timeout=None - ), - invoker=invoker, + interceptor = UnaryUnaryClientInterceptor(trace.NoOpTracer()) + + recording_interceptor = RecordingInterceptor() + interceptors = [interceptor, recording_interceptor] + + channel = grpc.intercept_channel( + grpc.insecure_channel("localhost:25565"), + *interceptors ) - assert len(carrier) == 2 - assert carrier[0][0] == "mock-traceid" - assert carrier[0][1] == "0" - assert carrier[1][0] == "mock-spanid" - assert carrier[1][1] == "0" + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + simple_method(stub) + + metadata = recording_interceptor.recorded_details.metadata + assert len(metadata) == 2 + assert metadata[0][0] == "mock-traceid" + assert metadata[0][1] == "0" + assert metadata[1][0] == "mock-spanid" + assert metadata[1][1] == "0" finally: set_global_textmap(previous_propagator)