Skip to content

Commit c2fd813

Browse files
Type fixes, typos & bugfixes
1 parent b502c94 commit c2fd813

File tree

3 files changed

+22
-13
lines changed

3 files changed

+22
-13
lines changed

src/guidellm/__main__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def cli():
179179
type=float,
180180
default=None,
181181
help=(
182-
"The percent of the benchmark (based on max-seconds, max-requets, "
182+
"The percent of the benchmark (based on max-seconds, max-requests, "
183183
"or length of dataset) to run as a warmup and not include in the final results. "
184184
"Defaults to None."
185185
),
@@ -188,7 +188,7 @@ def cli():
188188
"--cooldown-percent",
189189
type=float,
190190
help=(
191-
"The percent of the benchmark (based on max-seconds, max-requets, or lenth "
191+
"The percent of the benchmark (based on max-seconds, max-requests, or length "
192192
"of dataset) to run as a cooldown and not include in the final results. "
193193
"Defaults to None."
194194
),

src/guidellm/scheduler/scheduler.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66
from collections.abc import AsyncGenerator, Iterable, Iterator
77
from concurrent.futures import ProcessPoolExecutor
8+
from multiprocessing.synchronize import Event as MultiprocessingEvent
89
from typing import (
910
Any,
1011
Generic,
@@ -168,11 +169,15 @@ async def run(
168169
if iter_result.request_info.errored \
169170
and not iter_result.request_info.canceled \
170171
and self._is_max_error_rate_reached(iter_result.run_info):
172+
if shutdown_event is None:
173+
raise RuntimeError("We've reached max_error_rate "
174+
"but shutdown_event is corrupt")
171175
shutdown_event.set()
172176
max_error_rate_reached = True
173177
logger.info(f"Max error rate of "
174178
f"({iter_result.run_info.max_error_rate}) "
175179
f"reached, sending shutdown signal")
180+
logger.info("Itter is not None")
176181
yield iter_result
177182

178183
# yield control to the event loop
@@ -205,8 +210,12 @@ def _validate_scheduler_params(
205210
if max_error_rate is not None and (max_error_rate < 0 or max_error_rate > 1):
206211
raise ValueError(f"Invalid max_error_rate: {max_error_rate}")
207212

208-
def _is_max_error_rate_reached(self, run_info) -> bool:
213+
def _is_max_error_rate_reached(self, run_info: SchedulerRunInfo) -> bool:
214+
if run_info.max_error_rate is None:
215+
return False
209216
current_error_rate = run_info.errored_requests / run_info.end_number
217+
logger.info(f"Current error rate {current_error_rate} "
218+
f"i.e total_finished [success / error] / max total possible")
210219
return run_info.max_error_rate < current_error_rate
211220

212221
async def _start_processes(
@@ -219,7 +228,7 @@ async def _start_processes(
219228
list[asyncio.Future],
220229
multiprocessing.Queue,
221230
multiprocessing.Queue,
222-
Optional[multiprocessing.Event]
231+
Optional[MultiprocessingEvent]
223232
]:
224233
await self.worker.prepare_multiprocessing()
225234
shutdown_event = manager.Event() if create_shutdown_event else None
@@ -232,7 +241,6 @@ async def _start_processes(
232241
scheduling_strategy.processes_limit,
233242
scheduling_strategy.processing_requests_limit,
234243
)
235-
num_processes = 1
236244
requests_limit_split = (
237245
scheduling_strategy.processing_requests_limit
238246
// scheduling_strategy.processes_limit
@@ -327,7 +335,7 @@ def _determine_total_requests_count(
327335
scheduling_strategy: SchedulingStrategy,
328336
max_duration: Optional[float],
329337
max_number: Optional[int],
330-
) -> int:
338+
) -> Union[int, float]:
331339
end_number = max_number or math.inf
332340
try:
333341
# update end_number if the request_loader is finite and less than max_number

src/guidellm/scheduler/worker.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import asyncio
22
import math
3-
import multiprocessing
43
import multiprocessing.queues
54
import queue
65
import time
76
from abc import ABC, abstractmethod
87
from collections.abc import AsyncGenerator
98
from dataclasses import dataclass
109
from datetime import timedelta
10+
from multiprocessing.synchronize import Event as MultiprocessingEvent
1111
from typing import (
1212
Any,
1313
Generic,
@@ -124,7 +124,7 @@ async def resolve(
124124

125125
async def get_request(
126126
self, requests_queue: multiprocessing.Queue,
127-
shutdown_event: Optional[multiprocessing.Event] = None,
127+
shutdown_event: Optional[MultiprocessingEvent] = None,
128128
process_id: Optional[int] = None,
129129
) -> Optional[WorkerProcessRequest[RequestT]]:
130130
if shutdown_event is not None and process_id is None:
@@ -186,7 +186,8 @@ async def resolve_scheduler_request(
186186
await asyncio.sleep(wait_time)
187187

188188
info.worker_start = time.time()
189-
request_start_result = WorkerProcessResult(
189+
request_start_result: WorkerProcessResult[RequestT, ResponseT] = \
190+
WorkerProcessResult(
190191
type_="request_start",
191192
request=request,
192193
response=None,
@@ -215,7 +216,7 @@ def process_loop_synchronous(
215216
requests_queue: multiprocessing.Queue,
216217
results_queue: multiprocessing.Queue,
217218
process_id: int,
218-
shutdown_event: Optional[multiprocessing.Event] = None,
219+
shutdown_event: Optional[MultiprocessingEvent] = None,
219220
):
220221
async def _process_runner():
221222
while (
@@ -256,7 +257,7 @@ def process_loop_asynchronous(
256257
results_queue: multiprocessing.Queue,
257258
max_concurrency: int,
258259
process_id: int,
259-
shutdown_event: Optional[multiprocessing.Event] = None,
260+
shutdown_event: Optional[MultiprocessingEvent] = None,
260261
):
261262
async def _process_runner():
262263
pending = asyncio.Semaphore(max_concurrency)
@@ -355,7 +356,7 @@ def process_loop_synchronous(
355356
requests_queue: multiprocessing.Queue,
356357
results_queue: multiprocessing.Queue,
357358
process_id: int,
358-
shutdown_event: Optional[multiprocessing.Event] = None
359+
shutdown_event: Optional[MultiprocessingEvent] = None
359360
):
360361
asyncio.run(self.backend.validate())
361362
super().process_loop_synchronous(
@@ -371,7 +372,7 @@ def process_loop_asynchronous(
371372
results_queue: multiprocessing.Queue,
372373
max_concurrency: int,
373374
process_id: int,
374-
shutdown_event: Optional[multiprocessing.Event] = None
375+
shutdown_event: Optional[MultiprocessingEvent] = None
375376
):
376377
asyncio.run(self.backend.validate())
377378
super().process_loop_asynchronous(

0 commit comments

Comments
 (0)