4747from pydantic import TypeAdapter
4848from pytest_mock .plugin import MockerFixture
4949from servicelib .rabbitmq import RabbitMQClient
50+ from servicelib .rabbitmq ._constants import BIND_TO_ALL_TOPICS
5051from simcore_postgres_database .models .comp_runs import comp_runs
5152from simcore_postgres_database .models .comp_tasks import NodeClass
5253from simcore_service_director_v2 .core .errors import (
@@ -350,7 +351,9 @@ async def computational_pipeline_rabbit_client_parser(
350351 client = create_rabbitmq_client ("computational_pipeline_pytest_consumer" )
351352 mock = mocker .AsyncMock (return_value = True )
352353 queue_name , _ = await client .subscribe (
353- ComputationalPipelineStatusMessage .get_channel_name (), mock
354+ ComputationalPipelineStatusMessage .get_channel_name (),
355+ mock ,
356+ topics = [BIND_TO_ALL_TOPICS ],
354357 )
355358 yield mock
356359 await client .unsubscribe (queue_name )
@@ -361,6 +364,22 @@ async def _assert_message_received(
361364 expected_call_count : int ,
362365 message_parser : Callable ,
363366) -> list :
367+ if expected_call_count == 0 :
368+ # ensure it remains so for a few seconds
369+ mocked_message_parser .assert_not_called ()
370+ async for attempt in AsyncRetrying (
371+ wait = wait_fixed (1 ),
372+ stop = stop_after_delay (3 ),
373+ retry = retry_if_exception_type (AssertionError ),
374+ reraise = True ,
375+ ):
376+ with attempt :
377+ print (
378+ f"--> waiting for rabbitmq message [{ attempt .retry_state .attempt_number } , { attempt .retry_state .idle_for } ]"
379+ )
380+ mocked_message_parser .assert_not_called ()
381+
382+ return []
364383 async for attempt in AsyncRetrying (
365384 wait = wait_fixed (0.1 ),
366385 stop = stop_after_delay (5 ),
@@ -371,7 +390,9 @@ async def _assert_message_received(
371390 print (
372391 f"--> waiting for rabbitmq message [{ attempt .retry_state .attempt_number } , { attempt .retry_state .idle_for } ]"
373392 )
374- assert mocked_message_parser .call_count == expected_call_count
393+ assert mocked_message_parser .call_count == expected_call_count , (
394+ mocked_message_parser .call_args_list
395+ )
375396 print (
376397 f"<-- rabbitmq message received after [{ attempt .retry_state .attempt_number } , { attempt .retry_state .idle_for } ]"
377398 )
@@ -1615,7 +1636,7 @@ async def mocked_get_task_result(_job_id: str) -> TaskOutputData:
16151636 )
16161637 await _assert_message_received (
16171638 computational_pipeline_rabbit_client_parser ,
1618- 1 ,
1639+ 0 ,
16191640 ComputationalPipelineStatusMessage .model_validate_json ,
16201641 )
16211642
0 commit comments