Skip to content

Commit be0f52a

Browse files
committed
Determine number of tasks on per-process basis
1 parent 3be2b5d commit be0f52a

File tree

3 files changed

+48
-13
lines changed

3 files changed

+48
-13
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,6 @@ async def _start_processes(
188188
maxsize=scheduling_strategy.queued_requests_limit
189189
)
190190
responses_queue = manager.Queue()
191-
per_process_requests_limit = scheduling_strategy.processing_requests_limit // (
192-
scheduling_strategy.processes_limit
193-
)
194191

195192
futures = []
196193
loop = asyncio.get_event_loop()
@@ -206,16 +203,17 @@ async def _start_processes(
206203
)
207204
)
208205
elif scheduling_strategy.processing_mode == "async":
209-
futures.append(
210-
loop.run_in_executor(
211-
executor,
212-
self.worker.process_loop_asynchronous,
213-
requests_queue,
214-
responses_queue,
215-
per_process_requests_limit,
216-
process_id,
206+
if scheduling_strategy.process_requests_limits[process_id]:
207+
futures.append(
208+
loop.run_in_executor(
209+
executor,
210+
self.worker.process_loop_asynchronous,
211+
requests_queue,
212+
responses_queue,
213+
scheduling_strategy.process_requests_limits[process_id],
214+
process_id,
215+
)
217216
)
218-
)
219217
else:
220218
raise ValueError(
221219
f"Invalid processing mode: {scheduling_strategy.processing_mode} "

src/guidellm/scheduler/strategy.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,23 @@ def processing_requests_limit(self) -> int:
9494
"""
9595
return settings.max_concurrency
9696

97+
@property
98+
def process_requests_limits(self) -> list[int]:
99+
"""
100+
The maximum number of requests per process for the scheduling strategy.
101+
It determines how many requests can be processed by each worker process
102+
for the scheduling strategy.
103+
104+
:return: A per-process list of the maximum number of requests per process.
105+
"""
106+
split = self.processing_requests_limit // self.processes_limit
107+
remain = self.processing_requests_limit % self.processes_limit
108+
109+
return [
110+
split + 1 if i < remain else split
111+
for i in range(self.processes_limit)
112+
]
113+
97114
def request_times(self) -> Generator[float, None, None]:
98115
"""
99116
A generator that yields timestamps for when requests should be sent.
@@ -168,6 +185,18 @@ def processing_requests_limit(self) -> int:
168185
"""
169186
return 1
170187

188+
@property
189+
def process_requests_limits(self) -> List[int]:
190+
"""
191+
The maximum number of requests per process for the scheduling strategy.
192+
It determines how many requests can be processed by each worker process
193+
for the scheduling strategy.
194+
195+
:return: A per-process list of the maximum number of requests per process.
196+
"""
197+
198+
return [1]
199+
171200
def request_times(self) -> Generator[float, None, None]:
172201
"""
173202
A generator that yields time.time() so requests are sent immediately,

src/guidellm/scheduler/worker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,15 @@ def process_loop_asynchronous(
224224
process_id: int,
225225
):
226226
async def _process_runner():
227-
pending = asyncio.Semaphore(max_concurrency) if max_concurrency else None
227+
if max_concurrency is not None:
228+
if max_concurrency < 1:
229+
raise ValueError(
230+
f"max_concurrency must be greater than 0, got {max_concurrency}"
231+
)
232+
233+
pending = asyncio.Semaphore(max_concurrency)
234+
else:
235+
pending = None
228236

229237
while (
230238
process_request := await self.get_request(requests_queue)

0 commit comments

Comments
 (0)