@@ -134,8 +134,12 @@ def serialize_scaler_type(
134134 return value .value if value is not None else None
135135
136136 @field_serializer ("instanceIds" )
137- def serialize_instance_ids (self , value : List [CpuInstanceType ]) -> List [str ]:
137+ def serialize_instance_ids (
138+ self , value : Optional [List [CpuInstanceType ]]
139+ ) -> Optional [List [str ]]:
138140 """Convert CpuInstanceType enums to strings; a None value is returned as None."""
141+ if value is None :
142+ return None
139143 return [item .value if hasattr (item , "value" ) else str (item ) for item in value ]
140144
141145 @field_validator ("gpus" )
@@ -247,62 +251,6 @@ async def deploy(self) -> "DeployableResource":
247251 log .error (f"{ self } failed to deploy: { e } " )
248252 raise
249253
250- async def is_ready_for_requests (self , give_up_threshold = 10 ) -> bool :
251- """
252- Asynchronously checks if the serverless resource is ready to handle
253- requests by polling its health endpoint.
254-
255- Args:
256- give_up_threshold (int, optional): The maximum number of polling
257- attempts before giving up and raising an error. Defaults to 10.
258-
259- Returns:
260- bool: True if the serverless resource is ready for requests.
261-
262- Raises:
263- ValueError: If the serverless resource is not deployed.
264- RuntimeError: If the health status is THROTTLED, UNHEALTHY, or UNKNOWN
265- after exceeding the give_up_threshold.
266- """
267- if not self .is_deployed ():
268- raise ValueError ("Serverless is not deployed" )
269-
270- log .debug (f"{ self } | API /health" )
271-
272- current_pace = 0
273- attempt = 0
274-
275- # Poll for health status
276- while True :
277- await asyncio .sleep (current_pace )
278-
279- health = await asyncio .to_thread (self .endpoint .health )
280- health = ServerlessHealth (** health )
281-
282- if health .is_ready :
283- return True
284- else :
285- # nothing changed, increase the gap
286- attempt += 1
287- indicator = "." * (attempt // 2 ) if attempt % 2 == 0 else ""
288- if indicator :
289- log .info (f"{ self } | { indicator } " )
290-
291- status = health .workers .status
292- if status in [
293- Status .THROTTLED ,
294- Status .UNHEALTHY ,
295- Status .UNKNOWN ,
296- ]:
297- log .debug (f"{ self } | Health { status .value } " )
298-
299- if attempt >= give_up_threshold :
300- # Give up
301- raise RuntimeError (f"Health { status .value } " )
302-
303- # Adjust polling pace appropriately
304- current_pace = get_backoff_delay (attempt )
305-
306254 async def run_sync (self , payload : Dict [str , Any ]) -> "JobOutput" :
307255 """
308256 Executes a serverless endpoint request with the payload.
@@ -319,9 +267,6 @@ def _fetch_job():
319267 try :
320268 # log.debug(f"[{log_group}] Payload: {payload}")
321269
322- # Poll until requests can be sent
323- await self .is_ready_for_requests ()
324-
325270 log .info (f"{ self } | API /run_sync" )
326271 response = await asyncio .to_thread (_fetch_job )
327272 return JobOutput (** response )
@@ -346,9 +291,6 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput":
346291 try :
347292 # log.debug(f"[{self}] Payload: {payload}")
348293
349- # Poll until requests can be sent
350- await self .is_ready_for_requests ()
351-
352294 # Create a job using the endpoint
353295 log .info (f"{ self } | API /run" )
354296 job = await asyncio .to_thread (self .endpoint .run , request_input = payload )
@@ -366,9 +308,8 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput":
366308 while True :
367309 await asyncio .sleep (current_pace )
368310
369- if await self .is_ready_for_requests ():
370- # Check job status
371- job_status = await asyncio .to_thread (job .status )
311+ # Check job status
312+ job_status = await asyncio .to_thread (job .status )
372313
373314 if last_status == job_status :
374315 # nothing changed, increase the gap
0 commit comments