@@ -119,6 +119,9 @@ def dask_subsystem_mock(
119119 )
120120 mock_rabbitmq_client = create_rabbitmq_client ("pytest_dask_sidecar_logs_publisher" )
121121 mock_dask_rabbitmq_plugin .get_client .return_value = mock_rabbitmq_client
122+ mock_dask_rabbitmq_plugin .publish_message_from_any_thread = (
123+ mock_rabbitmq_client .publish
124+ )
122125
123126 mocker .patch (
124127 "simcore_service_dask_sidecar.utils.dask.get_rabbitmq_client" ,
@@ -707,15 +710,16 @@ async def test_run_computational_sidecar_dask(
707710
708711
709712@pytest .mark .parametrize (
710- "integration_version, boot_mode" , [("1.0.0" , BootMode .CPU )], indirect = True
713+ "integration_version, boot_mode, task_owner" ,
714+ [("1.0.0" , BootMode .CPU , "no_parent_node" )],
715+ indirect = True ,
711716)
712717async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub (
713718 dask_client : distributed .Client ,
714719 sidecar_task : Callable [..., ServiceExampleParam ],
715720 progress_sub : distributed .Sub ,
716721 mocked_get_image_labels : mock .Mock ,
717722 log_rabbit_client_parser : mock .AsyncMock ,
718- task_owner : TaskOwner ,
719723):
720724 mocked_get_image_labels .assert_not_called ()
721725 NUMBER_OF_LOGS = 20000
@@ -761,11 +765,7 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
761765 ]
762766 # check all the awaited logs are in there
763767 filtered_worker_logs = filter (lambda log : "This is iteration" in log , worker_logs )
764- assert (
765- len (list (filtered_worker_logs )) == (2 * NUMBER_OF_LOGS )
766- if task_owner .has_parent
767- else NUMBER_OF_LOGS
768- )
768+ assert len (list (filtered_worker_logs )) == NUMBER_OF_LOGS
769769 mocked_get_image_labels .assert_called ()
770770
771771
0 commit comments