1111
1212import aiodocker
1313import pytest
14+ from dask_task_models_library .container_tasks .utils import generate_dask_job_id
1415from faker import Faker
1516from fastapi import FastAPI
1617from models_library .docker import DockerLabelKey , StandardSimcoreDockerLabels
1718from models_library .generated_models .docker_rest_api import Service , Task
1819from models_library .progress_bar import ProgressReport
20+ from models_library .projects import ProjectID
21+ from models_library .projects_nodes_io import NodeID
1922from models_library .rabbitmq_messages import (
2023 LoggerRabbitMessage ,
2124 ProgressRabbitMessageNode ,
2225 ProgressType ,
2326)
27+ from models_library .services_types import ServiceKey , ServiceVersion
28+ from models_library .users import UserID
2429from pydantic import TypeAdapter
2530from pytest_mock .plugin import MockerFixture
2631from servicelib .rabbitmq import BIND_TO_ALL_TOPICS , RabbitMQClient
2732from settings_library .rabbit import RabbitSettings
33+ from simcore_service_autoscaling .models import DaskTask
2834from simcore_service_autoscaling .utils .rabbitmq import (
2935 post_tasks_log_message ,
3036 post_tasks_progress_message ,
3137)
38+ from tenacity import RetryError , retry_always
3239from tenacity .asyncio import AsyncRetrying
3340from tenacity .retry import retry_if_exception_type
3441from tenacity .stop import stop_after_delay
4148 "wait" : wait_fixed (0.1 ),
4249}
4350
51+ _TENACITY_STABLE_RETRY_PARAMS = {
52+ "reraise" : True ,
53+ "retry" : retry_always ,
54+ "stop" : stop_after_delay (3 ),
55+ "wait" : wait_fixed (1 ),
56+ }
57+
4458
4559# Selection of core and tool services started in this swarm fixture (integration)
4660pytest_simcore_core_services_selection = [
@@ -97,17 +111,95 @@ async def _(labels: dict[DockerLabelKey, str]) -> list[Task]:
97111 )
98112 assert service .spec
99113
100- service_tasks = TypeAdapter (list [Task ]).validate_python (
114+ docker_tasks = TypeAdapter (list [Task ]).validate_python (
101115 await async_docker_client .tasks .list (filters = {"service" : service .spec .name })
102116 )
103- assert service_tasks
104- assert len (service_tasks ) == 1
105- return service_tasks
117+ assert docker_tasks
118+ assert len (docker_tasks ) == 1
119+ return docker_tasks
106120
107121 return _
108122
109123
110- async def test_post_task_log_message (
124+ @pytest .fixture
125+ def service_version () -> ServiceVersion :
126+ return "1.0.0"
127+
128+
129+ @pytest .fixture
130+ def service_key () -> ServiceKey :
131+ return "simcore/services/dynamic/test"
132+
133+
134+ @pytest .fixture
135+ def node_id (faker : Faker ) -> NodeID :
136+ return faker .uuid4 (cast_to = None )
137+
138+
139+ @pytest .fixture
140+ def project_id (faker : Faker ) -> ProjectID :
141+ return faker .uuid4 (cast_to = None )
142+
143+
144+ @pytest .fixture
145+ def user_id (faker : Faker ) -> UserID :
146+ return faker .pyint (min_value = 1 )
147+
148+
149+ @pytest .fixture
150+ def dask_task (
151+ service_key : ServiceKey ,
152+ service_version : ServiceVersion ,
153+ user_id : UserID ,
154+ project_id : ProjectID ,
155+ node_id : NodeID ,
156+ ) -> DaskTask :
157+ dask_key = generate_dask_job_id (
158+ service_key , service_version , user_id , project_id , node_id
159+ )
160+ return DaskTask (task_id = dask_key , required_resources = {})
161+
162+
163+ @pytest .fixture
164+ def dask_task_with_invalid_key (
165+ faker : Faker ,
166+ ) -> DaskTask :
167+ dask_key = faker .pystr ()
168+ return DaskTask (task_id = dask_key , required_resources = {})
169+
170+
171+ async def test_post_task_empty_tasks (
172+ disable_autoscaling_background_task ,
173+ disable_buffers_pool_background_task ,
174+ enabled_rabbitmq : RabbitSettings ,
175+ disabled_ec2 : None ,
176+ disabled_ssm : None ,
177+ mocked_redis_server : None ,
178+ initialized_app : FastAPI ,
179+ logs_rabbitmq_consumer : AsyncMock ,
180+ progress_rabbitmq_consumer : AsyncMock ,
181+ ):
182+ await post_tasks_log_message (initialized_app , tasks = [], message = "no tasks" )
183+ await post_tasks_progress_message (
184+ initialized_app ,
185+ tasks = [],
186+ progress = 0 ,
187+ progress_type = ProgressType .CLUSTER_UP_SCALING ,
188+ )
189+
190+ with pytest .raises (RetryError ): # noqa: PT012
191+ async for attempt in AsyncRetrying (** _TENACITY_STABLE_RETRY_PARAMS ):
192+ with attempt :
193+ print (
194+ f"--> checking for message in rabbit exchange { LoggerRabbitMessage .get_channel_name ()} , { attempt .retry_state .retry_object .statistics } "
195+ )
196+
197+ logs_rabbitmq_consumer .assert_not_called ()
198+ progress_rabbitmq_consumer .assert_not_called ()
199+ print ("... no message received" )
200+
201+
202+ async def test_post_task_log_message_docker (
111203 disable_autoscaling_background_task ,
112204 disable_buffers_pool_background_task ,
113205 enabled_rabbitmq : RabbitSettings ,
@@ -120,13 +212,13 @@ async def test_post_task_log_message(
120212 faker : Faker ,
121213 logs_rabbitmq_consumer : AsyncMock ,
122214):
123- service_tasks = await running_service_tasks (
215+ docker_tasks = await running_service_tasks (
124216 osparc_docker_label_keys .to_simcore_runtime_docker_labels ()
125217 )
126- assert len (service_tasks ) == 1
218+ assert len (docker_tasks ) == 1
127219 log_message = faker .pystr ()
128220 await post_tasks_log_message (
129- initialized_app , tasks = service_tasks , message = log_message , level = 0
221+ initialized_app , tasks = docker_tasks , message = log_message , level = 0
130222 )
131223
132224 async for attempt in AsyncRetrying (** _TENACITY_RETRY_PARAMS ):
@@ -148,7 +240,46 @@ async def test_post_task_log_message(
148240 print ("... message received" )
149241
150242
151- async def test_post_task_progress_message (
243+ async def test_post_task_log_message_dask (
244+ disable_autoscaling_background_task ,
245+ disable_buffers_pool_background_task ,
246+ enabled_rabbitmq : RabbitSettings ,
247+ disabled_ec2 : None ,
248+ disabled_ssm : None ,
249+ mocked_redis_server : None ,
250+ initialized_app : FastAPI ,
251+ dask_task : DaskTask ,
252+ user_id : UserID ,
253+ project_id : ProjectID ,
254+ node_id : NodeID ,
255+ faker : Faker ,
256+ logs_rabbitmq_consumer : AsyncMock ,
257+ ):
258+ log_message = faker .pystr ()
259+ await post_tasks_log_message (
260+ initialized_app , tasks = [dask_task ], message = log_message , level = 0
261+ )
262+
263+ async for attempt in AsyncRetrying (** _TENACITY_RETRY_PARAMS ):
264+ with attempt :
265+ print (
266+ f"--> checking for message in rabbit exchange { LoggerRabbitMessage .get_channel_name ()} , { attempt .retry_state .retry_object .statistics } "
267+ )
268+ logs_rabbitmq_consumer .assert_called_once_with (
269+ LoggerRabbitMessage (
270+ node_id = node_id ,
271+ project_id = project_id ,
272+ user_id = user_id ,
273+ messages = [f"[cluster] { log_message } " ],
274+ log_level = 0 ,
275+ )
276+ .model_dump_json ()
277+ .encode ()
278+ )
279+ print ("... message received" )
280+
281+
282+ async def test_post_task_progress_message_docker (
152283 disable_autoscaling_background_task ,
153284 disable_buffers_pool_background_task ,
154285 enabled_rabbitmq : RabbitSettings ,
@@ -161,15 +292,15 @@ async def test_post_task_progress_message(
161292 faker : Faker ,
162293 progress_rabbitmq_consumer : AsyncMock ,
163294):
164- service_tasks = await running_service_tasks (
295+ docker_tasks = await running_service_tasks (
165296 osparc_docker_label_keys .to_simcore_runtime_docker_labels (),
166297 )
167- assert len (service_tasks ) == 1
298+ assert len (docker_tasks ) == 1
168299
169300 progress_value = faker .pyfloat (min_value = 0 )
170301 await post_tasks_progress_message (
171302 initialized_app ,
172- tasks = service_tasks ,
303+ tasks = docker_tasks ,
173304 progress = progress_value ,
174305 progress_type = ProgressType .CLUSTER_UP_SCALING ,
175306 )
@@ -193,6 +324,48 @@ async def test_post_task_progress_message(
193324 print ("... message received" )
194325
195326
327+ async def test_post_task_progress_message_dask (
328+ disable_autoscaling_background_task ,
329+ disable_buffers_pool_background_task ,
330+ enabled_rabbitmq : RabbitSettings ,
331+ disabled_ec2 : None ,
332+ disabled_ssm : None ,
333+ mocked_redis_server : None ,
334+ initialized_app : FastAPI ,
335+ dask_task : DaskTask ,
336+ user_id : UserID ,
337+ project_id : ProjectID ,
338+ node_id : NodeID ,
339+ faker : Faker ,
340+ progress_rabbitmq_consumer : AsyncMock ,
341+ ):
342+ progress_value = faker .pyfloat (min_value = 0 )
343+ await post_tasks_progress_message (
344+ initialized_app ,
345+ tasks = [dask_task ],
346+ progress = progress_value ,
347+ progress_type = ProgressType .CLUSTER_UP_SCALING ,
348+ )
349+
350+ async for attempt in AsyncRetrying (** _TENACITY_RETRY_PARAMS ):
351+ with attempt :
352+ print (
353+ f"--> checking for message in rabbit exchange { ProgressRabbitMessageNode .get_channel_name ()} , { attempt .retry_state .retry_object .statistics } "
354+ )
355+ progress_rabbitmq_consumer .assert_called_once_with (
356+ ProgressRabbitMessageNode (
357+ node_id = node_id ,
358+ project_id = project_id ,
359+ user_id = user_id ,
360+ progress_type = ProgressType .CLUSTER_UP_SCALING ,
361+ report = ProgressReport (actual_value = progress_value , total = 1 ),
362+ )
363+ .model_dump_json ()
364+ .encode ()
365+ )
366+ print ("... message received" )
367+
368+
196369async def test_post_task_messages_does_not_raise_if_service_has_no_labels (
197370 disable_autoscaling_background_task ,
198371 disable_buffers_pool_background_task ,
@@ -204,17 +377,44 @@ async def test_post_task_messages_does_not_raise_if_service_has_no_labels(
204377 running_service_tasks : Callable [[dict [DockerLabelKey , str ]], Awaitable [list [Task ]]],
205378 faker : Faker ,
206379):
207- service_tasks = await running_service_tasks ({})
208- assert len (service_tasks ) == 1
380+ docker_tasks = await running_service_tasks ({})
381+ assert len (docker_tasks ) == 1
382+
383+ # this shall not raise any exception even if the task does not contain
384+ # the necessary labels
385+ await post_tasks_log_message (
386+ initialized_app , tasks = docker_tasks , message = faker .pystr (), level = 0
387+ )
388+ await post_tasks_progress_message (
389+ initialized_app ,
390+ tasks = docker_tasks ,
391+ progress = faker .pyfloat (min_value = 0 ),
392+ progress_type = ProgressType .CLUSTER_UP_SCALING ,
393+ )
394+
209395
396+ async def test_post_task_messages_does_not_raise_if_dask_task_key_is_invalid (
397+ disable_autoscaling_background_task ,
398+ disable_buffers_pool_background_task ,
399+ enabled_rabbitmq : RabbitSettings ,
400+ disabled_ec2 : None ,
401+ disabled_ssm : None ,
402+ mocked_redis_server : None ,
403+ initialized_app : FastAPI ,
404+ dask_task_with_invalid_key : DaskTask ,
405+ faker : Faker ,
406+ ):
210407 # this shall not raise any exception even if the task does not contain
211408 # the necessary labels
212409 await post_tasks_log_message (
213- initialized_app , tasks = service_tasks , message = faker .pystr (), level = 0
410+ initialized_app ,
411+ tasks = [dask_task_with_invalid_key ],
412+ message = faker .pystr (),
413+ level = 0 ,
214414 )
215415 await post_tasks_progress_message (
216416 initialized_app ,
217- tasks = service_tasks ,
417+ tasks = [ dask_task_with_invalid_key ] ,
218418 progress = faker .pyfloat (min_value = 0 ),
219419 progress_type = ProgressType .CLUSTER_UP_SCALING ,
220420 )
0 commit comments