diff --git a/CHANGELOG.md b/CHANGELOG.md index 024990c91d..e22ecc6108 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3796](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3796)) - `opentelemetry-instrumentation-fastapi`: Fix handling of APIRoute subclasses ([#3681](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3681)) +- `opentelemetry-instrumentation-grpc` User should be able to cancel grpc stream + ([#2093](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2093)) ### Added @@ -32,7 +34,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3743](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3743)) - Add `rstcheck` to pre-commit to stop introducing invalid RST ([#3777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3777)) -- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default +- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default Credentials (https://cloud.google.com/docs/authentication/application-default-credentials) to the OTLP Exporters created automatically by OpenTelemetry Python's auto instrumentation. These credentials authorize OTLP traces to be sent to `telemetry.googleapis.com`. [#3766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3766). - `opentelemetry-instrumentation-psycopg`: Add missing parameter `capture_parameters` to instrumentor. ([#3676](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3676)) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index bbeb4ec1d9..4d554c90ac 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -21,6 +21,7 @@ import logging from collections import OrderedDict +from functools import partial from typing import Callable, MutableMapping import grpc @@ -200,7 +201,9 @@ def _intercept_server_stream( else: mutable_metadata = OrderedDict(metadata) - with self._start_span(client_info.full_method) as span: + with self._start_span( + client_info.full_method, end_on_exit=False + ) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) rpc_info = RpcInfo( @@ -212,12 +215,27 @@ def _intercept_server_stream( if client_info.is_client_stream: rpc_info.request = request_or_iterator - try: - yield from invoker(request_or_iterator, metadata) - except grpc.RpcError as err: - span.set_status(Status(StatusCode.ERROR)) - span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0]) - raise err + stream = invoker(request_or_iterator, metadata) + + def done_callback(future, span_): + try: + future.result() + except grpc.FutureCancelledError: + span_.set_status(Status(StatusCode.OK)) + span_.set_attribute( + RPC_GRPC_STATUS_CODE, + grpc.StatusCode.CANCELLED.value[0], + ) + except grpc.RpcError as err: + span_.set_status(Status(StatusCode.ERROR)) + span_.set_attribute( + RPC_GRPC_STATUS_CODE, err.code().value[0] + ) + finally: + span_.end() + + stream.add_done_callback(partial(done_callback, span_=span)) + return stream def intercept_stream( self, request_or_iterator, metadata, client_info, invoker diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py index 67e7d0a625..4fea8e7d90 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py @@ -45,17 +45,19 @@ def request_messages(): ) -def server_streaming_method(stub, error=False): +def server_streaming_method(stub, error=False, serialize=True): request = Request( client_id=CLIENT_ID, request_data="error" if error else "data" ) response_iterator = stub.ServerStreamingMethod( request, metadata=(("key", "value"),) ) - list(response_iterator) + if serialize: + list(response_iterator) + return response_iterator -def bidirectional_streaming_method(stub, error=False): +def bidirectional_streaming_method(stub, error=False, serialize=True): def request_messages(): for _ in range(5): request = Request( @@ -67,4 +69,6 @@ def request_messages(): request_messages(), metadata=(("key", "value"),) ) - list(response_iterator) + if serialize: + list(response_iterator) + return response_iterator diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index e001d2ed57..21789de29a 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -13,6 +13,7 @@ # limitations under the License. # pylint:disable=cyclic-import +import threading from unittest import mock import grpc @@ -171,6 +172,38 @@ def test_unary_stream(self): }, ) + def test_unary_stream_can_be_cancel(self): + done = threading.Event() + responses = server_streaming_method(self._stub, serialize=False) + responses.add_done_callback(lambda _: done.set()) + for resp, _ in enumerate(responses): + if resp == 1: + responses.cancel() + break + self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) + done.wait(5) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod") + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationScope( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "ServerStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[0], + }, + ) + def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -221,6 +254,40 @@ def test_stream_stream(self): }, ) + def test_stream_stream_can_be_cancel(self): + done = threading.Event() + responses = bidirectional_streaming_method(self._stub, serialize=False) + responses.add_done_callback(lambda _: done.set()) + for resp, _ in enumerate(responses): + if resp == 1: + responses.cancel() + break + self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED) + done.wait(5) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual( + span.name, "/GRPCTestServer/BidirectionalStreamingMethod" + ) + self.assertIs(span.kind, trace.SpanKind.CLIENT) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationScope( + span, opentelemetry.instrumentation.grpc + ) + + self.assertSpanHasAttributes( + span, + { + RPC_METHOD: "BidirectionalStreamingMethod", + RPC_SERVICE: "GRPCTestServer", + RPC_SYSTEM: "grpc", + RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[0], + }, + ) + def test_error_simple(self): with self.assertRaises(grpc.RpcError): simple_method(self._stub, error=True) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py index 437175c4b2..f5754549c0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_filter.py @@ -101,7 +101,12 @@ def setUp(self): self.server.start() # use a user defined interceptor along with the opentelemetry client interceptor interceptors = [Interceptor()] - self.channel = grpc.insecure_channel("localhost:25565") + self.channel = grpc.insecure_channel( + "localhost:25565", + options=[ + (grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1) + ], + ) self.channel = grpc.intercept_channel(self.channel, *interceptors) self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)