Skip to content

Commit 5e11322

Browse files
šŸ› Celery tasks list doesn't include submitted, active and scheduled ones (#7366)
1 parent 755a3b2 commit 5e11322

File tree

5 files changed

+60
-22
lines changed

5 files changed

+60
-22
lines changed

ā€Žservices/storage/src/simcore_service_storage/api/rpc/_data_export.pyā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async def start_data_export(
5252
) from err
5353

5454
task_uuid = await get_celery_client(app).send_task(
55-
"export_data_with_error",
55+
"export_data",
5656
task_context=job_id_data.model_dump(),
5757
files=data_export_start.file_and_folder_ids, # ANE: adapt here your signature
5858
)

ā€Žservices/storage/src/simcore_service_storage/modules/celery/_common.pyā€Ž

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ def create_app(celery_settings: CelerySettings) -> Celery:
2727
app.conf.result_expires = celery_settings.CELERY_RESULT_EXPIRES
2828
app.conf.result_extended = True # original args are included in the results
2929
app.conf.result_serializer = "json"
30+
app.conf.task_send_sent_event = True
3031
app.conf.task_track_started = True
32+
app.conf.worker_send_task_events = True # enable tasks monitoring
33+
3134
return app
3235

3336

ā€Žservices/storage/src/simcore_service_storage/modules/celery/client.pyā€Ž

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
_CELERY_INSPECT_TASK_STATUSES: Final[tuple[str, ...]] = (
2020
"active",
21-
"registered",
2221
"scheduled",
2322
"revoked",
2423
)
@@ -131,21 +130,22 @@ def _get_completed_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
131130

132131
@make_async()
133132
def get_task_uuids(self, task_context: TaskContext) -> set[TaskUUID]:
134-
all_task_ids = self._get_completed_task_uuids(task_context)
133+
task_uuids = self._get_completed_task_uuids(task_context)
135134

136-
search_key = _CELERY_TASK_META_PREFIX + _build_task_id_prefix(task_context)
135+
task_id_prefix = _build_task_id_prefix(task_context)
136+
inspect = self._celery_app.control.inspect()
137137
for task_inspect_status in _CELERY_INSPECT_TASK_STATUSES:
138-
if task_ids := getattr(
139-
self._celery_app.control.inspect(), task_inspect_status
140-
)():
141-
for values in task_ids.values():
142-
for value in values:
143-
all_task_ids.add(
144-
TaskUUID(
145-
value.removeprefix(
146-
search_key + _CELERY_TASK_ID_KEY_SEPARATOR
147-
)
148-
)
149-
)
150-
151-
return all_task_ids
138+
tasks = getattr(inspect, task_inspect_status)() or {}
139+
140+
task_uuids.update(
141+
TaskUUID(
142+
task_info["id"].removeprefix(
143+
task_id_prefix + _CELERY_TASK_ID_KEY_SEPARATOR
144+
)
145+
)
146+
for tasks_per_worker in tasks.values()
147+
for task_info in tasks_per_worker
148+
if "id" in task_info
149+
)
150+
151+
return task_uuids

ā€Žservices/storage/tests/unit/modules/celery/conftest.pyā€Ž

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ def celery_conf() -> dict[str, Any]:
4343
"result_expires": timedelta(days=7),
4444
"result_extended": True,
4545
"pool": "threads",
46+
"worker_send_task_events": True,
47+
"task_track_started": True,
48+
"task_send_sent_event": True,
4649
}
4750

4851

@@ -77,7 +80,13 @@ def celery_worker_controller(
7780

7881
register_celery_tasks(celery_app)
7982

80-
with start_worker(celery_app, loglevel="info", perform_ping_check=False) as worker:
83+
with start_worker(
84+
celery_app,
85+
pool="threads",
86+
loglevel="info",
87+
perform_ping_check=False,
88+
worker_kwargs={"hostname": "celery@worker1"},
89+
) as worker:
8190
worker_init.send(sender=worker)
8291

8392
yield worker

ā€Žservices/storage/tests/unit/modules/celery/test_celery.pyā€Ž

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,21 @@
44
from collections.abc import Callable
55
from random import randint
66

7-
from pydantic import TypeAdapter, ValidationError
87
import pytest
98
from celery import Celery, Task
109
from celery.contrib.abortable import AbortableTask
1110
from common_library.errors_classes import OsparcErrorMixin
1211
from models_library.progress_bar import ProgressReport
12+
from pydantic import TypeAdapter, ValidationError
1313
from servicelib.logging_utils import log_context
1414
from simcore_service_storage.modules.celery import get_event_loop
1515
from simcore_service_storage.modules.celery._common import define_task
1616
from simcore_service_storage.modules.celery.client import CeleryTaskQueueClient
17-
from simcore_service_storage.modules.celery.models import TaskContext, TaskError, TaskState
17+
from simcore_service_storage.modules.celery.models import (
18+
TaskContext,
19+
TaskError,
20+
TaskState,
21+
)
1822
from simcore_service_storage.modules.celery.utils import (
1923
get_celery_worker,
2024
get_fastapi_app,
@@ -54,7 +58,7 @@ def sync_archive(task: Task, files: list[str]) -> str:
5458

5559

5660
class MyError(OsparcErrorMixin, Exception):
57-
msg_template = "Something strange happened: {msg}"
61+
msg_template = "Something strange happened: {msg}"
5862

5963

6064
def failure_task(task: Task):
@@ -163,3 +167,25 @@ async def test_aborting_task_results_with_aborted_state(
163167
assert (
164168
await celery_client.get_task_status(task_context, task_uuid)
165169
).task_state == TaskState.ABORTED
170+
171+
172+
@pytest.mark.usefixtures("celery_worker")
173+
async def test_listing_task_uuids_contains_submitted_task(
174+
celery_client: CeleryTaskQueueClient,
175+
):
176+
task_context = TaskContext(user_id=42)
177+
178+
task_uuid = await celery_client.send_task(
179+
"dreamer_task",
180+
task_context=task_context,
181+
)
182+
183+
for attempt in Retrying(
184+
retry=retry_if_exception_type(AssertionError),
185+
wait=wait_fixed(1),
186+
stop=stop_after_delay(10),
187+
):
188+
with attempt:
189+
assert task_uuid in await celery_client.get_task_uuids(task_context)
190+
191+
assert task_uuid in await celery_client.get_task_uuids(task_context)

0 commit comments

Comments
Ā (0)