|
1 | 1 | import re |
2 | | -from collections.abc import Callable |
3 | 2 |
|
4 | 3 | import pytest |
5 | | -from celery import Celery |
| 4 | +from celery import Task |
| 5 | +from models_library.progress_bar import ProgressReport, ProgressStructuredMessage |
6 | 6 | from models_library.users import UserID |
7 | 7 | from simcore_service_storage.api._worker_tasks._data_export import data_export |
8 | | -from simcore_service_storage.modules.celery._task import define_task |
9 | | -from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient |
10 | | -from simcore_service_storage.modules.celery.models import ( |
11 | | - TaskContext, |
12 | | - TaskState, |
13 | | -) |
14 | | -from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed |
15 | 8 |
|
16 | 9 | pytest_simcore_core_services_selection = [ |
17 | 10 | "postgres", |
18 | 11 | "rabbit", |
19 | 12 | ] |
20 | 13 |
|
21 | 14 |
|
22 | | -@pytest.fixture |
23 | | -def register_celery_tasks() -> Callable[[Celery], None]: |
24 | | - def _(celery_app: Celery) -> None: |
25 | | - define_task(celery_app, data_export) |
26 | | - |
27 | | - return _ |
28 | | - |
29 | | - |
30 | 15 | @pytest.mark.usefixtures("celery_worker") |
31 | | -async def test_data_export(celery_client: CeleryTaskQueueClient, user_id: UserID): |
32 | | - task_context = TaskContext() |
33 | | - |
34 | | - task_uuid = await celery_client.send_task( |
35 | | - data_export.__name__, |
36 | | - task_context=task_context, |
37 | | - user_id=user_id, |
38 | | - paths_to_export=[], |
39 | | - ) |
40 | | - |
41 | | - for attempt in Retrying( |
42 | | - retry=retry_if_exception_type(AssertionError), |
43 | | - wait=wait_fixed(1), |
44 | | - stop=stop_after_delay(30), |
45 | | - ): |
46 | | - with attempt: |
47 | | - status = await celery_client.get_task_status(task_context, task_uuid) |
48 | | - assert status.task_state == TaskState.SUCCESS |
49 | | - |
50 | | - assert ( |
51 | | - await celery_client.get_task_status(task_context, task_uuid) |
52 | | - ).task_state == TaskState.SUCCESS |
53 | | - |
54 | | - result = await celery_client.get_task_result(task_context, task_uuid) |
| 16 | +async def test_data_export( |
| 17 | + mock_task_progress: list[ProgressReport], fake_celery_task: Task, user_id: UserID |
| 18 | +): |
| 19 | + result = await data_export(fake_celery_task, user_id=user_id, paths_to_export=[]) |
55 | 20 | assert re.fullmatch( |
56 | 21 | rf"^exports/{user_id}/[0-9a-fA-F]{{8}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{4}}-[0-9a-fA-F]{{12}}\.zip$", |
57 | 22 | result, |
58 | 23 | ) |
| 24 | + |
| 25 | + assert mock_task_progress == [ |
| 26 | + ProgressReport( |
| 27 | + actual_value=0.0, |
| 28 | + total=1.0, |
| 29 | + attempt=0, |
| 30 | + unit=None, |
| 31 | + message=ProgressStructuredMessage( |
| 32 | + description="create and upload export", |
| 33 | + current=0.0, |
| 34 | + total=1, |
| 35 | + unit=None, |
| 36 | + sub=None, |
| 37 | + ), |
| 38 | + ), |
| 39 | + ProgressReport( |
| 40 | + actual_value=1.0, |
| 41 | + total=1.0, |
| 42 | + attempt=0, |
| 43 | + unit=None, |
| 44 | + message=ProgressStructuredMessage( |
| 45 | + description="create and upload export", |
| 46 | + current=1.0, |
| 47 | + total=1, |
| 48 | + unit=None, |
| 49 | + sub=None, |
| 50 | + ), |
| 51 | + ), |
| 52 | + ] |
0 commit comments