Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3796](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3796))
- `opentelemetry-instrumentation-fastapi`: Fix handling of APIRoute subclasses
([#3681](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3681))
- `opentelemetry-instrumentation-grpc` User should be able to cancel grpc stream
([#2093](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2093))

### Added

Expand All @@ -32,7 +34,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3743](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3743))
- Add `rstcheck` to pre-commit to stop introducing invalid RST
([#3777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3777))
- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default
- `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default
Credentials (https://cloud.google.com/docs/authentication/application-default-credentials) to the OTLP Exporters created automatically by OpenTelemetry Python's auto instrumentation. These credentials authorize OTLP traces to be sent to `telemetry.googleapis.com`. [#3766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3766).
- `opentelemetry-instrumentation-psycopg`: Add missing parameter `capture_parameters` to instrumentor.
([#3676](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3676))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging
from collections import OrderedDict
from functools import partial
from typing import Callable, MutableMapping

import grpc
Expand Down Expand Up @@ -200,7 +201,9 @@ def _intercept_server_stream(
else:
mutable_metadata = OrderedDict(metadata)

with self._start_span(client_info.full_method) as span:
with self._start_span(
client_info.full_method, end_on_exit=False
) as span:
inject(mutable_metadata, setter=_carrier_setter)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
Expand All @@ -212,12 +215,27 @@ def _intercept_server_stream(
if client_info.is_client_stream:
rpc_info.request = request_or_iterator

try:
yield from invoker(request_or_iterator, metadata)
except grpc.RpcError as err:
span.set_status(Status(StatusCode.ERROR))
span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0])
raise err
stream = invoker(request_or_iterator, metadata)

def done_callback(future, span_):
try:
future.result()
except grpc.FutureCancelledError:
span_.set_status(Status(StatusCode.OK))
span_.set_attribute(
RPC_GRPC_STATUS_CODE,
grpc.StatusCode.CANCELLED.value[0],
)
except grpc.RpcError as err:
span_.set_status(Status(StatusCode.ERROR))
span_.set_attribute(
RPC_GRPC_STATUS_CODE, err.code().value[0]
)
finally:
span_.end()

stream.add_done_callback(partial(done_callback, span_=span))
return stream

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,19 @@ def request_messages():
)


def server_streaming_method(stub, error=False):
def server_streaming_method(stub, error=False, serialize=True):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
response_iterator = stub.ServerStreamingMethod(
request, metadata=(("key", "value"),)
)
list(response_iterator)
if serialize:
list(response_iterator)
return response_iterator


def bidirectional_streaming_method(stub, error=False):
def bidirectional_streaming_method(stub, error=False, serialize=True):
def request_messages():
for _ in range(5):
request = Request(
Expand All @@ -67,4 +69,6 @@ def request_messages():
request_messages(), metadata=(("key", "value"),)
)

list(response_iterator)
if serialize:
list(response_iterator)
return response_iterator
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
# pylint:disable=cyclic-import

import threading
from unittest import mock

import grpc
Expand Down Expand Up @@ -171,6 +172,38 @@ def test_unary_stream(self):
},
)

def test_unary_stream_can_be_cancel(self):
done = threading.Event()
responses = server_streaming_method(self._stub, serialize=False)
responses.add_done_callback(lambda _: done.set())
for resp, _ in enumerate(responses):
if resp == 1:
responses.cancel()
break
self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED)
done.wait(5)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]

self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationScope(
span, opentelemetry.instrumentation.grpc
)

self.assertSpanHasAttributes(
span,
{
RPC_METHOD: "ServerStreamingMethod",
RPC_SERVICE: "GRPCTestServer",
RPC_SYSTEM: "grpc",
RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[0],
},
)

def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand Down Expand Up @@ -221,6 +254,40 @@ def test_stream_stream(self):
},
)

def test_stream_stream_can_be_cancel(self):
done = threading.Event()
responses = bidirectional_streaming_method(self._stub, serialize=False)
responses.add_done_callback(lambda _: done.set())
for resp, _ in enumerate(responses):
if resp == 1:
responses.cancel()
break
self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED)
done.wait(5)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]

self.assertEqual(
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
)
self.assertIs(span.kind, trace.SpanKind.CLIENT)

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationScope(
span, opentelemetry.instrumentation.grpc
)

self.assertSpanHasAttributes(
span,
{
RPC_METHOD: "BidirectionalStreamingMethod",
RPC_SERVICE: "GRPCTestServer",
RPC_SYSTEM: "grpc",
RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[0],
},
)

def test_error_simple(self):
with self.assertRaises(grpc.RpcError):
simple_method(self._stub, error=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ def setUp(self):
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.insecure_channel(
"localhost:25565",
options=[
(grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1)
],
)
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)

Expand Down