43 | 43 | ) |
44 | 44 | from simcore_service_autoscaling.modules.dask import DaskTaskResources |
45 | 45 | from simcore_service_autoscaling.modules.docker import get_docker_client |
46 | | -from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API |
47 | 46 | from simcore_service_autoscaling.utils.utils_docker import ( |
48 | 47 | _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, |
49 | 48 | _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY, |
@@ -182,6 +181,92 @@ def ec2_instance_custom_tags( |
182 | 181 | } |
183 | 182 |
|
184 | 183 |
|
| 184 | +@pytest.fixture |
| 185 | +def create_dask_task_resources() -> Callable[..., DaskTaskResources]: |
| 186 | + def _do( |
| 187 | + ec2_instance_type: InstanceTypeType | None, ram: ByteSize |
| 188 | + ) -> DaskTaskResources: |
| 189 | + resources = DaskTaskResources( |
| 190 | + { |
| 191 | + "RAM": int(ram), |
| 192 | + } |
| 193 | + ) |
| 194 | + if ec2_instance_type is not None: |
| 195 | + resources[create_ec2_resource_constraint_key(ec2_instance_type)] = 1 |
| 196 | + return resources |
| 197 | + |
| 198 | + return _do |
| 199 | + |
| 200 | + |
| 201 | +@pytest.fixture |
| 202 | +def mock_dask_get_worker_has_results_in_memory(mocker: MockerFixture) -> mock.Mock: |
| 203 | + return mocker.patch( |
| 204 | + "simcore_service_autoscaling.modules.dask.get_worker_still_has_results_in_memory", |
| 205 | + return_value=0, |
| 206 | + autospec=True, |
| 207 | + ) |
| 208 | + |
| 209 | + |
| 210 | +@pytest.fixture |
| 211 | +def mock_dask_get_worker_used_resources(mocker: MockerFixture) -> mock.Mock: |
| 212 | + return mocker.patch( |
| 213 | + "simcore_service_autoscaling.modules.dask.get_worker_used_resources", |
| 214 | + return_value=Resources.create_as_empty(), |
| 215 | + autospec=True, |
| 216 | + ) |
| 217 | + |
| 218 | + |
| 219 | +@pytest.fixture |
| 220 | +def mock_dask_is_worker_connected(mocker: MockerFixture) -> mock.Mock: |
| 221 | + return mocker.patch( |
| 222 | + "simcore_service_autoscaling.modules.dask.is_worker_connected", |
| 223 | + return_value=True, |
| 224 | + autospec=True, |
| 225 | + ) |
| 226 | + |
| 227 | + |
| 228 | +async def _create_task_with_resources( |
| 229 | + ec2_client: EC2Client, |
| 230 | + dask_task_imposed_ec2_type: InstanceTypeType | None, |
| 231 | + dask_ram: ByteSize | None, |
| 232 | + create_dask_task_resources: Callable[..., DaskTaskResources], |
| 233 | + create_dask_task: Callable[[DaskTaskResources], distributed.Future], |
| 234 | +) -> distributed.Future: |
| 235 | + if dask_task_imposed_ec2_type and not dask_ram: |
| 236 | + instance_types = await ec2_client.describe_instance_types( |
| 237 | + InstanceTypes=[dask_task_imposed_ec2_type] |
| 238 | + ) |
| 239 | + assert instance_types |
| 240 | + assert "InstanceTypes" in instance_types |
| 241 | + assert instance_types["InstanceTypes"] |
| 242 | + assert "MemoryInfo" in instance_types["InstanceTypes"][0] |
| 243 | + assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] |
| 244 | + dask_ram = TypeAdapter(ByteSize).validate_python( |
| 245 | + f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", |
| 246 | + ) |
| 247 | + dask_task_resources = create_dask_task_resources( |
| 248 | + dask_task_imposed_ec2_type, dask_ram |
| 249 | + ) |
| 250 | + dask_future = create_dask_task(dask_task_resources) |
| 251 | + assert dask_future |
| 252 | + return dask_future |
| 253 | + |
| 254 | + |
| 255 | +@dataclass(frozen=True) |
| 256 | +class _ScaleUpParams: |
| 257 | + task_resources: Resources |
| 258 | + num_tasks: int |
| 259 | + expected_instance_type: str |
| 260 | + expected_num_instances: int |
| 261 | + |
| 262 | + |
| 263 | +def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources: |
| 264 | + return { |
| 265 | + res_key.upper(): res_value |
| 266 | + for res_key, res_value in resources.model_dump().items() |
| 267 | + } |
| 268 | + |
| 269 | + |
185 | 270 | async def test_cluster_scaling_with_no_tasks_does_nothing( |
186 | 271 | minimal_configuration: None, |
187 | 272 | app_settings: ApplicationSettings, |
@@ -259,77 +344,6 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing( |
259 | 344 | ) |
260 | 345 |
|
261 | 346 |
|
262 | | -@pytest.fixture |
263 | | -def create_dask_task_resources() -> Callable[..., DaskTaskResources]: |
264 | | - def _do( |
265 | | - ec2_instance_type: InstanceTypeType | None, ram: ByteSize |
266 | | - ) -> DaskTaskResources: |
267 | | - resources = DaskTaskResources( |
268 | | - { |
269 | | - "RAM": int(ram), |
270 | | - } |
271 | | - ) |
272 | | - if ec2_instance_type is not None: |
273 | | - resources[create_ec2_resource_constraint_key(ec2_instance_type)] = 1 |
274 | | - return resources |
275 | | - |
276 | | - return _do |
277 | | - |
278 | | - |
279 | | -@pytest.fixture |
280 | | -def mock_dask_get_worker_has_results_in_memory(mocker: MockerFixture) -> mock.Mock: |
281 | | - return mocker.patch( |
282 | | - "simcore_service_autoscaling.modules.dask.get_worker_still_has_results_in_memory", |
283 | | - return_value=0, |
284 | | - autospec=True, |
285 | | - ) |
286 | | - |
287 | | - |
288 | | -@pytest.fixture |
289 | | -def mock_dask_get_worker_used_resources(mocker: MockerFixture) -> mock.Mock: |
290 | | - return mocker.patch( |
291 | | - "simcore_service_autoscaling.modules.dask.get_worker_used_resources", |
292 | | - return_value=Resources.create_as_empty(), |
293 | | - autospec=True, |
294 | | - ) |
295 | | - |
296 | | - |
297 | | -@pytest.fixture |
298 | | -def mock_dask_is_worker_connected(mocker: MockerFixture) -> mock.Mock: |
299 | | - return mocker.patch( |
300 | | - "simcore_service_autoscaling.modules.dask.is_worker_connected", |
301 | | - return_value=True, |
302 | | - autospec=True, |
303 | | - ) |
304 | | - |
305 | | - |
306 | | -async def _create_task_with_resources( |
307 | | - ec2_client: EC2Client, |
308 | | - dask_task_imposed_ec2_type: InstanceTypeType | None, |
309 | | - dask_ram: ByteSize | None, |
310 | | - create_dask_task_resources: Callable[..., DaskTaskResources], |
311 | | - create_dask_task: Callable[[DaskTaskResources], distributed.Future], |
312 | | -) -> distributed.Future: |
313 | | - if dask_task_imposed_ec2_type and not dask_ram: |
314 | | - instance_types = await ec2_client.describe_instance_types( |
315 | | - InstanceTypes=[dask_task_imposed_ec2_type] |
316 | | - ) |
317 | | - assert instance_types |
318 | | - assert "InstanceTypes" in instance_types |
319 | | - assert instance_types["InstanceTypes"] |
320 | | - assert "MemoryInfo" in instance_types["InstanceTypes"][0] |
321 | | - assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] |
322 | | - dask_ram = TypeAdapter(ByteSize).validate_python( |
323 | | - f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", |
324 | | - ) |
325 | | - dask_task_resources = create_dask_task_resources( |
326 | | - dask_task_imposed_ec2_type, dask_ram |
327 | | - ) |
328 | | - dask_future = create_dask_task(dask_task_resources) |
329 | | - assert dask_future |
330 | | - return dask_future |
331 | | - |
332 | | - |
333 | 347 | @pytest.mark.acceptance_test() |
334 | 348 | @pytest.mark.parametrize( |
335 | 349 | "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type", |
@@ -809,42 +823,6 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso |
809 | 823 | assert "Unexpected error:" in error_messages[0] |
810 | 824 |
|
811 | 825 |
|
812 | | -@dataclass(frozen=True) |
813 | | -class _ScaleUpParams: |
814 | | - task_resources: Resources |
815 | | - num_tasks: int |
816 | | - expected_instance_type: str |
817 | | - expected_num_instances: int |
818 | | - |
819 | | - |
820 | | -def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources: |
821 | | - return { |
822 | | - res_key.upper(): res_value |
823 | | - for res_key, res_value in resources.model_dump().items() |
824 | | - } |
825 | | - |
826 | | - |
827 | | -@pytest.fixture |
828 | | -def patch_ec2_client_launch_instancess_min_number_of_instances( |
829 | | - mocker: MockerFixture, |
830 | | -) -> mock.Mock: |
831 | | - """the moto library always returns min number of instances instead of max number of instances which makes |
832 | | - it difficult to test scaling to multiple of machines. this should help""" |
833 | | - original_fct = SimcoreEC2API.launch_instances |
834 | | - |
835 | | - async def _change_parameters(*args, **kwargs) -> list[EC2InstanceData]: |
836 | | - new_kwargs = kwargs | {"min_number_of_instances": kwargs["number_of_instances"]} |
837 | | - print(f"patching launch_instances with: {new_kwargs}") |
838 | | - return await original_fct(*args, **new_kwargs) |
839 | | - |
840 | | - return mocker.patch.object( |
841 | | - SimcoreEC2API, |
842 | | - "launch_instances", |
843 | | - autospec=True, |
844 | | - side_effect=_change_parameters, |
845 | | - ) |
846 | | - |
847 | | - |
848 | 826 | @pytest.mark.parametrize( |
849 | 827 | "scale_up_params", |
850 | 828 | [ |
|