|
5 | 5 | import asyncio
|
6 | 6 | from contextlib import suppress
|
7 | 7 | from datetime import datetime, timedelta, timezone
|
| 8 | +from itertools import chain, repeat |
8 | 9 | from typing import TYPE_CHECKING, TypeVar, cast
|
9 | 10 | from unittest.mock import Mock
|
10 | 11 |
|
@@ -208,6 +209,60 @@ def get_historical_system_info() -> SystemInfo:
|
208 | 209 | await pool_run_task
|
209 | 210 |
|
210 | 211 |
|
| 212 | +async def test_autoscales_uses_desired_concurrency_ratio(system_status: SystemStatus | Mock) -> None: |
| 213 | + """Test that desired concurrency ratio can limit desired concurrency. |
| 214 | +
|
| 215 | + This test creates situation where only one task is ready and then no other task is ever ready. |
| 216 | + This creates situation where the system could scale up desired concurrency, but it will not do so because |
| 217 | + desired_concurrency_ratio=1 means that first the system would have to increase current concurrency to same number as |
| 218 | + desired concurrency and due to no other task ever being ready, it will never happen. Thus desired concurrency will |
| 219 | + stay 2 as was the initial setup, even though other conditions would allow the increase. (max_concurrency=4, |
| 220 | + system being idle).""" |
| 221 | + |
| 222 | + async def run() -> None: |
| 223 | + await asyncio.sleep(0.1) |
| 224 | + |
| 225 | + is_task_ready_iterator = chain([future(True)], repeat(future(False))) |
| 226 | + |
| 227 | + def is_task_ready_function() -> Awaitable[bool]: |
| 228 | + return next(is_task_ready_iterator) |
| 229 | + |
| 230 | + def get_historical_system_info() -> SystemInfo: |
| 231 | + return SystemInfo( |
| 232 | + cpu_info=LoadRatioInfo(limit_ratio=0.9, actual_ratio=0.3), |
| 233 | + memory_info=LoadRatioInfo(limit_ratio=0.9, actual_ratio=0.3), |
| 234 | + event_loop_info=LoadRatioInfo(limit_ratio=0.9, actual_ratio=0.3), |
| 235 | + client_info=LoadRatioInfo(limit_ratio=0.9, actual_ratio=0.3), |
| 236 | + ) |
| 237 | + |
| 238 | + cast(Mock, system_status.get_historical_system_info).side_effect = get_historical_system_info |
| 239 | + |
| 240 | + pool = AutoscaledPool( |
| 241 | + system_status=system_status, |
| 242 | + run_task_function=run, |
| 243 | + is_task_ready_function=is_task_ready_function, |
| 244 | + is_finished_function=lambda: future(False), |
| 245 | + concurrency_settings=ConcurrencySettings( |
| 246 | + min_concurrency=2, |
| 247 | + desired_concurrency=2, |
| 248 | + max_concurrency=4, |
| 249 | + ), |
| 250 | + autoscale_interval=timedelta(seconds=0.1), |
| 251 | + desired_concurrency_ratio=1, |
| 252 | + ) |
| 253 | + |
| 254 | + pool_run_task = asyncio.create_task(pool.run(), name='pool run task') |
| 255 | + try: |
| 256 | + for _ in range(5): |
| 257 | + assert pool.desired_concurrency == 2 |
| 258 | + await asyncio.sleep(0.1) |
| 259 | + |
| 260 | + finally: |
| 261 | + pool_run_task.cancel() |
| 262 | + with suppress(asyncio.CancelledError): |
| 263 | + await pool_run_task |
| 264 | + |
| 265 | + |
211 | 266 | async def test_max_tasks_per_minute_works(system_status: SystemStatus | Mock) -> None:
|
212 | 267 | done_count = 0
|
213 | 268 |
|
|
0 commit comments