@@ -110,10 +110,9 @@ async def run(
110
110
Each SchedulerResult object contains information about the request,
111
111
the response, and the run information.
112
112
"""
113
- self ._validate_scheduler_params (scheduling_strategy ,
114
- max_duration ,
115
- max_error_rate ,
116
- max_number )
113
+ self ._validate_scheduler_params (
114
+ scheduling_strategy , max_duration , max_error_rate , max_number
115
+ )
117
116
118
117
with (
119
118
multiprocessing .Manager () as manager ,
@@ -122,13 +121,16 @@ async def run(
122
121
) as executor ,
123
122
):
124
123
requests_iter : Optional [Iterator [Any ]] = None
125
- futures , requests_queue , responses_queue , shutdown_event = \
126
- await self ._start_processes (
127
- manager , executor , scheduling_strategy , max_error_rate is not None )
124
+ (
125
+ futures ,
126
+ requests_queue ,
127
+ responses_queue ,
128
+ shutdown_event ,
129
+ ) = await self ._start_processes (
130
+ manager , executor , scheduling_strategy , max_error_rate is not None
131
+ )
128
132
if shutdown_event and shutdown_event .is_set ():
129
- raise RuntimeError (
130
- "shutdown_event is set before starting scheduling"
131
- )
133
+ raise RuntimeError ("shutdown_event is set before starting scheduling" )
132
134
run_info , requests_iter , times_iter = self ._run_setup (
133
135
futures , scheduling_strategy , max_number , max_duration , max_error_rate
134
136
)
@@ -166,17 +168,23 @@ async def run(
166
168
run_info ,
167
169
)
168
170
if iter_result is not None :
169
- if iter_result .request_info .errored \
170
- and not iter_result .request_info .canceled \
171
- and self ._is_max_error_rate_reached (iter_result .run_info ):
171
+ if (
172
+ iter_result .request_info .errored
173
+ and not iter_result .request_info .canceled
174
+ and self ._is_max_error_rate_reached (iter_result .run_info )
175
+ ):
172
176
if shutdown_event is None :
173
- raise RuntimeError ("We've reached max_error_rate "
174
- "but shutdown_event is corrupt" )
177
+ raise RuntimeError (
178
+ "We've reached max_error_rate "
179
+ "but shutdown_event is corrupt"
180
+ )
175
181
shutdown_event .set ()
176
182
max_error_rate_reached = True
177
- logger .info (f"Max error rate of "
178
- f"({ iter_result .run_info .max_error_rate } ) "
179
- f"reached, sending shutdown signal" )
183
+ logger .info (
184
+ f"Max error rate of "
185
+ f"({ iter_result .run_info .max_error_rate } ) "
186
+ f"reached, sending shutdown signal"
187
+ )
180
188
yield iter_result
181
189
182
190
# yield control to the event loop
@@ -192,14 +200,14 @@ async def run(
192
200
await self ._stop_processes (futures , requests_queue )
193
201
194
202
def _validate_scheduler_params (
195
- self ,
196
- scheduling_strategy : SchedulingStrategy ,
197
- max_duration : Optional [float ],
198
- max_error_rate : Optional [float ],
199
- max_number : Optional [int ]
203
+ self ,
204
+ scheduling_strategy : SchedulingStrategy ,
205
+ max_duration : Optional [float ],
206
+ max_error_rate : Optional [float ],
207
+ max_number : Optional [int ],
200
208
) -> None :
201
209
if scheduling_strategy is None or not isinstance (
202
- scheduling_strategy , SchedulingStrategy
210
+ scheduling_strategy , SchedulingStrategy
203
211
):
204
212
raise ValueError (f"Invalid scheduling strategy: { scheduling_strategy } " )
205
213
if max_number is not None and max_number < 1 :
@@ -213,21 +221,23 @@ def _is_max_error_rate_reached(self, run_info: SchedulerRunInfo) -> bool:
213
221
if run_info .max_error_rate is None :
214
222
return False
215
223
current_error_rate = run_info .errored_requests / run_info .end_number
216
- logger .info (f"Current error rate { current_error_rate } "
217
- f"i.e total_finished [success / error] / max total possible" )
224
+ logger .info (
225
+ f"Current error rate { current_error_rate } "
226
+ f"i.e total_finished [success / error] / max total possible"
227
+ )
218
228
return run_info .max_error_rate < current_error_rate
219
229
220
230
async def _start_processes (
221
231
self ,
222
232
manager ,
223
233
executor : ProcessPoolExecutor ,
224
234
scheduling_strategy : SchedulingStrategy ,
225
- create_shutdown_event : bool = False
235
+ create_shutdown_event : bool = False ,
226
236
) -> tuple [
227
237
list [asyncio .Future ],
228
238
multiprocessing .Queue ,
229
239
multiprocessing .Queue ,
230
- Optional [MultiprocessingEvent ]
240
+ Optional [MultiprocessingEvent ],
231
241
]:
232
242
await self .worker .prepare_multiprocessing ()
233
243
shutdown_event = manager .Event () if create_shutdown_event else None
@@ -309,8 +319,10 @@ def _run_setup(
309
319
)
310
320
311
321
if end_number == math .inf and max_error_rate is not None :
312
- logger .warning ("max_error_rate will be ignored "
313
- "because end_number can not be determined." )
322
+ logger .warning (
323
+ "max_error_rate will be ignored "
324
+ "because end_number can not be determined."
325
+ )
314
326
315
327
if end_number == math .inf and end_time is None :
316
328
logger .warning (
@@ -324,16 +336,16 @@ def _run_setup(
324
336
end_number = end_number ,
325
337
processes = len (processes ),
326
338
strategy = scheduling_strategy ,
327
- max_error_rate = max_error_rate
339
+ max_error_rate = max_error_rate ,
328
340
)
329
341
330
342
return info , requests_iter , times_iter
331
343
332
344
def _determine_total_requests_count (
333
- self ,
334
- scheduling_strategy : SchedulingStrategy ,
335
- max_duration : Optional [float ],
336
- max_number : Optional [int ],
345
+ self ,
346
+ scheduling_strategy : SchedulingStrategy ,
347
+ max_duration : Optional [float ],
348
+ max_number : Optional [int ],
337
349
) -> Union [int , float ]:
338
350
end_number = max_number or math .inf
339
351
try :
0 commit comments