@@ -119,6 +119,9 @@ def dask_subsystem_mock(
119119    )
120120    mock_rabbitmq_client  =  create_rabbitmq_client ("pytest_dask_sidecar_logs_publisher" )
121121    mock_dask_rabbitmq_plugin .get_client .return_value  =  mock_rabbitmq_client 
122+     mock_dask_rabbitmq_plugin .publish_message_from_any_thread  =  (
123+         mock_rabbitmq_client .publish 
124+     )
122125
123126    mocker .patch (
124127        "simcore_service_dask_sidecar.utils.dask.get_rabbitmq_client" ,
@@ -707,15 +710,16 @@ async def test_run_computational_sidecar_dask(
707710
708711
709712@pytest .mark .parametrize ( 
710-     "integration_version, boot_mode" , [("1.0.0" , BootMode .CPU )], indirect = True  
713+     "integration_version, boot_mode, task_owner" , 
714+     [("1.0.0" , BootMode .CPU , "no_parent_node" )], 
715+     indirect = True , 
711716) 
712717async  def  test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub (
713718    dask_client : distributed .Client ,
714719    sidecar_task : Callable [..., ServiceExampleParam ],
715720    progress_sub : distributed .Sub ,
716721    mocked_get_image_labels : mock .Mock ,
717722    log_rabbit_client_parser : mock .AsyncMock ,
718-     task_owner : TaskOwner ,
719723):
720724    mocked_get_image_labels .assert_not_called ()
721725    NUMBER_OF_LOGS  =  20000 
@@ -761,11 +765,7 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
761765    ]
762766    # check all the awaited logs are in there 
763767    filtered_worker_logs  =  filter (lambda  log : "This is iteration"  in  log , worker_logs )
764-     assert  (
765-         len (list (filtered_worker_logs )) ==  (2  *  NUMBER_OF_LOGS )
766-         if  task_owner .has_parent 
767-         else  NUMBER_OF_LOGS 
768-     )
768+     assert  len (list (filtered_worker_logs )) ==  NUMBER_OF_LOGS 
769769    mocked_get_image_labels .assert_called ()
770770
771771
0 commit comments