Skip to content

Commit d7900f9

Browse files
author
Aidan Jensen
committed
Fix tests
Signed-off-by: Aidan Jensen <[email protected]>
1 parent 9e59ba6 commit d7900f9

File tree

3 files changed

+53
-17
lines changed

3 files changed

+53
-17
lines changed

instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,18 +217,17 @@ def _intercept_server_stream(
217217
stream = invoker(request_or_iterator, metadata)
218218

219219
def done_callback(future, span_):
220-
logger.exception("done_callback")
221220
try:
222221
future.result()
223222
except grpc.FutureCancelledError:
224223
span_.set_status(Status(StatusCode.OK))
225224
span_.set_attribute(
226-
SpanAttributes.RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0]
225+
RPC_GRPC_STATUS_CODE, grpc.StatusCode.CANCELLED.value[0]
227226
)
228227
except grpc.RpcError as err:
229228
span_.set_status(Status(StatusCode.ERROR))
230229
span_.set_attribute(
231-
SpanAttributes.RPC_GRPC_STATUS_CODE, err.code().value[0]
230+
RPC_GRPC_STATUS_CODE, err.code().value[0]
232231
)
233232
finally:
234233
span_.end()

instrumentation/opentelemetry-instrumentation-grpc/tests/_client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,18 @@ def server_streaming_method(stub, error=False, serialize=True):
5757
return response_iterator
5858

5959

60-
def bidirectional_streaming_method(stub, error=False):
60+
def bidirectional_streaming_method(stub, error=False, serialize=True):
6161
def request_messages():
6262
for _ in range(5):
6363
request = Request(
6464
client_id=CLIENT_ID, request_data="error" if error else "data"
6565
)
6666
yield request
6767

68-
response_iterator = stub.ServerStreamingMethod(
69-
request, metadata=(("key", "value"),)
68+
response_iterator = stub.BidirectionalStreamingMethod(
69+
request_messages(), metadata=(("key", "value"),)
7070
)
7171

72-
list(response_iterator)
72+
if serialize:
73+
list(response_iterator)
74+
return response_iterator

instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
# limitations under the License.
1414
# pylint:disable=cyclic-import
1515
import logging
16-
from time import sleep
1716

17+
import threading
18+
import time
1819
from unittest import mock
1920

2021
import grpc
@@ -174,16 +175,15 @@ def test_unary_stream(self):
174175
)
175176

176177
def test_unary_stream_can_be_cancel(self):
178+
done = threading.Event()
177179
responses = server_streaming_method(self._stub, serialize=False)
180+
responses.add_done_callback(lambda: done.set())
178181
for i, _ in enumerate(responses):
179182
if i == 1:
180183
responses.cancel()
181184
break
182185
self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED)
183-
# self.server.stop(None)
184-
# self.channel.close()
185-
logging.exception("Getting spans")
186-
# sleep(10)
186+
done.wait(5)
187187
spans = self.memory_exporter.get_finished_spans()
188188
self.assertEqual(len(spans), 1)
189189
span = spans[0]
@@ -192,17 +192,17 @@ def test_unary_stream_can_be_cancel(self):
192192
self.assertIs(span.kind, trace.SpanKind.CLIENT)
193193

194194
# Check version and name in span's instrumentation info
195-
self.assertEqualSpanInstrumentationInfo(
195+
self.assertEqualSpanInstrumentationScope(
196196
span, opentelemetry.instrumentation.grpc
197197
)
198198

199199
self.assertSpanHasAttributes(
200200
span,
201201
{
202-
SpanAttributes.RPC_METHOD: "ServerStreamingMethod",
203-
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
204-
SpanAttributes.RPC_SYSTEM: "grpc",
205-
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[
202+
RPC_METHOD: "ServerStreamingMethod",
203+
RPC_SERVICE: "GRPCTestServer",
204+
RPC_SYSTEM: "grpc",
205+
RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[
206206
0
207207
],
208208
},
@@ -258,6 +258,41 @@ def test_stream_stream(self):
258258
},
259259
)
260260

261+
def test_stream_stream_can_be_cancel(self):
262+
done = threading.Event()
263+
responses = bidirectional_streaming_method(self._stub, serialize=False)
264+
responses.add_done_callback(lambda: done.set())
265+
for i, _ in enumerate(responses):
266+
if i == 1:
267+
responses.cancel()
268+
break
269+
self.assertEqual(responses.code(), grpc.StatusCode.CANCELLED)
270+
done.wait(5)
271+
spans = self.memory_exporter.get_finished_spans()
272+
self.assertEqual(len(spans), 1)
273+
span = spans[0]
274+
275+
self.assertEqual(span.name, "/GRPCTestServer/BidirectionalStreamingMethod")
276+
self.assertIs(span.kind, trace.SpanKind.CLIENT)
277+
278+
# Check version and name in span's instrumentation info
279+
self.assertEqualSpanInstrumentationScope(
280+
span, opentelemetry.instrumentation.grpc
281+
)
282+
283+
self.assertSpanHasAttributes(
284+
span,
285+
{
286+
RPC_METHOD: "BidirectionalStreamingMethod",
287+
RPC_SERVICE: "GRPCTestServer",
288+
RPC_SYSTEM: "grpc",
289+
RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[
290+
0
291+
],
292+
},
293+
)
294+
295+
261296
def test_error_simple(self):
262297
with self.assertRaises(grpc.RpcError):
263298
simple_method(self._stub, error=True)

0 commit comments

Comments
 (0)