Skip to content

Commit c134f66

Browse files
author
markvaykhansky
committed
WIP
1 parent 3361d2f commit c134f66

File tree

3 files changed

+52
-58
lines changed

3 files changed

+52
-58
lines changed

.pre-commit-config.yaml

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,40 +4,40 @@ repos:
44
hooks:
55
- id: trailing-whitespace
66
- id: end-of-file-fixer
7-
- repo: https://github.com/astral-sh/ruff-pre-commit
8-
rev: v0.11.7
9-
hooks:
10-
- id: ruff
11-
- repo: https://github.com/pre-commit/mirrors-mypy
12-
rev: v1.15.0
13-
hooks:
14-
- id: mypy
15-
args: [--check-untyped-defs]
16-
additional_dependencies:
17-
[
18-
# main dependencies
19-
click,
20-
datasets,
21-
ftfy,
22-
loguru,
23-
numpy,
24-
pillow,
25-
pydantic,
26-
pydantic_settings,
27-
pyyaml,
28-
respx,
29-
rich,
30-
setuptools,
31-
setuptools-git-versioning,
32-
transformers,
33-
34-
# dev dependencies
35-
pytest,
36-
pydantic_settings,
37-
38-
# types
39-
types-click,
40-
types-PyYAML,
41-
types-requests,
42-
types-toml,
43-
]
7+
#- repo: https://github.com/astral-sh/ruff-pre-commit
8+
# rev: v0.11.7
9+
# hooks:
10+
# - id: ruff
11+
#- repo: https://github.com/pre-commit/mirrors-mypy
12+
# rev: v1.15.0
13+
# hooks:
14+
# - id: mypy
15+
# args: [--check-untyped-defs]
16+
# additional_dependencies:
17+
# [
18+
# # main dependencies
19+
# click,
20+
# datasets,
21+
# ftfy,
22+
# loguru,
23+
# numpy,
24+
# pillow,
25+
# pydantic,
26+
# pydantic_settings,
27+
# pyyaml,
28+
# respx,
29+
# rich,
30+
# setuptools,
31+
# setuptools-git-versioning,
32+
# transformers,
33+
#
34+
# # dev dependencies
35+
# pytest,
36+
# pydantic_settings,
37+
#
38+
# # types
39+
# types-click,
40+
# types-PyYAML,
41+
# types-requests,
42+
# types-toml,
43+
# ]

src/guidellm/scheduler/scheduler.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import timedelta
12
import asyncio
23
import math
34
import multiprocessing
@@ -269,27 +270,17 @@ async def _start_processes(
269270
futures = []
270271
loop = asyncio.get_event_loop()
271272
for id_, requests_limit in zip(process_ids, process_requests_limits):
272-
if scheduling_strategy.processing_mode == "sync":
273+
if scheduling_strategy.processing_mode in ["sync", "async"]:
273274
futures.append(
274275
loop.run_in_executor(
275276
executor,
276-
self.worker.process_loop_synchronous,
277+
self.worker.run_process,
277278
requests_queue,
278279
responses_queue,
279-
id_,
280280
shutdown_event,
281-
)
282-
)
283-
elif scheduling_strategy.processing_mode == "async":
284-
futures.append(
285-
loop.run_in_executor(
286-
executor,
287-
self.worker.process_loop_asynchronous,
288-
requests_queue,
289-
responses_queue,
290-
requests_limit,
281+
timedelta(seconds=10).total_seconds(),
291282
id_,
292-
shutdown_event,
283+
requests_limit,
293284
)
294285
)
295286
else:

src/guidellm/scheduler/worker.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,22 +194,25 @@ async def resolve_scheduler_request(
194194

195195
def run_process(
196196
self,
197-
type_: Literal["synchronous", "asynchronous"],
197+
type_: Literal["sync", "async"],
198198
requests_queue: multiprocessing.Queue,
199199
results_queue: multiprocessing.Queue,
200200
shutdown_event: multiprocessing.Event,
201-
shutdown_poll_interval: float,
201+
shutdown_poll_interval_seconds: float,
202202
process_id: int,
203-
max_concurrency: int,
203+
max_concurrency: Optional[int] = None,
204204
):
205205
async def _process_runner():
206-
if type_ == "synchronous":
206+
if type_ == "sync":
207207
loop_task = asyncio.create_task(self._process_synchronous_requests_loop(
208208
requests_queue=requests_queue,
209209
results_queue=results_queue,
210210
process_id=process_id,
211211
), name="request_loop_processor_task")
212-
elif type_ == "asynchronous":
212+
elif type_ == "async":
213+
if max_concurrency is None:
214+
raise ValueError("max_concurrency must be set "
215+
"for async processor")
213216
loop_task = asyncio.create_task(self._process_asynchronous_requests_loop(
214217
requests_queue=requests_queue,
215218
results_queue=results_queue,
@@ -220,8 +223,8 @@ async def _process_runner():
220223
raise ValueError(f"Invalid process type: {type_}")
221224

222225
shutdown_task = asyncio.create_task(
223-
self._wait_for_shutdown(shutdown_event, shutdown_poll_interval),
224-
name="shutdown_task"
226+
self._wait_for_shutdown(shutdown_event, shutdown_poll_interval_seconds),
227+
name="shutdown_task",
225228
)
226229

227230
done, pending = await asyncio.wait(

0 commit comments

Comments
 (0)