@@ -188,34 +188,46 @@ async def _start_processes(
188188 maxsize = scheduling_strategy .queued_requests_limit
189189 )
190190 responses_queue = manager .Queue ()
191- per_process_requests_limit = scheduling_strategy .processing_requests_limit // (
192- scheduling_strategy .processes_limit
191+
192+ num_processes = min (
193+ scheduling_strategy .processes_limit ,
194+ scheduling_strategy .processing_requests_limit
195+ )
196+ requests_limit_split = (scheduling_strategy .processing_requests_limit //
197+ scheduling_strategy .processes_limit )
198+ requests_limit_remain = (scheduling_strategy .processing_requests_limit %
199+ scheduling_strategy .processes_limit )
200+ process_ids = (id_ for id_ in range (num_processes ))
201+ process_requests_limits = (
202+ requests_limit_split + 1
203+ if i < requests_limit_remain else requests_limit_split
204+ for i in range (num_processes )
193205 )
194206
195207 futures = []
196208 loop = asyncio .get_event_loop ()
197- for process_id in range ( scheduling_strategy . processes_limit ):
209+ for id_ , requests_limit in zip ( process_ids , process_requests_limits ):
198210 if scheduling_strategy .processing_mode == "sync" :
199211 futures .append (
200212 loop .run_in_executor (
201213 executor ,
202214 self .worker .process_loop_synchronous ,
203215 requests_queue ,
204216 responses_queue ,
205- process_id ,
217+ id_ ,
206218 )
207219 )
208220 elif scheduling_strategy .processing_mode == "async" :
209- futures .append (
210- loop .run_in_executor (
211- executor ,
212- self .worker .process_loop_asynchronous ,
213- requests_queue ,
214- responses_queue ,
215- per_process_requests_limit ,
216- process_id ,
221+ futures .append (
222+ loop .run_in_executor (
223+ executor ,
224+ self .worker .process_loop_asynchronous ,
225+ requests_queue ,
226+ responses_queue ,
227+ requests_limit ,
228+ id_ ,
229+ )
217230 )
218- )
219231 else :
220232 raise ValueError (
221233 f"Invalid processing mode: { scheduling_strategy .processing_mode } "
0 commit comments