4848 pass
4949
5050
51- def wait_until_healthy (channel : grpc .Channel , timeout : float ):
51+ def _create_geometry_channel (target : str ) -> grpc .Channel :
52+ """Create a Geometry service gRPC channel.
53+
54+ Notes
55+ -----
56+ Contains specific options for the Geometry service.
57+
58+ Parameters
59+ ----------
60+ target : str
61+ Target of the channel. This is usually a string in the form of
62+ ``host:port``.
63+
64+ Returns
65+ -------
66+ ~grpc.Channel
67+ gRPC channel for the Geometry service.
68+ """
69+ return grpc .insecure_channel (
70+ target ,
71+ options = [
72+ ("grpc.max_receive_message_length" , pygeom_defaults .MAX_MESSAGE_LENGTH ),
73+ ("grpc.max_send_message_length" , pygeom_defaults .MAX_MESSAGE_LENGTH ),
74+ ],
75+ )
76+
77+
78+ def wait_until_healthy (channel : grpc .Channel | str , timeout : float ) -> grpc .Channel :
5279 """Wait until a channel is healthy before returning.
5380
5481 Parameters
5582 ----------
56- channel : ~grpc.Channel
57- Channel that must be established and healthy.
83+ channel : ~grpc.Channel | str
84+ Channel that must be established and healthy. The target can also be
85+ passed in. In that case, a channel is created using the default insecure channel.
5886 timeout : float
5987 Timeout in seconds. Attempts are made with the following backoff strategy:
6088
@@ -70,14 +98,28 @@ def wait_until_healthy(channel: grpc.Channel, timeout: float):
7098 ------
7199 TimeoutError
72100 Raised when the total elapsed time exceeds the value for the ``timeout`` parameter.
101+
102+ Returns
103+ -------
104+ grpc.Channel
105+ The channel that was passed in. This channel is guaranteed to be healthy.
106+ If a string was passed in, a channel is created using the default insecure channel.
73107 """
74108 t_max = time .time () + timeout
75- health_stub = health_pb2_grpc .HealthStub (channel )
76- request = health_pb2 .HealthCheckRequest (service = "" )
77-
78109 t_out = 0.1
110+
111+ # If the channel is a string, create a channel using the default insecure channel
112+ channel_creation_required = True if isinstance (channel , str ) else False
113+ tmp_channel = None
114+
79115 while time .time () < t_max :
80116 try :
117+ tmp_channel = (
118+ _create_geometry_channel (channel ) if channel_creation_required else channel
119+ )
120+ health_stub = health_pb2_grpc .HealthStub (tmp_channel )
121+ request = health_pb2 .HealthCheckRequest (service = "" )
122+
81123 out = health_stub .Check (request , timeout = t_out )
82124 if out .status is health_pb2 .HealthCheckResponse .SERVING :
83125 break
@@ -91,11 +133,13 @@ def wait_until_healthy(channel: grpc.Channel, timeout: float):
91133 t_out = t_max - t_now
92134 continue
93135 else :
94- target_str = channel ._channel .target ().decode ()
136+ target_str = tmp_channel ._channel .target ().decode ()
95137 raise TimeoutError (
96138 f"Channel health check to target '{ target_str } ' timed out after { timeout } seconds."
97139 )
98140
141+ return tmp_channel
142+
99143
100144class GrpcClient :
101145 """Wraps the gRPC connection for the Geometry service.
@@ -155,23 +199,15 @@ def __init__(
155199 self ._remote_instance = remote_instance
156200 self ._docker_instance = docker_instance
157201 self ._product_instance = product_instance
202+ self ._grpc_health_timeout = timeout
203+
158204 if channel :
159205 # Used for PyPIM when directly providing a channel
160- self ._channel = channel
161206 self ._target = str (channel )
207+ self ._channel = wait_until_healthy (channel , self ._grpc_health_timeout )
162208 else :
163209 self ._target = f"{ host } :{ port } "
164- self ._channel = grpc .insecure_channel (
165- self ._target ,
166- options = [
167- ("grpc.max_receive_message_length" , pygeom_defaults .MAX_MESSAGE_LENGTH ),
168- ("grpc.max_send_message_length" , pygeom_defaults .MAX_MESSAGE_LENGTH ),
169- ],
170- )
171-
172- # do not finish initialization until channel is healthy
173- self ._grpc_health_timeout = timeout
174- wait_until_healthy (self ._channel , self ._grpc_health_timeout )
210+ self ._channel = wait_until_healthy (self ._target , self ._grpc_health_timeout )
175211
176212 # Initialize the gRPC services
177213 self ._services = _GRPCServices (self ._channel , version = proto_version )
@@ -263,9 +299,8 @@ def healthy(self) -> bool:
263299 if self ._closed :
264300 return False
265301 try :
266- wait_until_healthy (self ._channel , self ._grpc_health_timeout )
267- return True
268- except TimeoutError : # pragma: no cover
302+ return self .services .admin .get_service_status ().get ("healthy" )
303+ except Exception : # pragma: no cover
269304 return False
270305
271306 def __repr__ (self ) -> str :
0 commit comments