Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.

Commit ca20772

Browse files
prateekrc24t
authored andcommitted
Add ocagent stats exporter (#617)
1 parent a2f7685 commit ca20772

File tree

16 files changed

+1115
-136
lines changed

16 files changed

+1115
-136
lines changed

README.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ Trace Exporter
243243
Stats Exporter
244244
--------------
245245

246+
- `OCAgent`_
246247
- `Prometheus`_
247248
- `Stackdriver`_
248249

contrib/opencensus-ext-google-cloud-clientlibs/tests/test_google_cloud_clientlibs_trace.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,18 @@
1515
import unittest
1616

1717
import mock
18+
import grpc
1819

1920
from opencensus.ext.google_cloud_clientlibs import trace
2021

2122

2223
class Test_google_cloud_clientlibs_trace(unittest.TestCase):
24+
def setUp(self):
25+
self._insecure_channel_func = getattr(grpc, 'insecure_channel')
26+
27+
def tearDown(self):
28+
setattr(grpc, 'insecure_channel', self._insecure_channel_func)
29+
2330
def test_trace_integration(self):
2431
mock_trace_grpc = mock.Mock()
2532
mock_trace_http = mock.Mock()

contrib/opencensus-ext-grpc/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## Unreleased
4+
- Create WrappedResponseIterator for intercepted bi-directional rpc stream.
45

56
## 0.1.1
67
Released 2019-04-08

contrib/opencensus-ext-grpc/opencensus/ext/grpc/client_interceptor.py

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -181,17 +181,9 @@ def intercept_unary_stream(
181181
request_iterator=iter((request,)),
182182
grpc_type=oc_grpc.UNARY_STREAM)
183183

184-
response_it = continuation(
185-
new_details,
186-
next(new_request_iterator))
187-
response_it = grpc_utils.wrap_iter_with_message_events(
188-
request_or_response_iter=response_it,
189-
span=current_span,
190-
message_event_type=time_event.Type.RECEIVED
191-
)
192-
response_it = grpc_utils.wrap_iter_with_end_span(response_it)
193-
194-
return response_it
184+
return grpc_utils.WrappedResponseIterator(
185+
continuation(new_details, next(new_request_iterator)),
186+
current_span)
195187

196188
def intercept_stream_unary(
197189
self, continuation, client_call_details, request_iterator
@@ -225,17 +217,8 @@ def intercept_stream_stream(
225217
request_iterator=request_iterator,
226218
grpc_type=oc_grpc.STREAM_STREAM)
227219

228-
response_it = continuation(
229-
new_details,
230-
new_request_iterator)
231-
response_it = grpc_utils.wrap_iter_with_message_events(
232-
request_or_response_iter=response_it,
233-
span=current_span,
234-
message_event_type=time_event.Type.RECEIVED
235-
)
236-
response_it = grpc_utils.wrap_iter_with_end_span(response_it)
237-
238-
return response_it
220+
return grpc_utils.WrappedResponseIterator(
221+
continuation(new_details, new_request_iterator), current_span)
239222

240223

241224
def _get_span_name(client_call_details):

contrib/opencensus-ext-grpc/opencensus/ext/grpc/utils.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from datetime import datetime
22

3-
from opencensus.trace import time_event
3+
from grpc.framework.foundation import future
4+
from grpc.framework.interfaces.face import face
45
from opencensus.trace import execution_context
6+
from opencensus.trace import time_event
57

68

79
def add_message_event(proto_message, span, message_event_type, message_id=1):
@@ -43,3 +45,93 @@ def wrap_iter_with_end_span(response_iter):
4345
for response in response_iter:
4446
yield response
4547
execution_context.get_opencensus_tracer().end_span()
48+
49+
50+
class WrappedResponseIterator(future.Future, face.Call):
51+
"""Wraps the rpc response iterator.
52+
53+
The grpc.StreamStreamClientInterceptor abstract class states stream
54+
interceptor method should return an object that's both a call (implementing
55+
the response iterator) and a future. Thus, this class is a thin wrapper
56+
around the rpc response to provide the opencensus extension.
57+
58+
:type iterator: (future.Future, face.Call)
59+
:param iterator: rpc response iterator
60+
61+
:type span: opencensus.trace.Span
62+
:param span: rpc span
63+
"""
64+
def __init__(self, iterator, span):
65+
self._iterator = iterator
66+
self._span = span
67+
68+
self._messages_received = 0
69+
70+
def add_done_callback(self, fn):
71+
self._iterator.add_done_callback(lambda ignored_callback: fn(self))
72+
73+
def __iter__(self):
74+
return self
75+
76+
def __next__(self):
77+
try:
78+
message = next(self._iterator)
79+
except StopIteration:
80+
execution_context.get_opencensus_tracer().end_span()
81+
raise
82+
83+
self._messages_received += 1
84+
add_message_event(
85+
proto_message=message,
86+
span=self._span,
87+
message_event_type=time_event.Type.RECEIVED,
88+
message_id=self._messages_received)
89+
return message
90+
91+
def next(self):
92+
return self.__next__()
93+
94+
def cancel(self):
95+
return self._iterator.cancel()
96+
97+
def is_active(self):
98+
return self._iterator.is_active()
99+
100+
def cancelled(self):
101+
raise NotImplementedError() # pragma: NO COVER
102+
103+
def running(self):
104+
raise NotImplementedError() # pragma: NO COVER
105+
106+
def done(self):
107+
raise NotImplementedError() # pragma: NO COVER
108+
109+
def result(self, timeout=None):
110+
raise NotImplementedError() # pragma: NO COVER
111+
112+
def exception(self, timeout=None):
113+
raise NotImplementedError() # pragma: NO COVER
114+
115+
def traceback(self, timeout=None):
116+
raise NotImplementedError() # pragma: NO COVER
117+
118+
def initial_metadata(self):
119+
raise NotImplementedError() # pragma: NO COVER
120+
121+
def terminal_metadata(self):
122+
raise NotImplementedError() # pragma: NO COVER
123+
124+
def code(self):
125+
raise NotImplementedError() # pragma: NO COVER
126+
127+
def details(self):
128+
raise NotImplementedError() # pragma: NO COVER
129+
130+
def time_remaining(self):
131+
raise NotImplementedError() # pragma: NO COVER
132+
133+
def add_abortion_callback(self, abortion_callback):
134+
raise NotImplementedError() # pragma: NO COVER
135+
136+
def protocol_context(self):
137+
raise NotImplementedError() # pragma: NO COVER

contrib/opencensus-ext-grpc/tests/test_client_interceptor.py

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,15 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import collections
16+
import mock
17+
import threading
1518
import unittest
1619

17-
import mock
20+
from google.api_core import bidi
21+
from google.protobuf import proto_builder
22+
from grpc.framework.foundation import logging_pool
23+
import grpc
1824

1925
from opencensus.ext.grpc import client_interceptor
2026
from opencensus.trace import execution_context
@@ -282,6 +288,108 @@ def test_intercept_stream_stream_not_trace(self):
282288
self.assertFalse(mock_tracer.end_span.called)
283289

284290

291+
class TestGrpcInterface(unittest.TestCase):
292+
293+
def setUp(self):
294+
self._server = _start_server()
295+
self._port = self._server.add_insecure_port('[::]:0')
296+
self._channel = grpc.insecure_channel('localhost:%d' % self._port)
297+
298+
def tearDown(self):
299+
self._server.stop(None)
300+
self._channel.close()
301+
302+
def _intercepted_channel(self, tracer=None):
303+
return grpc.intercept_channel(
304+
self._channel,
305+
client_interceptor.OpenCensusClientInterceptor(tracer=tracer))
306+
307+
def test_bidi_rpc_stream(self):
308+
event = threading.Event()
309+
310+
def _helper(request_iterator, context):
311+
counter = 0
312+
for _ in request_iterator:
313+
counter += 1
314+
if counter == 2:
315+
event.set()
316+
yield
317+
318+
self._server.add_generic_rpc_handlers(
319+
(StreamStreamRpcHandler(_helper),))
320+
self._server.start()
321+
322+
rpc = bidi.BidiRpc(
323+
self._intercepted_channel().stream_stream(
324+
'', EmptyMessage.SerializeToString),
325+
initial_request=EmptyMessage())
326+
done_event = threading.Event()
327+
rpc.add_done_callback(lambda _: done_event.set())
328+
329+
rpc.open()
330+
rpc.send(EmptyMessage())
331+
self.assertTrue(event.wait(timeout=1))
332+
rpc.close()
333+
self.assertTrue(done_event.wait(timeout=1))
334+
335+
@mock.patch('opencensus.trace.execution_context.get_opencensus_tracer')
336+
def test_close_span_on_done(self, mock_tracer):
337+
def _helper(request_iterator, context):
338+
for _ in request_iterator:
339+
yield EmptyMessage()
340+
yield
341+
342+
self._server.add_generic_rpc_handlers(
343+
(StreamStreamRpcHandler(_helper), ))
344+
self._server.start()
345+
346+
mock_tracer.return_value = mock_tracer
347+
rpc = self._intercepted_channel(NoopTracer()).stream_stream(
348+
method='',
349+
request_serializer=EmptyMessage.SerializeToString,
350+
response_deserializer=EmptyMessage.FromString)(iter(
351+
[EmptyMessage()]))
352+
353+
for resp in rpc:
354+
pass
355+
356+
self.assertEqual(mock_tracer.end_span.call_count, 1)
357+
358+
359+
EmptyMessage = proto_builder.MakeSimpleProtoClass(
360+
collections.OrderedDict([]),
361+
full_name='tests.test_client_interceptor.EmptyMessage')
362+
363+
364+
def _start_server():
365+
"""Starts an insecure grpc server."""
366+
return grpc.server(logging_pool.pool(max_workers=1),
367+
options=(('grpc.so_reuseport', 0), ))
368+
369+
370+
class StreamStreamMethodHandler(grpc.RpcMethodHandler):
371+
372+
def __init__(self, stream_handler_func):
373+
self.request_streaming = True
374+
self.response_streaming = True
375+
self.request_deserializer = None
376+
self.response_serializer = EmptyMessage.SerializeToString
377+
self.unary_unary = None
378+
self.unary_stream = None
379+
self.stream_unary = None
380+
self.stream_stream = stream_handler_func
381+
382+
383+
class StreamStreamRpcHandler(grpc.GenericRpcHandler):
384+
385+
def __init__(self, stream_stream_handler):
386+
self._stream_stream_handler = stream_stream_handler
387+
388+
def service(self, handler_call_details):
389+
resp = StreamStreamMethodHandler(self._stream_stream_handler)
390+
return resp
391+
392+
285393
class MockTracer(object):
286394
def __init__(self, current_span):
287395
self.current_span = current_span

contrib/opencensus-ext-grpc/tests/test_server_interceptor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.rpc import code_pb2
1919

2020
from opencensus.ext.grpc import server_interceptor
21+
from opencensus.ext.grpc import utils as grpc_utils
2122
from opencensus.trace import execution_context
2223
from opencensus.trace import span as span_module
2324

@@ -149,6 +150,17 @@ def test_intercept_handler_exception(self):
149150
self.assertEqual(current_span.status.code, code_pb2.UNKNOWN)
150151
self.assertEqual(current_span.status.message, 'Test')
151152

153+
@mock.patch(
154+
'opencensus.trace.execution_context.get_opencensus_tracer')
155+
def test_resp_streaming_span_end(self, mock_tracer):
156+
mock_tracer.return_value = mock_tracer
157+
158+
it = grpc_utils.wrap_iter_with_end_span(iter(['test']))
159+
for i in it:
160+
pass
161+
162+
self.assertEqual(mock_tracer.end_span.call_count, 1)
163+
152164
def test__wrap_rpc_behavior_none(self):
153165
new_handler = server_interceptor._wrap_rpc_behavior(None, lambda: None)
154166
self.assertEqual(new_handler, None)

contrib/opencensus-ext-ocagent/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## Unreleased
4+
- Add stats exporter
45

56
## 0.2.0
67
Released 2019-04-08

contrib/opencensus-ext-ocagent/README.rst

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
OpenCensus OC-Agent Trace Exporter
1+
OpenCensus OC-Agent Exporter
22
============================================================================
33

44
|pypi|
@@ -16,6 +16,13 @@ Installation
1616
Usage
1717
-----
1818

19+
Stats
20+
~~~~~
21+
1922
.. code:: python
2023
21-
# TBD
24+
from opencensus.ext.ocagent import stats_exporter as ocagent_stats_exporter
25+
26+
ocagent_stats_exporter.new_stats_exporter(
27+
service_name='service_name',
28+
endpoint='localhost:55678')

0 commit comments

Comments
 (0)