@@ -1,3 +1,4 @@
+import collections
 from datetime import timedelta
 import asyncio
 import math
@@ -128,10 +129,11 @@ async def run(
             responses_queue,
             shutdown_event,
         ) = await self._start_processes(
-            manager, executor, scheduling_strategy, max_error_rate is not None
+            manager, executor, scheduling_strategy
         )
-        if shutdown_event and shutdown_event.is_set():
+        if shutdown_event.is_set():
             raise RuntimeError("shutdown_event is set before starting scheduling")
+
         run_info, requests_iter, times_iter = self._run_setup(
             futures, scheduling_strategy, max_number, max_duration, max_error_rate
         )
@@ -217,27 +219,42 @@ def _validate_scheduler_params(
     def _is_max_error_rate_reached(self, run_info: SchedulerRunInfo) -> bool:
         if run_info.max_error_rate is None:
             return False
-        current_error_rate = run_info.errored_requests / run_info.end_number
-        logger.debug(
-            f"Current error rate {current_error_rate} "
-            f"i.e total_finished [success / error] / max total possible"
-        )
-        return run_info.max_error_rate < current_error_rate
+
+        is_max_error_rate = run_info.max_error_rate < 1
+        if not is_max_error_rate:
+            raise NotImplementedError("max_error_rate must be a ratio < 1")
+        if (
+            run_info.strategy.type_ == "constant"
+            and run_info.end_number != math.inf
+        ):
+            # The total number of requests is known up front
+            current_error_rate = run_info.errored_requests / run_info.end_number
+            logger.debug(
+                f"Current error rate {current_error_rate}, "
+                f"i.e. errored requests / max total possible requests"
+            )
+            return run_info.max_error_rate < current_error_rate
+        elif settings.constant_error_check_window_size <= run_info.completed_requests:
+            # Error ratio over the sliding window of recent request statuses
+            if run_info.last_requests_statuses is None:
+                raise RuntimeError("last_requests_statuses is not initialized")
+            window = run_info.last_requests_statuses
+            return run_info.max_error_rate < window.count("error") / len(window)
+        return False
 
     async def _start_processes(
         self,
         manager,
         executor: ProcessPoolExecutor,
         scheduling_strategy: SchedulingStrategy,
-        create_shutdown_event: bool = False,
     ) -> tuple[
         list[asyncio.Future],
         multiprocessing.Queue,
         multiprocessing.Queue,
-        Optional[MultiprocessingEvent],
+        MultiprocessingEvent,
     ]:
         await self.worker.prepare_multiprocessing()
-        shutdown_event = manager.Event() if create_shutdown_event else None
+        shutdown_event = manager.Event()
         requests_queue = manager.Queue(
             maxsize=scheduling_strategy.queued_requests_limit
         )
@@ -325,6 +342,7 @@ def _run_setup(
             processes=len(processes),
             strategy=scheduling_strategy,
             max_error_rate=max_error_rate,
+            last_requests_statuses=collections.deque(maxlen=settings.constant_error_check_window_size) if max_error_rate is not None and max_error_rate < 1 else None,
         )
 
         return info, requests_iter, times_iter
|
@@ -437,9 +455,14 @@ def _check_result_ready(
         run_info.processing_requests -= 1
         run_info.completed_requests += 1
 
-        if process_response.info.errored:
+        is_errored = process_response.info.errored
+        if is_errored:
             run_info.errored_requests += 1
 
+        if run_info.last_requests_statuses is not None:
+            status = "error" if is_errored else "success"
+            run_info.last_requests_statuses.append(status)
+
         return SchedulerRequestResult(
             type_="request_complete",
             run_info=run_info,
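Below the diff, a minimal standalone sketch (not part of this PR) of how a `collections.deque(maxlen=...)` behaves as the sliding window behind the ratio check; the window size of 10 and the `"error"`/`"success"` labels are illustrative stand-ins for `settings.constant_error_check_window_size` and the statuses appended in `_check_result_ready`:

```python
import collections

# A bounded deque keeps only the most recent N entries; older ones are evicted
# automatically, so the computed ratio always reflects a sliding window.
window_size = 10  # illustrative; the scheduler would take this from settings
statuses = collections.deque(maxlen=window_size)

for i in range(25):
    statuses.append("error" if i % 4 == 0 else "success")

# Only the last `window_size` statuses remain, so the ratio costs O(window)
# to compute no matter how many requests the run has processed.
window_error_rate = statuses.count("error") / len(statuses)
print(list(statuses))
print(f"window error rate: {window_error_rate:.2f}")  # 0.30 for this sequence
```

This eviction behavior is why `_is_max_error_rate_reached` can compare against `max_error_rate` for non-constant strategies without tracking the full request history.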