@@ -41,19 +41,23 @@ class GRPCClient(object):
def __init__(self, launcher, endpoint, reconnect=False):
    """Set up a client for the coordinator service reachable at ``endpoint``.

    Args:
        launcher: Stored on the instance as ``_launcher``; not used while
            constructing the client.
        endpoint: Address stored as ``_endpoint``; ``_get_stub`` opens the
            gRPC channel against it.
        reconnect: Flag stored as ``_reconnect``; not consulted here.
    """
    self._launcher = launcher
    self._endpoint = endpoint

    # All three size-related channel options share the same upper bound.
    size_limited_keys = (
        "grpc.max_send_message_length",
        "grpc.max_receive_message_length",
        "grpc.max_metadata_size",
    )
    self._options = [(key, GS_GRPC_MAX_MESSAGE_LENGTH) for key in size_limited_keys]

    self._grpc_utils = GRPCUtils()
    # _get_stub() reads self._endpoint and self._options, so both must
    # already be in place at this point.
    self._stub = self._get_stub()

    # No session or log-fetching thread exists until later calls set them.
    self._session_id = None
    self._logs_fetching_thread = None
    self._reconnect = reconnect
5656
def _get_stub(self):
    """Create a coordinator service stub backed by a fresh insecure channel.

    The channel is opened against ``self._endpoint`` with ``self._options``
    and kept on ``self._channel``. A channel left over from a previous call
    is closed once its replacement exists, so repeated refreshes do not
    accumulate open channels.

    Returns:
        A ``CoordinatorServiceStub`` bound to the newly created channel.
    """
    # getattr: the very first call runs before ``_channel`` has been set.
    stale_channel = getattr(self, "_channel", None)
    self._channel = grpc.insecure_channel(self._endpoint, options=self._options)
    if stale_channel is not None:
        # Release the superseded channel explicitly instead of waiting for
        # garbage collection to reclaim it.
        stale_channel.close()
    return coordinator_service_pb2_grpc.CoordinatorServiceStub(self._channel)
60+
5761 def waiting_service_ready (self , timeout_seconds = 60 ):
5862 begin_time = time .time ()
5963 request = message_pb2 .HeartBeatRequest ()
@@ -76,6 +80,9 @@ def waiting_service_ready(self, timeout_seconds=60):
7680 logger .warning ("Heart beat analytical engine failed, %s" , msg )
7781 if time .time () - begin_time >= timeout_seconds :
7882 raise ConnectionError (f"Connect coordinator timeout, { msg } " )
83+ # refresh the channel in case the server became available
84+ if e .code () == grpc .StatusCode .UNAVAILABLE :
85+ self ._stub = self ._get_stub ()
7986 time .sleep (1 )
8087
8188 def connect (self , cleanup_instance = True , dangling_timeout_seconds = 60 ):
0 commit comments