Skip to content

Commit 26340dd

Browse files
committed
Revert "temporary remove this test"
This reverts commit e680984.
1 parent 45a2813 commit 26340dd

File tree

2 files changed

+279
-0
lines changed

2 files changed

+279
-0
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import logging
2+
from collections.abc import Callable, Iterable
3+
from datetime import timedelta
4+
from typing import Any
5+
6+
import pytest
7+
from celery import Celery
8+
from celery.contrib.testing.worker import TestWorkController, start_worker
9+
from celery.signals import worker_init, worker_shutdown
10+
from pytest_simcore.helpers.typing_env import EnvVarsDict
11+
from servicelib.logging_utils import config_all_loggers
12+
from simcore_service_storage.core.settings import ApplicationSettings
13+
from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient
14+
from simcore_service_storage.modules.celery.signals import (
15+
on_worker_init,
16+
on_worker_shutdown,
17+
)
18+
19+
20+
@pytest.fixture
21+
def celery_conf() -> dict[str, Any]:
22+
return {
23+
"broker_url": "memory://",
24+
"result_backend": "cache+memory://",
25+
"result_expires": timedelta(days=7),
26+
"result_extended": True,
27+
"pool": "threads",
28+
"worker_send_task_events": True,
29+
"task_track_started": True,
30+
"task_send_sent_event": True,
31+
"broker_connection_retry_on_startup": True,
32+
}
33+
34+
35+
@pytest.fixture
36+
def celery_app(celery_conf: dict[str, Any]):
37+
return Celery(**celery_conf)
38+
39+
40+
@pytest.fixture
41+
def register_celery_tasks() -> Callable[[Celery], None]:
42+
msg = "please define a callback that registers the tasks"
43+
raise NotImplementedError(msg)
44+
45+
46+
@pytest.fixture
47+
def celery_client(
48+
app_environment: EnvVarsDict, celery_app: Celery
49+
) -> CeleryTaskQueueClient:
50+
return CeleryTaskQueueClient(celery_app)
51+
52+
53+
@pytest.fixture
54+
def celery_worker_controller(
55+
app_environment: EnvVarsDict,
56+
app_settings: ApplicationSettings,
57+
register_celery_tasks: Callable[[Celery], None],
58+
celery_app: Celery,
59+
) -> Iterable[TestWorkController]:
60+
# Signals must be explicitily connected
61+
logging.basicConfig(level=logging.WARNING) # NOSONAR
62+
logging.root.setLevel(app_settings.log_level)
63+
config_all_loggers(
64+
log_format_local_dev_enabled=app_settings.STORAGE_LOG_FORMAT_LOCAL_DEV_ENABLED,
65+
logger_filter_mapping=app_settings.STORAGE_LOG_FILTER_MAPPING,
66+
tracing_settings=app_settings.STORAGE_TRACING,
67+
)
68+
worker_init.connect(on_worker_init)
69+
worker_shutdown.connect(on_worker_shutdown)
70+
71+
register_celery_tasks(celery_app)
72+
73+
with start_worker(
74+
celery_app,
75+
pool="threads",
76+
loglevel="info",
77+
perform_ping_check=False,
78+
worker_kwargs={"hostname": "celery@worker1"},
79+
) as worker:
80+
worker_init.send(sender=worker)
81+
82+
yield worker
83+
84+
worker_shutdown.send(sender=worker)
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import asyncio
2+
import logging
3+
import time
4+
from collections.abc import Callable
5+
from random import randint
6+
7+
import pytest
8+
from celery import Celery, Task
9+
from celery.contrib.abortable import AbortableTask
10+
from common_library.errors_classes import OsparcErrorMixin
11+
from models_library.progress_bar import ProgressReport
12+
from pydantic import TypeAdapter, ValidationError
13+
from servicelib.logging_utils import log_context
14+
from simcore_service_storage.modules.celery import get_event_loop
15+
from simcore_service_storage.modules.celery._task import define_task
16+
from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient
17+
from simcore_service_storage.modules.celery.models import (
18+
TaskContext,
19+
TaskError,
20+
TaskState,
21+
)
22+
from simcore_service_storage.modules.celery.utils import (
23+
get_celery_worker,
24+
get_fastapi_app,
25+
)
26+
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
27+
28+
_logger = logging.getLogger(__name__)
29+
30+
pytest_simcore_core_services_selection = ["postgres", "rabbit"]
31+
pytest_simcore_ops_services_selection = []
32+
33+
34+
async def _async_archive(
35+
celery_app: Celery, task_name: str, task_id: str, files: list[str]
36+
) -> str:
37+
worker = get_celery_worker(celery_app)
38+
39+
def sleep_for(seconds: float) -> None:
40+
time.sleep(seconds)
41+
42+
for n, file in enumerate(files, start=1):
43+
with log_context(_logger, logging.INFO, msg=f"Processing file {file}"):
44+
worker.set_task_progress(
45+
task_name=task_name,
46+
task_id=task_id,
47+
report=ProgressReport(actual_value=n / len(files) * 10),
48+
)
49+
await asyncio.get_event_loop().run_in_executor(None, sleep_for, 1)
50+
51+
return "archive.zip"
52+
53+
54+
def sync_archive(task: Task, files: list[str]) -> str:
55+
assert task.name
56+
_logger.info("Calling async_archive")
57+
return asyncio.run_coroutine_threadsafe(
58+
_async_archive(task.app, task.name, task.request.id, files),
59+
get_event_loop(get_fastapi_app(task.app)),
60+
).result()
61+
62+
63+
class MyError(OsparcErrorMixin, Exception):
64+
msg_template = "Something strange happened: {msg}"
65+
66+
67+
def failure_task(task: Task):
68+
assert task
69+
msg = "BOOM!"
70+
raise MyError(msg=msg)
71+
72+
73+
def dreamer_task(task: AbortableTask) -> list[int]:
74+
numbers = []
75+
for _ in range(30):
76+
if task.is_aborted():
77+
_logger.warning("Alarm clock")
78+
return numbers
79+
numbers.append(randint(1, 90)) # noqa: S311
80+
time.sleep(0.1)
81+
return numbers
82+
83+
84+
@pytest.fixture
85+
def register_celery_tasks() -> Callable[[Celery], None]:
86+
def _(celery_app: Celery) -> None:
87+
define_task(celery_app, sync_archive)
88+
define_task(celery_app, failure_task)
89+
define_task(celery_app, dreamer_task)
90+
91+
return _
92+
93+
94+
@pytest.mark.usefixtures("celery_worker")
95+
async def test_submitting_task_calling_async_function_results_with_success_state(
96+
celery_client: CeleryTaskQueueClient,
97+
):
98+
task_context = TaskContext(user_id=42)
99+
100+
task_uuid = await celery_client.send_task(
101+
"sync_archive",
102+
task_context=task_context,
103+
files=[f"file{n}" for n in range(5)],
104+
)
105+
106+
for attempt in Retrying(
107+
retry=retry_if_exception_type(AssertionError),
108+
wait=wait_fixed(1),
109+
stop=stop_after_delay(30),
110+
):
111+
with attempt:
112+
status = await celery_client.get_task_status(task_context, task_uuid)
113+
assert status.task_state == TaskState.SUCCESS
114+
115+
assert (
116+
await celery_client.get_task_status(task_context, task_uuid)
117+
).task_state == TaskState.SUCCESS
118+
assert (
119+
await celery_client.get_task_result(task_context, task_uuid)
120+
) == "archive.zip"
121+
122+
123+
@pytest.mark.usefixtures("celery_worker")
124+
async def test_submitting_task_with_failure_results_with_error(
125+
celery_client: CeleryTaskQueueClient,
126+
):
127+
task_context = TaskContext(user_id=42)
128+
129+
task_uuid = await celery_client.send_task("failure_task", task_context=task_context)
130+
131+
for attempt in Retrying(
132+
retry=retry_if_exception_type((AssertionError, ValidationError)),
133+
wait=wait_fixed(1),
134+
stop=stop_after_delay(30),
135+
):
136+
with attempt:
137+
raw_result = await celery_client.get_task_result(task_context, task_uuid)
138+
result = TypeAdapter(TaskError).validate_python(raw_result)
139+
assert isinstance(result, TaskError)
140+
141+
assert (
142+
await celery_client.get_task_status(task_context, task_uuid)
143+
).task_state == TaskState.ERROR
144+
raw_result = await celery_client.get_task_result(task_context, task_uuid)
145+
result = TypeAdapter(TaskError).validate_python(raw_result)
146+
assert f"{result.exc_msg}" == "Something strange happened: BOOM!"
147+
148+
149+
@pytest.mark.usefixtures("celery_worker")
150+
async def test_aborting_task_results_with_aborted_state(
151+
celery_client: CeleryTaskQueueClient,
152+
):
153+
task_context = TaskContext(user_id=42)
154+
155+
task_uuid = await celery_client.send_task(
156+
"dreamer_task",
157+
task_context=task_context,
158+
)
159+
160+
await celery_client.abort_task(task_context, task_uuid)
161+
162+
for attempt in Retrying(
163+
retry=retry_if_exception_type(AssertionError),
164+
wait=wait_fixed(1),
165+
stop=stop_after_delay(30),
166+
):
167+
with attempt:
168+
progress = await celery_client.get_task_status(task_context, task_uuid)
169+
assert progress.task_state == TaskState.ABORTED
170+
171+
assert (
172+
await celery_client.get_task_status(task_context, task_uuid)
173+
).task_state == TaskState.ABORTED
174+
175+
176+
@pytest.mark.usefixtures("celery_worker")
177+
async def test_listing_task_uuids_contains_submitted_task(
178+
celery_client: CeleryTaskQueueClient,
179+
):
180+
task_context = TaskContext(user_id=42)
181+
182+
task_uuid = await celery_client.send_task(
183+
"dreamer_task",
184+
task_context=task_context,
185+
)
186+
187+
for attempt in Retrying(
188+
retry=retry_if_exception_type(AssertionError),
189+
wait=wait_fixed(0.1),
190+
stop=stop_after_delay(10),
191+
):
192+
with attempt:
193+
assert task_uuid in await celery_client.get_task_uuids(task_context)
194+
195+
assert task_uuid in await celery_client.get_task_uuids(task_context)

0 commit comments

Comments
 (0)