Skip to content

Commit a4d6a60

Browse files
committed
More functional fixes to worker
Signed-off-by: Samuel Monson <[email protected]>
1 parent d010537 commit a4d6a60

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

src/guidellm/scheduler/worker.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from guidellm.scheduler.result import (
3131
MPQueues,
3232
SchedulerRequestInfo,
33+
WorkerProcessRequestTime,
3334
WorkerProcessResult,
3435
)
3536
from guidellm.scheduler.types import RequestT, ResponseT
@@ -107,6 +108,11 @@ async def resolve(
107108
"""
108109
...
109110

111+
async def get_request_time(
112+
self, times_queue: Queue[WorkerProcessRequestTime]
113+
) -> WorkerProcessRequestTime:
114+
return await asyncio.to_thread(times_queue.get) # type: ignore[attr-defined]
115+
110116
async def send_result(
111117
self,
112118
results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]],
@@ -170,7 +176,7 @@ async def resolve_scheduler_request(
170176
asyncio.create_task(self.send_result(results_queue, result))
171177

172178
request_session.push_response(response)
173-
return request
179+
return request_session
174180

175181
def process_loop_asynchronous(
176182
self,
@@ -194,7 +200,7 @@ async def _process_runner():
194200
else queues.requests.get_nowait()
195201
)
196202
dequeued_time = time.time()
197-
request_times = queues.times.get()
203+
request_times = await self.get_request_time(queues.times)
198204
except (QueueEmpty, IndexError):
199205
# Requeue the session if we don't have a next time yet
200206
if request_session is not None:
@@ -215,7 +221,7 @@ async def wait_then_requeue(
215221
# Release the lock with the session on top of the stack
216222
lock.release()
217223

218-
async def _request_callback(
224+
def _request_callback(
219225
session_future: asyncio.Future[RequestSession[RequestT, ResponseT]],
220226
):
221227
# If we are prioritizing sessions, hold
@@ -224,7 +230,7 @@ async def _request_callback(
224230
if not prioritize_sessions:
225231
lock.release()
226232

227-
session = await session_future
233+
session = session_future.result()
228234
if not session.complete:
229235
asyncio.create_task(wait_then_requeue(session))
230236
elif prioritize_sessions:

0 commit comments

Comments
 (0)