Skip to content

Commit 55cf718

Browse files
WIP - New cancellation mechanism works
1 parent 35abac7 commit 55cf718

File tree

3 files changed

+30
-32
lines changed

3 files changed

+30
-32
lines changed

src/guidellm/benchmark/aggregator.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -638,7 +638,7 @@ def _calculate_error_rate(self) -> float:
638638
total_successful = self.requests_stats.totals.successful.total
639639
total_errored = self.requests_stats.totals.errored.total
640640
total_finished = total_errored + total_successful
641-
return total_errored / total_finished
641+
return total_errored / total_finished if total_finished > 0 else 0
642642

643643
def _compile_results(
644644
self,

src/guidellm/scheduler/scheduler.py

Lines changed: 3 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -175,11 +175,6 @@ async def run(
175175
and not iter_result.request_info.canceled
176176
and self._is_max_error_rate_reached(iter_result.run_info)
177177
):
178-
if shutdown_event is None:
179-
raise RuntimeError(
180-
"We've reached max_error_rate "
181-
"but shutdown_event is corrupt"
182-
)
183178
shutdown_event.set()
184179
max_error_rate_reached = True
185180
logger.info(
@@ -199,7 +194,7 @@ async def run(
199194
run_info=run_info,
200195
)
201196

202-
await self._stop_processes(futures, shutdown_event, requests_queue)
197+
await self._stop_processes(futures, shutdown_event)
203198

204199
def _validate_scheduler_params(
205200
self,
@@ -252,7 +247,6 @@ async def _start_processes(
252247
scheduling_strategy.processes_limit,
253248
scheduling_strategy.processing_requests_limit,
254249
)
255-
num_processes = 1
256250
requests_limit_split = (
257251
scheduling_strategy.processing_requests_limit
258252
// scheduling_strategy.processes_limit
@@ -459,8 +453,8 @@ async def _stop_processes(
459453
self,
460454
futures: list[asyncio.Future],
461455
shutdown_event: MultiprocessingEvent,
462-
requests_queue: multiprocessing.Queue,
463456
):
464-
shutdown_event.set()
457+
if not shutdown_event.is_set():
458+
shutdown_event.set()
465459
logger.debug("Waiting for futures to shut down")
466460
await asyncio.gather(*futures)

src/guidellm/scheduler/worker.py

Lines changed: 26 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,10 @@
4242
]
4343

4444

45+
class ShutdownSignalReceived(Exception):
46+
pass
47+
48+
4549
@dataclass
4650
class WorkerProcessRequest(Generic[RequestT]):
4751
request: RequestT
@@ -132,25 +136,24 @@ async def get_request(
132136
# if we simply use asyncio.to_thread(requests_queue.get)
133137
# the cancellation task doesn't propagate because the
134138
# asyncio.to_thread is blocking
135-
return await asyncio.to_thread(requests_queue.get)
136-
# def _get_queue_intermittently():
137-
# while True:
138-
# try:
139-
# return requests_queue.get(timeout=shutdown_poll_interval_seconds)
140-
# except queue.Empty:
141-
# logger.info("Checking shutdown even is set in get_request")
142-
# if shutdown_event.is_set():
143-
# logger.info(f"Shutdown signal received in future {process_id}")
144-
# raise asyncio.CancelledError()
145-
# # return None
146-
#
147-
# try:
148-
# return await asyncio.to_thread(_get_queue_intermittently) # type: ignore[attr-defined]
149-
# except asyncio.CancelledError:
150-
# logger.info("kaki")
151-
# # return None
152-
# raise
153-
# # raise
139+
def _get_queue_intermittently():
140+
while True:
141+
try:
142+
return requests_queue.get(timeout=shutdown_poll_interval_seconds)
143+
except queue.Empty:
144+
logger.info("Checking shutdown even is set in get_request")
145+
if shutdown_event.is_set():
146+
logger.info(f"Shutdown signal received in future {process_id}")
147+
raise asyncio.CancelledError()
148+
# return None
149+
150+
try:
151+
return await asyncio.to_thread(_get_queue_intermittently) # type: ignore[attr-defined]
152+
except asyncio.CancelledError:
153+
logger.info("kaki")
154+
# return None
155+
raise
156+
# raise
154157

155158
async def send_result(
156159
self,
@@ -267,7 +270,7 @@ async def _process_runner():
267270
],
268271
return_when=asyncio.FIRST_EXCEPTION,
269272
)
270-
logger.info("First exception happened")
273+
logger.info(f"First exception happened, done: [{[r.get_name() for r in done]}")
271274

272275
for task in pending:
273276
logger.debug(f"Cancelling task {task.get_name()}")
@@ -281,7 +284,7 @@ async def _process_runner():
281284

282285
for task in done:
283286
task_exception = task.exception()
284-
if not isinstance(task_exception, asyncio.CancelledError):
287+
if not isinstance(task_exception, ShutdownSignalReceived):
285288
raise task_exception
286289
try:
287290
asyncio.run(_process_runner())
@@ -303,7 +306,8 @@ async def _wait_for_shutdown(
303306
await asyncio.sleep(shutdown_poll_interval)
304307

305308
logger.debug("Shutdown signal received")
306-
raise asyncio.CancelledError("Shutdown event set, cancelling process loop.")
309+
raise ShutdownSignalReceived("Shutdown event set, cancelling process loop.")
310+
# raise asyncio.CancelledError("Shutdown event set, cancelling process loop.")
307311

308312
async def _process_synchronous_requests_loop(
309313
self,

0 commit comments

Comments
 (0)