@@ -350,39 +350,24 @@ async def _process_next_request(self, target_start: float):
350350 :param target_start: Unix timestamp when request should begin processing
351351 """
352352 request : RequestT | MultiTurnRequestT [RequestT ] | None = None
353- request_info : RequestInfo | None
353+ request_info : RequestInfo | None = None
354354 response : ResponseT | None = None
355355
356356 try :
357357 # Pull request from the queue, update state, and send "pending" update
358- request , request_info = await self .messaging .get ()
359- dequeued_time = time .time () # Ensure accurate dequeue timing
360- if request is None or request_info is None :
361- raise RuntimeError ("Received invalid request or request info" )
362- if isinstance (request , list | tuple ):
363- raise NotImplementedError ("Multi-turn requests are not yet supported" )
364-
365- request_info .timings .dequeued = dequeued_time
366- request_info .scheduler_node_id = self .messaging .worker_index or - 1
367- request_info .timings .targeted_start = target_start
368- self ._send_update ("pending" , response , request , request_info )
369-
370- # Schedule the request
371- current_time = time .time ()
372- request_info .timings .scheduled_at = current_time
373- if target_start > current_time :
374- await asyncio .sleep (target_start - current_time )
375- # Adapt delay so that scheduled at reflects the sleep time
376- request_info .timings .scheduled_at = target_start
377-
378- # Process the request with the backend
379- request_info .timings .resolve_start = time .time ()
380- self ._send_update ("in_progress" , response , request , request_info )
381- async for resp , info in await self .backend .resolve (
358+ request , request_info = await self ._dequeue_next_request (target_start )
359+
360+ # Schedule the request and send "in_progress" update
361+ await self ._schedule_request (request , request_info , target_start )
362+
363+ async for resp , info in self .backend .resolve ( # type: ignore[attr-defined]
382364 request , request_info , None
383365 ):
366+
384367 response = resp
385368 request_info = info
369+ if request_info is None :
370+ raise RuntimeError ("Received invalid request info from backend" )
386371
387372 # Complete the request
388373 request_info .timings .resolve_end = time .time ()
@@ -405,6 +390,39 @@ async def _process_next_request(self, target_start: float):
405390 if request_info is not None :
406391 self .strategy .request_completed (request_info )
407392
393+ async def _dequeue_next_request (
394+ self , target_start : float
395+ ) -> tuple [RequestT , RequestInfo ]:
396+ request , request_info = await self .messaging .get ()
397+ dequeued_time = time .time () # Ensure accurate dequeue timing
398+ if request is None or request_info is None :
399+ raise RuntimeError ("Received invalid request or request info" )
400+ if isinstance (request , list | tuple ):
401+ raise NotImplementedError ("Multi-turn requests are not yet supported" )
402+
403+ request_info .timings .dequeued = dequeued_time
404+ request_info .scheduler_node_id = self .messaging .worker_index or - 1
405+ request_info .timings .targeted_start = target_start
406+ self ._send_update ("pending" , None , request , request_info )
407+ return request , request_info
408+
409+ async def _schedule_request (
410+ self ,
411+ request : RequestT ,
412+ request_info : RequestInfo ,
413+ target_start : float
414+ ):
415+ current_time = time .time ()
416+ request_info .timings .scheduled_at = current_time
417+ if target_start > current_time :
418+ await asyncio .sleep (target_start - current_time )
419+ # Adapt delay so that scheduled at reflects the sleep time
420+ request_info .timings .scheduled_at = target_start
421+
422+ # Process the request with the backend
423+ request_info .timings .resolve_start = time .time ()
424+ self ._send_update ("in_progress" , None , request , request_info )
425+
408426 def _send_update (
409427 self ,
410428 new_status : Literal [
0 commit comments