Skip to content

Commit c720845

Browse files
committed
use tenacity instead of sleeps
1 parent 82659c3 commit c720845

File tree

1 file changed

+51
-27
lines changed

1 file changed

+51
-27
lines changed

services/dask-sidecar/tests/unit/test_tasks.py

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@
6262
_s3fs_settings_from_s3_settings,
6363
)
6464
from simcore_service_dask_sidecar.worker import run_computational_sidecar
65+
from tenacity import (
66+
AsyncRetrying,
67+
retry_if_exception_type,
68+
stop_after_delay,
69+
wait_fixed,
70+
)
6571

6672
_logger = logging.getLogger(__name__)
6773

@@ -674,22 +680,30 @@ async def test_run_computational_sidecar_dask(
674680
), "ordering of progress values incorrectly sorted!"
675681
assert worker_progresses[0] == 0, "missing/incorrect initial progress value"
676682
assert worker_progresses[-1] == 1, "missing/incorrect final progress value"
677-
await asyncio.sleep(5)
678-
assert log_rabbit_client_parser.called
679-
worker_logs = [
680-
message
681-
for msg in log_rabbit_client_parser.call_args_list
682-
for message in LoggerRabbitMessage.model_validate_json(msg.args[0]).messages
683-
]
684-
685-
print(f"<-- we got {len(worker_logs)} lines of logs")
686-
687-
for log in sleeper_task.expected_logs:
688-
r = re.compile(rf"^({log}).*")
689-
search_results = list(filter(r.search, worker_logs))
690-
assert (
691-
len(search_results) > 0
692-
), f"Could not find {log} in worker_logs:\n {pformat(worker_logs, width=240)}"
683+
async for attempt in AsyncRetrying(
684+
wait=wait_fixed(1),
685+
stop=stop_after_delay(30),
686+
reraise=True,
687+
retry=retry_if_exception_type(AssertionError),
688+
):
689+
with attempt:
690+
assert log_rabbit_client_parser.called
691+
worker_logs = [
692+
message
693+
for msg in log_rabbit_client_parser.call_args_list
694+
for message in LoggerRabbitMessage.model_validate_json(
695+
msg.args[0]
696+
).messages
697+
]
698+
699+
print(f"<-- we got {len(worker_logs)} lines of logs")
700+
701+
for log in sleeper_task.expected_logs:
702+
r = re.compile(rf"^({log}).*")
703+
search_results = list(filter(r.search, worker_logs))
704+
assert (
705+
len(search_results) > 0
706+
), f"Could not find {log} in worker_logs:\n {pformat(worker_logs, width=240)}"
693707

694708
# check that the task produce the expected data, not less not more
695709
assert isinstance(output_data, TaskOutputData)
@@ -755,17 +769,27 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
755769
assert worker_progresses[0] == 0, "missing/incorrect initial progress value"
756770
assert worker_progresses[-1] == 1, "missing/incorrect final progress value"
757771

758-
await asyncio.sleep(5)
759-
assert log_rabbit_client_parser.called
760-
761-
worker_logs = [
762-
message
763-
for msg in log_rabbit_client_parser.call_args_list
764-
for message in LoggerRabbitMessage.model_validate_json(msg.args[0]).messages
765-
]
766-
# check all the awaited logs are in there
767-
filtered_worker_logs = filter(lambda log: "This is iteration" in log, worker_logs)
768-
assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS
772+
async for attempt in AsyncRetrying(
773+
wait=wait_fixed(1),
774+
stop=stop_after_delay(30),
775+
reraise=True,
776+
retry=retry_if_exception_type(AssertionError),
777+
):
778+
with attempt:
779+
assert log_rabbit_client_parser.called
780+
781+
worker_logs = [
782+
message
783+
for msg in log_rabbit_client_parser.call_args_list
784+
for message in LoggerRabbitMessage.model_validate_json(
785+
msg.args[0]
786+
).messages
787+
]
788+
# check all the awaited logs are in there
789+
filtered_worker_logs = filter(
790+
lambda log: "This is iteration" in log, worker_logs
791+
)
792+
assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS
769793
mocked_get_image_labels.assert_called()
770794

771795

0 commit comments

Comments
 (0)