From 62c24e74df7b53249581602c9177b70c7c41c83f Mon Sep 17 00:00:00 2001 From: Ondrej Unger Date: Tue, 12 Dec 2023 16:34:49 +0100 Subject: [PATCH 1/8] Test solution with add done callback --- .../instrumentation/grpc/_client.py | 47 +++++++++++------ .../tests/_client.py | 6 ++- .../tests/test_client_interceptor.py | 50 ++++++++++++++++--- .../tests/test_client_interceptor_filter.py | 4 +- 4 files changed, 82 insertions(+), 25 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index bbeb4ec1d9..980824482d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -21,6 +21,7 @@ import logging from collections import OrderedDict +from functools import partial from typing import Callable, MutableMapping import grpc @@ -77,12 +78,11 @@ def _safe_invoke(function: Callable, *args): "Error when invoking function '%s'", function_name, exc_info=ex ) - class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, tracer, filter_=None, request_hook=None, response_hook=None ): self._tracer = tracer self._filter = filter_ @@ -136,10 +136,10 @@ def _intercept(self, request, metadata, client_info, invoker): else: mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + client_info.full_method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, ) as span: result = None try: @@ -193,14 +193,17 @@ def intercept_unary(self, request, metadata, client_info, invoker): # the span across the generated responses and detect any errors, we wrap # the result in a new generator that yields the response values. def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not metadata: mutable_metadata = OrderedDict() else: mutable_metadata = OrderedDict(metadata) - with self._start_span(client_info.full_method) as span: + with self._start_span( + client_info.full_method, + end_on_exit=False + ) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) rpc_info = RpcInfo( @@ -212,15 +215,29 @@ def _intercept_server_stream( if client_info.is_client_stream: rpc_info.request = request_or_iterator - try: - yield from invoker(request_or_iterator, metadata) - except grpc.RpcError as err: - span.set_status(Status(StatusCode.ERROR)) - span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0]) - raise err + stream = invoker(request_or_iterator, metadata) + + def done_callback(future, span_): + try: + future.result() + except grpc.FutureCancelledError: + span_.set_status(Status(StatusCode.OK)) + span_.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] + ) + except grpc.RpcError as err: + span_.set_status(Status(StatusCode.ERROR)) + span_.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] + ) + finally: + span_.end() + + stream.add_done_callback(partial(done_callback, span_=span)) + return stream def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not is_instrumentation_enabled(): return invoker(request_or_iterator, metadata) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index 67e7d0a625..26815a3e01 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -45,14 +45,16 @@ def request_messages(): ) -def server_streaming_method(stub, error=False): +def server_streaming_method(stub, error=False, serialize=True): request = Request( client_id=CLIENT_ID, request_data="error" if error else "data" ) response_iterator = stub.ServerStreamingMethod( request, metadata=(("key", "value"),) ) - list(response_iterator) + if serialize: + list(response_iterator) + return response_iterator def bidirectional_streaming_method(stub, error=False): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index e001d2ed57..5bd09a4423 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # pylint:disable=cyclic-import +from time import sleep from unittest import mock @@ -61,24 +62,24 @@ def __init__(self): pass def intercept_unary_unary( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_unary_stream( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_stream_unary( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator ) def intercept_stream_stream( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator @@ -86,7 +87,7 @@ def intercept_stream_stream( @staticmethod def _intercept_call( - continuation, client_call_details, request_or_iterator + continuation, client_call_details, request_or_iterator ): return continuation(client_call_details, request_or_iterator) @@ -99,7 +100,9 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.insecure_channel("localhost:25565", options=[ + # (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + ]) self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) @@ -171,6 +174,39 @@ def test_unary_stream(self): }, ) + def test_unary_stream_can_be_cancel(self): + responses = server_streaming_method(self._stub) + for i, _ in enumerate(responses): + if i == 1: + responses.cancel() + break + sleep(10) + self.server.stop(None) + self.channel.close() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ + 0 + ], + }, + ) + def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -296,7 +332,7 @@ def invoker(_request, _metadata): self.assertEqual(span_end_mock.call_count, 1) def test_client_interceptor_trace_context_propagation( - self, + self, ): # pylint: disable=no-self-use """ensure that client interceptor correctly inject trace context into all outgoing requests.""" previous_propagator = get_global_textmap() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index 437175c4b2..9bd86d634b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -101,7 +101,9 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.insecure_channel("localhost:25565",options=[ + (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + ]) self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) From a2cf54a189dabfbd202801b7f28daea3f9c6a1d0 Mon Sep 17 00:00:00 2001 From: Ondrej Unger Date: Thu, 14 Dec 2023 15:47:13 +0100 Subject: [PATCH 2/8] Wrap strean response in proxy that manage span lifetime --- .../instrumentation/grpc/_client.py | 88 +++++++++++++------ .../tests/_client.py | 11 +-- .../tests/test_client_interceptor.py | 81 ++++++++++++++--- .../tests/test_client_interceptor_filter.py | 4 +- 4 files changed, 133 insertions(+), 51 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 980824482d..5c46d5a723 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -21,12 +21,12 @@ import logging from collections import OrderedDict -from functools import partial from typing import Callable, MutableMapping import grpc +import wrapt -from opentelemetry import trace +from opentelemetry import context, trace from opentelemetry.instrumentation.grpc import grpcext from opentelemetry.instrumentation.grpc._utilities import RpcInfo from opentelemetry.instrumentation.utils import is_instrumentation_enabled @@ -78,11 +78,59 @@ def _safe_invoke(function: Callable, *args): "Error when invoking function '%s'", function_name, exc_info=ex ) + +class OpenTelemetryStreamWrapper(wrapt.ObjectProxy): + def __init__(self, wrapped, span: trace.Span): + super().__init__(wrapped) + self._self_span = span + + def _end_span_if_not_already_ended(self, status_code=None, status=None): + if self._self_span.end_time is None: + self._self_span.end() + if status_code is not None: + self._self_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, status_code + ) + if status is not None: + self._self_span.set_status(status) + + def __del__(self): + self._end_span_if_not_already_ended() + self.__wrapped__.__del__() + + def __iter__(self): + return self + + def cancel(self): + self._end_span_if_not_already_ended( + status_code=grpc.StatusCode.CANCELLED.value[0] + ) + return self.__wrapped__.cancel() + + def __next__(self): + return self._next() + + def next(self): + return self._next() + + def _next(self): + try: + return self.__wrapped__._next() + except StopIteration: + self._end_span_if_not_already_ended() + raise + except grpc.RpcError as err: + self._end_span_if_not_already_ended( + err.code().value[0], Status(StatusCode.ERROR) + ) + raise err + + class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, tracer, filter_=None, request_hook=None, response_hook=None ): self._tracer = tracer self._filter = filter_ @@ -136,10 +184,10 @@ def _intercept(self, request, metadata, client_info, invoker): else: mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + client_info.full_method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, ) as span: result = None try: @@ -193,7 +241,7 @@ def intercept_unary(self, request, metadata, client_info, invoker): # the span across the generated responses and detect any errors, we wrap # the result in a new generator that yields the response values. def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not metadata: mutable_metadata = OrderedDict() @@ -201,8 +249,7 @@ def _intercept_server_stream( mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, - end_on_exit=False + client_info.full_method, end_on_exit=False ) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -217,27 +264,10 @@ def _intercept_server_stream( stream = invoker(request_or_iterator, metadata) - def done_callback(future, span_): - try: - future.result() - except grpc.FutureCancelledError: - span_.set_status(Status(StatusCode.OK)) - span_.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] - ) - except grpc.RpcError as err: - span_.set_status(Status(StatusCode.ERROR)) - span_.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] - ) - finally: - span_.end() - - stream.add_done_callback(partial(done_callback, span_=span)) - return stream + return OpenTelemetryStreamWrapper(stream, span) def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not is_instrumentation_enabled(): return invoker(request_or_iterator, metadata) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index 26815a3e01..b6f8a82e24 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -57,7 +57,7 @@ def server_streaming_method(stub, error=False, serialize=True): return response_iterator -def bidirectional_streaming_method(stub, error=False): +def bidirectional_streaming_method(stub, error=False, serialize=True): def request_messages(): for _ in range(5): request = Request( @@ -65,8 +65,9 @@ def request_messages(): ) yield request - response_iterator = stub.BidirectionalStreamingMethod( - request_messages(), metadata=(("key", "value"),) + response_iterator = stub.ServerStreamingMethod( + request, metadata=(("key", "value"),) ) - - list(response_iterator) + if serialize: + list(response_iterator) + return response_iterator diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 5bd09a4423..f9a6c71efc 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # pylint:disable=cyclic-import -from time import sleep from unittest import mock @@ -62,24 +61,24 @@ def __init__(self): pass def intercept_unary_unary( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_unary_stream( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_stream_unary( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator ) def intercept_stream_stream( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator @@ -87,7 +86,7 @@ def intercept_stream_stream( @staticmethod def _intercept_call( - continuation, client_call_details, request_or_iterator + continuation, client_call_details, request_or_iterator ): return continuation(client_call_details, request_or_iterator) @@ -100,9 +99,7 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565", options=[ - # (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) - ]) + self.channel = grpc.insecure_channel("localhost:25565") self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) @@ -175,14 +172,11 @@ def test_unary_stream(self): ) def test_unary_stream_can_be_cancel(self): - responses = server_streaming_method(self._stub) + responses = server_streaming_method(self._stub, serialize=False) for i, _ in enumerate(responses): if i == 1: responses.cancel() break - sleep(10) - self.server.stop(None) - self.channel.close() spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -207,6 +201,33 @@ def test_unary_stream_can_be_cancel(self): }, ) + def test_finished_stream_cancel_does_not_change_status_of_span(self): + responses = server_streaming_method(self._stub, serialize=True) + responses.cancel() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ + 0 + ], + }, + ) + def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -257,6 +278,38 @@ def test_stream_stream(self): }, ) + def test_stream_stream_can_be_cancel(self): + responses = bidirectional_streaming_method(self._stub, serialize=False) + for i, _ in enumerate(responses): + if i == 1: + responses.cancel() + break + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + SpanAttributes.RPC_METHOD: "BidirectionalStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ + 0 + ], + }, + ) + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) @@ -332,7 +385,7 @@ def invoker(_request, _metadata): self.assertEqual(span_end_mock.call_count, 1) def test_client_interceptor_trace_context_propagation( - self, + self, ): # pylint: disable=no-self-use """ensure that client interceptor correctly inject trace context into all outgoing requests.""" previous_propagator = get_global_textmap() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index 9bd86d634b..437175c4b2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -101,9 +101,7 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565",options=[ - (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) - ]) + self.channel = grpc.insecure_channel("localhost:25565") self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) From 641beb7425b8630d8a835638e254befc26a7dada Mon Sep 17 00:00:00 2001 From: Ondrej Unger Date: Fri, 10 May 2024 17:42:27 +0200 Subject: [PATCH 3/8] Fix failing test, lint, and update changelog - set_attribute and also set_status are not called if span was already ended. Simple reorder of function calls helped - I also realized that accessing end_time is dangerous, in SDK it is protected by lock. I solved this by remembering if we already called end. Calling multiple times end on span is not problematic, but it generates warning logs and is probably not good practice - updated CHANGELOG.md - fix lint --- .../instrumentation/grpc/_client.py | 21 ++++++++++++------- .../tests/test_client_interceptor.py | 8 +++---- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 5c46d5a723..dc0619791f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -79,20 +79,25 @@ def _safe_invoke(function: Callable, *args): ) +# pylint:disable=abstract-method class OpenTelemetryStreamWrapper(wrapt.ObjectProxy): def __init__(self, wrapped, span: trace.Span): super().__init__(wrapped) self._self_span = span + self._span_ended = False def _end_span_if_not_already_ended(self, status_code=None, status=None): - if self._self_span.end_time is None: - self._self_span.end() - if status_code is not None: - self._self_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, status_code - ) - if status is not None: - self._self_span.set_status(status) + if self._span_ended: + return + + if status_code is not None: + self._self_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, status_code + ) + if status is not None: + self._self_span.set_status(status) + self._span_ended = True + self._self_span.end() def __del__(self): self._end_span_if_not_already_ended() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index f9a6c71efc..9e26055093 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -173,8 +173,8 @@ def test_unary_stream(self): def test_unary_stream_can_be_cancel(self): responses = server_streaming_method(self._stub, serialize=False) - for i, _ in enumerate(responses): - if i == 1: + for response_num, _ in enumerate(responses): + if response_num == 1: responses.cancel() break spans = self.memory_exporter.get_finished_spans() @@ -280,8 +280,8 @@ def test_stream_stream(self): def test_stream_stream_can_be_cancel(self): responses = bidirectional_streaming_method(self._stub, serialize=False) - for i, _ in enumerate(responses): - if i == 1: + for response_num, _ in enumerate(responses): + if response_num == 1: responses.cancel() break spans = self.memory_exporter.get_finished_spans() From 58683c1891198a089d359244d21445d927c09f53 Mon Sep 17 00:00:00 2001 From: Ondrej Unger Date: Fri, 28 Jun 2024 11:56:49 +0200 Subject: [PATCH 4/8] Revert "Fix failing test, lint, and update changelog" This reverts commit 5b26e33f --- .../instrumentation/grpc/_client.py | 21 +++++++------------ .../tests/test_client_interceptor.py | 8 +++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index dc0619791f..5c46d5a723 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -79,25 +79,20 @@ def _safe_invoke(function: Callable, *args): ) -# pylint:disable=abstract-method class OpenTelemetryStreamWrapper(wrapt.ObjectProxy): def __init__(self, wrapped, span: trace.Span): super().__init__(wrapped) self._self_span = span - self._span_ended = False def _end_span_if_not_already_ended(self, status_code=None, status=None): - if self._span_ended: - return - - if status_code is not None: - self._self_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, status_code - ) - if status is not None: - self._self_span.set_status(status) - self._span_ended = True - self._self_span.end() + if self._self_span.end_time is None: + self._self_span.end() + if status_code is not None: + self._self_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, status_code + ) + if status is not None: + self._self_span.set_status(status) def __del__(self): self._end_span_if_not_already_ended() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 9e26055093..f9a6c71efc 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -173,8 +173,8 @@ def test_unary_stream(self): def test_unary_stream_can_be_cancel(self): responses = server_streaming_method(self._stub, serialize=False) - for response_num, _ in enumerate(responses): - if response_num == 1: + for i, _ in enumerate(responses): + if i == 1: responses.cancel() break spans = self.memory_exporter.get_finished_spans() @@ -280,8 +280,8 @@ def test_stream_stream(self): def test_stream_stream_can_be_cancel(self): responses = bidirectional_streaming_method(self._stub, serialize=False) - for response_num, _ in enumerate(responses): - if response_num == 1: + for i, _ in enumerate(responses): + if i == 1: responses.cancel() break spans = self.memory_exporter.get_finished_spans() From ce09c313322bbbcff65125d3a18f9359b709a906 Mon Sep 17 00:00:00 2001 From: Ondrej Unger Date: Fri, 28 Jun 2024 11:57:15 +0200 Subject: [PATCH 5/8] Revert "Wrap strean response in proxy that manage span lifetime" This reverts commit d0abacf9 --- .../instrumentation/grpc/_client.py | 89 ++++++------------- .../tests/_client.py | 7 +- .../tests/test_client_interceptor.py | 81 +++-------------- .../tests/test_client_interceptor_filter.py | 4 +- 4 files changed, 49 insertions(+), 132 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 5c46d5a723..f03fdafe80 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -21,12 +21,11 @@ import logging from collections import OrderedDict +from functools import partial from typing import Callable, MutableMapping import grpc -import wrapt - -from opentelemetry import context, trace +from opentelemetry import trace from opentelemetry.instrumentation.grpc import grpcext from opentelemetry.instrumentation.grpc._utilities import RpcInfo from opentelemetry.instrumentation.utils import is_instrumentation_enabled @@ -78,59 +77,11 @@ def _safe_invoke(function: Callable, *args): "Error when invoking function '%s'", function_name, exc_info=ex ) - -class OpenTelemetryStreamWrapper(wrapt.ObjectProxy): - def __init__(self, wrapped, span: trace.Span): - super().__init__(wrapped) - self._self_span = span - - def _end_span_if_not_already_ended(self, status_code=None, status=None): - if self._self_span.end_time is None: - self._self_span.end() - if status_code is not None: - self._self_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, status_code - ) - if status is not None: - self._self_span.set_status(status) - - def __del__(self): - self._end_span_if_not_already_ended() - self.__wrapped__.__del__() - - def __iter__(self): - return self - - def cancel(self): - self._end_span_if_not_already_ended( - status_code=grpc.StatusCode.CANCELLED.value[0] - ) - return self.__wrapped__.cancel() - - def __next__(self): - return self._next() - - def next(self): - return self._next() - - def _next(self): - try: - return self.__wrapped__._next() - except StopIteration: - self._end_span_if_not_already_ended() - raise - except grpc.RpcError as err: - self._end_span_if_not_already_ended( - err.code().value[0], Status(StatusCode.ERROR) - ) - raise err - - class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, tracer, filter_=None, request_hook=None, response_hook=None ): self._tracer = tracer self._filter = filter_ @@ -184,10 +135,10 @@ def _intercept(self, request, metadata, client_info, invoker): else: mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + client_info.full_method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, ) as span: result = None try: @@ -241,7 +192,7 @@ def intercept_unary(self, request, metadata, client_info, invoker): # the span across the generated responses and detect any errors, we wrap # the result in a new generator that yields the response values. def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not metadata: mutable_metadata = OrderedDict() @@ -249,7 +200,8 @@ def _intercept_server_stream( mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, end_on_exit=False + client_info.full_method, + end_on_exit=False ) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -264,10 +216,27 @@ def _intercept_server_stream( stream = invoker(request_or_iterator, metadata) - return OpenTelemetryStreamWrapper(stream, span) + def done_callback(future, span_): + try: + future.result() + except grpc.FutureCancelledError: + span_.set_status(Status(StatusCode.OK)) + span_.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] + ) + except grpc.RpcError as err: + span_.set_status(Status(StatusCode.ERROR)) + span_.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] + ) + finally: + span_.end() + + stream.add_done_callback(partial(done_callback, span_=span)) + return stream def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not is_instrumentation_enabled(): return invoker(request_or_iterator, metadata) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index b6f8a82e24..661d7094de 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -57,7 +57,7 @@ def server_streaming_method(stub, error=False, serialize=True): return response_iterator -def bidirectional_streaming_method(stub, error=False, serialize=True): +def bidirectional_streaming_method(stub, error=False): def request_messages(): for _ in range(5): request = Request( @@ -68,6 +68,5 @@ def request_messages(): response_iterator = stub.ServerStreamingMethod( request, metadata=(("key", "value"),) ) - if serialize: - list(response_iterator) - return response_iterator + + list(response_iterator) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index f9a6c71efc..5bd09a4423 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # pylint:disable=cyclic-import +from time import sleep from unittest import mock @@ -61,24 +62,24 @@ def __init__(self): pass def intercept_unary_unary( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_unary_stream( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_stream_unary( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator ) def intercept_stream_stream( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator @@ -86,7 +87,7 @@ def intercept_stream_stream( @staticmethod def _intercept_call( - continuation, client_call_details, request_or_iterator + continuation, client_call_details, request_or_iterator ): return continuation(client_call_details, request_or_iterator) @@ -99,7 +100,9 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.insecure_channel("localhost:25565", options=[ + # (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + ]) self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) @@ -172,11 +175,14 @@ def test_unary_stream(self): ) def test_unary_stream_can_be_cancel(self): - responses = server_streaming_method(self._stub, serialize=False) + responses = server_streaming_method(self._stub) for i, _ in enumerate(responses): if i == 1: responses.cancel() break + sleep(10) + self.server.stop(None) + self.channel.close() spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -201,33 +207,6 @@ def test_unary_stream_can_be_cancel(self): }, ) - def test_finished_stream_cancel_does_not_change_status_of_span(self): - responses = server_streaming_method(self._stub, serialize=True) - responses.cancel() - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - span = spans[0] - - self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") - self.assertIs(span.kind, trace.SpanKind.CLIENT) - - # Check version and name in span's instrumentation info - self.assertEqualSpanInstrumentationInfo( - span, opentelemetry.instrumentation.grpc - ) - - self.assertSpanHasAttributes( - span, - { - SpanAttributes.RPC_METHOD: "ServerStreamingMethod", - SpanAttributes.RPC_SERVICE: "GRPCTestServer", - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], - }, - ) - def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -278,38 +257,6 @@ def test_stream_stream(self): }, ) - def test_stream_stream_can_be_cancel(self): - responses = bidirectional_streaming_method(self._stub, serialize=False) - for i, _ in enumerate(responses): - if i == 1: - responses.cancel() - break - spans = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans), 1) - span = spans[0] - - self.assertEqual( - span.name, "/GRPCTestServer/BidirectionalStreamingMethod" - ) - self.assertIs(span.kind, trace.SpanKind.CLIENT) - - # Check version and name in span's instrumentation info - self.assertEqualSpanInstrumentationInfo( - span, opentelemetry.instrumentation.grpc - ) - - self.assertSpanHasAttributes( - span, - { - SpanAttributes.RPC_METHOD: "BidirectionalStreamingMethod", - SpanAttributes.RPC_SERVICE: "GRPCTestServer", - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ - 0 - ], - }, - ) - def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) @@ -385,7 +332,7 @@ def invoker(_request, _metadata): self.assertEqual(span_end_mock.call_count, 1) def test_client_interceptor_trace_context_propagation( - self, + self, ): # pylint: disable=no-self-use """ensure that client interceptor correctly inject trace context into all outgoing requests.""" previous_propagator = get_global_textmap() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index 437175c4b2..9bd86d634b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -101,7 +101,9 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.insecure_channel("localhost:25565",options=[ + (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + ]) self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) From 9e59ba6ecc02facc612cf06da3fcb6c4dea107a3 Mon Sep 17 00:00:00 2001 From: Ondrej Unger Date: Fri, 28 Jun 2024 18:27:41 +0200 Subject: [PATCH 6/8] Test calling code first --- .../opentelemetry/instrumentation/grpc/_client.py | 1 + .../tests/test_client_interceptor.py | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index f03fdafe80..e5dad4efe7 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -217,6 +217,7 @@ def _intercept_server_stream( stream = invoker(request_or_iterator, metadata) def done_callback(future, span_): + logger.exception("done_callback") try: future.result() except grpc.FutureCancelledError: diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 5bd09a4423..4709704746 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # pylint:disable=cyclic-import +import logging from time import sleep from unittest import mock @@ -100,9 +101,7 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565", options=[ - # (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) - ]) + self.channel = grpc.insecure_channel("localhost:25565") self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) @@ -175,14 +174,16 @@ def test_unary_stream(self): ) def test_unary_stream_can_be_cancel(self): - responses = server_streaming_method(self._stub) + responses = server_streaming_method(self._stub, serialize=False) for i, _ in enumerate(responses): if i == 1: responses.cancel() break - sleep(10) - self.server.stop(None) - self.channel.close() + self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) + # self.server.stop(None) + # self.channel.close() + logging.exception("Getting spans") + # sleep(10) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] From 9bdf76acb71dd35803dd0dcb0ba2bcc2a2669868 Mon Sep 17 00:00:00 2001 From: Aidan Jensen Date: Thu, 9 Oct 2025 10:34:35 -0700 Subject: [PATCH 7/8] Fix tests --- CHANGELOG.md | 4 +- .../instrumentation/grpc/_client.py | 5 +- .../tests/_client.py | 10 ++-- .../tests/test_client_interceptor.py | 55 +++++++++++++++---- 4 files changed, 56 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d33472c8e6..b0383d1676 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3624](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624)) - `opentelemetry-instrumentation-dbapi`: fix crash retrieving libpq version when enabling commenter with psycopg ([#3796](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3796)) +- `opentelemetry-instrumentation-grpc` User should be able to cancel grpc stream + ([#2093](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2093)) ### Added @@ -24,7 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3765](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3765)) - Add `rstcheck` to pre-commit to stop introducing invalid RST ([#3777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3777)) -- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default +- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default Credentials (https://cloud.google.com/docs/authentication/application-default-credentials) to the OTLP Exporters created automatically by OpenTelemetry Python's auto instrumentation. These credentials authorize OTLP traces to be sent to `telemetry.googleapis.com`. [#3766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3766). ## Version 1.37.0/0.58b0 (2025-09-11) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index e5dad4efe7..5704a42459 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -217,18 +217,17 @@ def _intercept_server_stream( stream = invoker(request_or_iterator, metadata) def done_callback(future, span_): - logger.exception("done_callback") try: future.result() except grpc.FutureCancelledError: span_.set_status(Status(StatusCode.OK)) span_.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] + RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] ) except grpc.RpcError as err: span_.set_status(Status(StatusCode.ERROR)) span_.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0] + RPC_GRPC_STATUS_CODE, err.code().value[0] ) finally: span_.end() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index 661d7094de..4fea8e7d90 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -57,7 +57,7 @@ def server_streaming_method(stub, error=False, serialize=True): return response_iterator -def bidirectional_streaming_method(stub, error=False): +def bidirectional_streaming_method(stub, error=False, serialize=True): def request_messages(): for _ in range(5): request = Request( @@ -65,8 +65,10 @@ def request_messages(): ) yield request - response_iterator = stub.ServerStreamingMethod( - request, metadata=(("key", "value"),) + response_iterator = stub.BidirectionalStreamingMethod( + request_messages(), metadata=(("key", "value"),) ) - list(response_iterator) + if serialize: + list(response_iterator) + return response_iterator diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index 4709704746..e47f676edc 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -13,8 +13,9 @@ # limitations under the License. # pylint:disable=cyclic-import import logging -from time import sleep +import threading +import time from unittest import mock import grpc @@ -174,16 +175,15 @@ def test_unary_stream(self): ) def test_unary_stream_can_be_cancel(self): + done = threading.Event() responses = server_streaming_method(self._stub, serialize=False) + responses.add_done_callback(lambda: done.set()) for i, _ in enumerate(responses): if i == 1: responses.cancel() break self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) - # self.server.stop(None) - # self.channel.close() - logging.exception("Getting spans") - # sleep(10) + done.wait(5) spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 1) span = spans[0] @@ -192,17 +192,17 @@ def test_unary_stream_can_be_cancel(self): self.assertIs(span.kind, trace.SpanKind.CLIENT) # Check version and name in span's instrumentation info - self.assertEqualSpanInstrumentationInfo( + self.assertEqualSpanInstrumentationScope( span, opentelemetry.instrumentation.grpc ) self.assertSpanHasAttributes( span, { - SpanAttributes.RPC_METHOD: "ServerStreamingMethod", - SpanAttributes.RPC_SERVICE: "GRPCTestServer", - SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ + RPC_METHOD: "ServerStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ 0 ], }, @@ -258,6 +258,41 @@ def test_stream_stream(self): }, ) + def test_stream_stream_can_be_cancel(self): + done = threading.Event() + responses = bidirectional_streaming_method(self._stub, serialize=False) + responses.add_done_callback(lambda: done.set()) + for i, _ in enumerate(responses): + if i == 1: + responses.cancel() + break + self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) + done.wait(5) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/BidirectionalStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationScope( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "BidirectionalStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ + 0 + ], + }, + ) + + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) From 979f0c6bd3c68ad3ff17b1475fcd6349c8ab8e0e Mon Sep 17 00:00:00 2001 From: Aidan Jensen Date: Fri, 10 Oct 2025 05:52:47 -0700 Subject: [PATCH 8/8] Fix formatting Signed-off-by: Aidan Jensen --- .../instrumentation/grpc/_client.py | 22 ++++++----- .../tests/test_client_interceptor.py | 39 ++++++++----------- .../tests/test_client_interceptor_filter.py | 9 +++-- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 5704a42459..4d554c90ac 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -25,6 +25,7 @@ from typing import Callable, MutableMapping import grpc + from opentelemetry import trace from opentelemetry.instrumentation.grpc import grpcext from opentelemetry.instrumentation.grpc._utilities import RpcInfo @@ -77,11 +78,12 @@ def _safe_invoke(function: Callable, *args): "Error when invoking function '%s'", function_name, exc_info=ex ) + class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, tracer, filter_=None, request_hook=None, response_hook=None ): self._tracer = tracer self._filter = filter_ @@ -135,10 +137,10 @@ def _intercept(self, request, metadata, client_info, invoker): else: mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, - end_on_exit=False, - record_exception=False, - set_status_on_exception=False, + client_info.full_method, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False, ) as span: result = None try: @@ -192,7 +194,7 @@ def intercept_unary(self, request, metadata, client_info, invoker): # the span across the generated responses and detect any errors, we wrap # the result in a new generator that yields the response values. def _intercept_server_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not metadata: mutable_metadata = OrderedDict() @@ -200,8 +202,7 @@ def _intercept_server_stream( mutable_metadata = OrderedDict(metadata) with self._start_span( - client_info.full_method, - end_on_exit=False + client_info.full_method, end_on_exit=False ) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -222,7 +223,8 @@ def done_callback(future, span_): except grpc.FutureCancelledError: span_.set_status(Status(StatusCode.OK)) span_.set_attribute( - RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0] + RPC_GRPC_STATUS_CODE, + grpc.StatusCode.CANCELLED.value[0], ) except grpc.RpcError as err: span_.set_status(Status(StatusCode.ERROR)) @@ -236,7 +238,7 @@ def done_callback(future, span_): return stream def intercept_stream( - self, request_or_iterator, metadata, client_info, invoker + self, request_or_iterator, metadata, client_info, invoker ): if not is_instrumentation_enabled(): return invoker(request_or_iterator, metadata) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index e47f676edc..21789de29a 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -12,10 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # pylint:disable=cyclic-import -import logging import threading -import time from unittest import mock import grpc @@ -64,24 +62,24 @@ def __init__(self): pass def intercept_unary_unary( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_unary_stream( - self, continuation, client_call_details, request + self, continuation, client_call_details, request ): return self._intercept_call(continuation, client_call_details, request) def intercept_stream_unary( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator ) def intercept_stream_stream( - self, continuation, client_call_details, request_iterator + self, continuation, client_call_details, request_iterator ): return self._intercept_call( continuation, client_call_details, request_iterator @@ -89,7 +87,7 @@ def intercept_stream_stream( @staticmethod def _intercept_call( - continuation, client_call_details, request_or_iterator + continuation, client_call_details, request_or_iterator ): return continuation(client_call_details, request_or_iterator) @@ -177,9 +175,9 @@ def test_unary_stream(self): def test_unary_stream_can_be_cancel(self): done = threading.Event() responses = server_streaming_method(self._stub, serialize=False) - responses.add_done_callback(lambda: done.set()) - for i, _ in enumerate(responses): - if i == 1: + responses.add_done_callback(lambda _: done.set()) + for resp, _ in enumerate(responses): + if resp == 1: responses.cancel() break self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) @@ -202,9 +200,7 @@ def test_unary_stream_can_be_cancel(self): RPC_METHOD: "ServerStreamingMethod", RPC_SERVICE: "GRPCTestServer", RPC_SYSTEM: "grpc", - RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ - 0 - ], + RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[0], }, ) @@ -261,9 +257,9 @@ def test_stream_stream(self): def test_stream_stream_can_be_cancel(self): done = threading.Event() responses = bidirectional_streaming_method(self._stub, serialize=False) - responses.add_done_callback(lambda: done.set()) - for i, _ in enumerate(responses): - if i == 1: + responses.add_done_callback(lambda _: done.set()) + for resp, _ in enumerate(responses): + if resp == 1: responses.cancel() break self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) @@ -272,7 +268,9 @@ def test_stream_stream_can_be_cancel(self): self.assertEqual(len(spans), 1) span = spans[0] - self.assertEqual(span.name, "/GRPCTestServer/BidirectionalStreamingMethod") + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) self.assertIs(span.kind, trace.SpanKind.CLIENT) # Check version and name in span's instrumentation info @@ -286,13 +284,10 @@ def test_stream_stream_can_be_cancel(self): RPC_METHOD: "BidirectionalStreamingMethod", RPC_SERVICE: "GRPCTestServer", RPC_SYSTEM: "grpc", - RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ - 0 - ], + RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[0], }, ) - def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) @@ -368,7 +363,7 @@ def invoker(_request, _metadata): self.assertEqual(span_end_mock.call_count, 1) def test_client_interceptor_trace_context_propagation( - self, + self, ): # pylint: disable=no-self-use """ensure that client interceptor correctly inject trace context into all outgoing requests.""" previous_propagator = get_global_textmap() diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index 9bd86d634b..f5754549c0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -101,9 +101,12 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565",options=[ - (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) - ]) + self.channel = grpc.insecure_channel( + "localhost:25565", + options=[ + (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + ], + ) self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)