Skip to content

Commit 6266d8b

Browse files
mergify[bot]majorgreysbrettlangdon
authored
fix(grpc): ensure callbacks in streaming tests (#2428) (#2441)
(cherry picked from commit bdc8152) Co-authored-by: Tahir H. Butt <[email protected]> Co-authored-by: Brett Langdon <[email protected]>
1 parent 9a2438b commit 6266d8b

File tree

1 file changed

+39
-6
lines changed

1 file changed

+39
-6
lines changed

tests/contrib/grpc/test_grpc.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,18 @@ def test_analytics_without_rate(self):
238238
assert spans[1].get_metric(ANALYTICS_SAMPLE_RATE_KEY) == 1.0
239239

240240
def test_server_stream(self):
241+
# use an event to signal when the callbacks have been called from the response
242+
callback_called = threading.Event()
243+
244+
def callback(response):
245+
callback_called.set()
246+
241247
with grpc.insecure_channel("localhost:%d" % (_GRPC_PORT)) as channel:
242248
stub = HelloStub(channel)
243249
responses_iterator = stub.SayHelloTwice(HelloRequest(name="test"))
250+
responses_iterator.add_done_callback(callback)
244251
assert len(list(responses_iterator)) == 2
252+
callback_called.wait(timeout=1)
245253

246254
spans = self.get_spans_with_sync_and_assert(size=2)
247255
server_span, client_span = spans
@@ -282,12 +290,20 @@ def test_client_stream(self):
282290
self._check_server_span(server_span, "grpc-server", "SayHelloLast", "client_streaming")
283291

284292
def test_bidi_stream(self):
293+
# use an event to signal when the callbacks have been called from the response
294+
callback_called = threading.Event()
295+
296+
def callback(response):
297+
callback_called.set()
298+
285299
requests_iterator = iter(HelloRequest(name=name) for name in ["first", "second", "third", "fourth", "fifth"])
286300

287301
with grpc.insecure_channel("localhost:%d" % (_GRPC_PORT)) as channel:
288302
stub = HelloStub(channel)
289-
responses = stub.SayHelloRepeatedly(requests_iterator)
290-
messages = [r.message for r in responses]
303+
responses_iterator = stub.SayHelloRepeatedly(requests_iterator)
304+
responses_iterator.add_done_callback(callback)
305+
messages = [r.message for r in responses_iterator]
306+
callback_called.wait(timeout=1)
291307
assert list(messages) == ["first;second", "third;fourth", "fifth"]
292308

293309
spans = self.get_spans_with_sync_and_assert(size=2)
@@ -353,6 +369,12 @@ def test_custom_interceptor_exception(self):
353369
assert server_span.get_tag(errors.ERROR_STACK) is None
354370

355371
def test_client_cancellation(self):
372+
# use an event to signal when the callbacks have been called from the response
373+
callback_called = threading.Event()
374+
375+
def callback(response):
376+
callback_called.set()
377+
356378
# unpatch and restart server since we are only testing here caller cancellation
357379
self._stop_server()
358380
_unpatch_server()
@@ -365,9 +387,11 @@ def test_client_cancellation(self):
365387
with grpc.insecure_channel("localhost:%d" % (_GRPC_PORT)) as channel:
366388
with self.assertRaises(grpc.RpcError):
367389
stub = HelloStub(channel)
368-
responses = stub.SayHelloRepeatedly(requests_iterator)
369-
responses.cancel()
370-
next(responses)
390+
responses_iterator = stub.SayHelloRepeatedly(requests_iterator)
391+
responses_iterator.add_done_callback(callback)
392+
responses_iterator.cancel()
393+
next(responses_iterator)
394+
callback_called.wait(timeout=1)
371395

372396
spans = self.get_spans_with_sync_and_assert(size=1)
373397
client_span = spans[0]
@@ -426,10 +450,19 @@ def test_client_stream_exception(self):
426450
assert "grpc.StatusCode.INVALID_ARGUMENT" in server_span.get_tag(errors.ERROR_STACK)
427451

428452
def test_server_stream_exception(self):
453+
# use an event to signal when the callbacks have been called from the response
454+
callback_called = threading.Event()
455+
456+
def callback(response):
457+
callback_called.set()
458+
429459
with grpc.secure_channel("localhost:%d" % (_GRPC_PORT), credentials=grpc.ChannelCredentials(None)) as channel:
430460
stub = HelloStub(channel)
431461
with self.assertRaises(grpc.RpcError):
432-
list(stub.SayHelloTwice(HelloRequest(name="exception")))
462+
responses_iterator = stub.SayHelloTwice(HelloRequest(name="exception"))
463+
responses_iterator.add_done_callback(callback)
464+
list(responses_iterator)
465+
callback_called.wait(timeout=1)
433466

434467
spans = self.get_spans_with_sync_and_assert(size=2)
435468
server_span, client_span = spans

0 commit comments

Comments
 (0)