Skip to content

Commit 9993d2b

Browse files
authored
Merge pull request grpc#22860 from lidizheng/fix-metadata-flags-test-flake
Fix the metadata flags test flake
2 parents 3c82944 + e2a41a1 commit 9993d2b

File tree

1 file changed

+51
-44
lines changed

1 file changed

+51
-44
lines changed

src/python/grpcio_tests/tests/unit/_metadata_flags_test.py

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import weakref
1818
import unittest
1919
import threading
20+
import logging
2021
import socket
2122
from six.moves import queue
2223

@@ -25,7 +26,7 @@
2526
from tests.unit import test_common
2627
from tests.unit.framework.common import test_constants
2728
import tests.unit.framework.common
28-
from tests.unit.framework.common import bound_socket
29+
from tests.unit.framework.common import get_socket
2930

3031
_UNARY_UNARY = '/test/UnaryUnary'
3132
_UNARY_STREAM = '/test/UnaryStream'
@@ -101,8 +102,9 @@ def service(self, handler_call_details):
101102

102103
def create_dummy_channel():
103104
"""Creating dummy channels is a workaround for retries"""
104-
with bound_socket() as (host, port):
105-
return grpc.insecure_channel('{}:{}'.format(host, port))
105+
host, port, sock = get_socket()
106+
sock.close()
107+
return grpc.insecure_channel('{}:{}'.format(host, port))
106108

107109

108110
def perform_unary_unary_call(channel, wait_for_ready=None):
@@ -203,51 +205,56 @@ def test_call_wait_for_ready_enabled(self):
203205
# main thread. So, it need another method to store the
204206
# exceptions and raise them again in main thread.
205207
unhandled_exceptions = queue.Queue()
206-
with bound_socket(listen=False) as (host, port):
207-
addr = '{}:{}'.format(host, port)
208-
wg = test_common.WaitGroup(len(_ALL_CALL_CASES))
209208

210-
def wait_for_transient_failure(channel_connectivity):
211-
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
209+
# We just need an unused TCP port
210+
host, port, sock = get_socket()
211+
sock.close()
212+
213+
addr = '{}:{}'.format(host, port)
214+
wg = test_common.WaitGroup(len(_ALL_CALL_CASES))
215+
216+
def wait_for_transient_failure(channel_connectivity):
217+
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
218+
wg.done()
219+
220+
def test_call(perform_call):
221+
with grpc.insecure_channel(addr) as channel:
222+
try:
223+
channel.subscribe(wait_for_transient_failure)
224+
perform_call(channel, wait_for_ready=True)
225+
except BaseException as e: # pylint: disable=broad-except
226+
# If the call failed, the thread would be destroyed. The
227+
# channel object can be collected before calling the
228+
# callback, which will result in a deadlock.
212229
wg.done()
230+
unhandled_exceptions.put(e, True)
213231

214-
def test_call(perform_call):
215-
with grpc.insecure_channel(addr) as channel:
216-
try:
217-
channel.subscribe(wait_for_transient_failure)
218-
perform_call(channel, wait_for_ready=True)
219-
except BaseException as e: # pylint: disable=broad-except
220-
# If the call failed, the thread would be destroyed. The
221-
# channel object can be collected before calling the
222-
# callback, which will result in a deadlock.
223-
wg.done()
224-
unhandled_exceptions.put(e, True)
225-
226-
test_threads = []
227-
for perform_call in _ALL_CALL_CASES:
228-
test_thread = threading.Thread(target=test_call,
229-
args=(perform_call,))
230-
test_thread.exception = None
231-
test_thread.start()
232-
test_threads.append(test_thread)
233-
234-
# Start the server after the connections are waiting
235-
wg.wait()
236-
server = test_common.test_server(reuse_port=True)
237-
server.add_generic_rpc_handlers(
238-
(_GenericHandler(weakref.proxy(self)),))
239-
server.add_insecure_port(addr)
240-
server.start()
241-
242-
for test_thread in test_threads:
243-
test_thread.join()
244-
245-
# Stop the server to make test end properly
246-
server.stop(0)
247-
248-
if not unhandled_exceptions.empty():
249-
raise unhandled_exceptions.get(True)
232+
test_threads = []
233+
for perform_call in _ALL_CALL_CASES:
234+
test_thread = threading.Thread(target=test_call,
235+
args=(perform_call,))
236+
test_thread.daemon = True
237+
test_thread.exception = None
238+
test_thread.start()
239+
test_threads.append(test_thread)
240+
241+
# Start the server after the connections are waiting
242+
wg.wait()
243+
server = test_common.test_server(reuse_port=True)
244+
server.add_generic_rpc_handlers((_GenericHandler(weakref.proxy(self)),))
245+
server.add_insecure_port(addr)
246+
server.start()
247+
248+
for test_thread in test_threads:
249+
test_thread.join()
250+
251+
# Stop the server to make test end properly
252+
server.stop(0)
253+
254+
if not unhandled_exceptions.empty():
255+
raise unhandled_exceptions.get(True)
250256

251257

252258
if __name__ == '__main__':
259+
logging.basicConfig(level=logging.DEBUG)
253260
unittest.main(verbosity=2)

0 commit comments

Comments
 (0)