Skip to content

Commit 3bb2c6b

Browse files
fix tests
1 parent 26160a8 commit 3bb2c6b

File tree

3 files changed

+105
-67
lines changed

3 files changed

+105
-67
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import asyncio
2+
import logging
3+
import threading
4+
from typing import Final
5+
6+
from asgi_lifespan import LifespanManager
7+
from celery import Celery
8+
from fastapi import FastAPI
9+
from servicelib.async_utils import cancel_wait_task
10+
from simcore_service_storage.core.application import create_app
11+
from simcore_service_storage.core.settings import ApplicationSettings
12+
from simcore_service_storage.modules.celery import get_event_loop, set_event_loop
13+
from simcore_service_storage.modules.celery.utils import (
14+
get_fastapi_app,
15+
set_celery_worker,
16+
set_fastapi_app,
17+
)
18+
from simcore_service_storage.modules.celery.worker import CeleryTaskQueueWorker
19+
20+
_logger = logging.getLogger(__name__)
21+
22+
_LIFESPAN_TIMEOUT: Final[int] = 10
23+
24+
25+
def on_worker_init(sender, **_kwargs):
26+
def _init_fastapi():
27+
loop = asyncio.new_event_loop()
28+
asyncio.set_event_loop(loop)
29+
shutdown_event = asyncio.Event()
30+
31+
fastapi_app = create_app(ApplicationSettings.create_from_envs())
32+
33+
async def lifespan():
34+
async with LifespanManager(
35+
fastapi_app,
36+
startup_timeout=_LIFESPAN_TIMEOUT,
37+
shutdown_timeout=_LIFESPAN_TIMEOUT,
38+
):
39+
try:
40+
await shutdown_event.wait()
41+
except asyncio.CancelledError:
42+
_logger.warning("Lifespan task cancelled")
43+
44+
lifespan_task = loop.create_task(lifespan())
45+
fastapi_app.state.lifespan_task = lifespan_task
46+
fastapi_app.state.shutdown_event = shutdown_event
47+
set_event_loop(fastapi_app, loop)
48+
49+
set_fastapi_app(sender.app, fastapi_app)
50+
set_celery_worker(sender.app, CeleryTaskQueueWorker(sender.app))
51+
52+
loop.run_forever()
53+
54+
thread = threading.Thread(target=_init_fastapi, daemon=True)
55+
thread.start()
56+
57+
58+
def on_worker_shutdown(sender, **_kwargs):
59+
assert isinstance(sender.app, Celery)
60+
61+
fastapi_app = get_fastapi_app(sender.app)
62+
assert isinstance(fastapi_app, FastAPI)
63+
event_loop = get_event_loop(fastapi_app)
64+
65+
async def shutdown():
66+
fastapi_app.state.shutdown_event.set()
67+
68+
await cancel_wait_task(fastapi_app.state.lifespan_task, max_delay=5)
69+
70+
asyncio.run_coroutine_threadsafe(shutdown(), event_loop)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import logging
2+
import time
3+
4+
from celery import Task
5+
from models_library.progress_bar import ProgressReport
6+
from models_library.projects_nodes_io import StorageFileID
7+
from servicelib.logging_utils import log_context
8+
from simcore_service_storage.modules.celery.utils import get_celery_worker
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
13+
def export_data(task: Task, files: list[StorageFileID]):
14+
for n, file in enumerate(files, start=1):
15+
with log_context(
16+
_logger,
17+
logging.INFO,
18+
msg=f"Exporting {file=} ({n}/{len(files)})",
19+
):
20+
assert task.name
21+
get_celery_worker(task.app).set_task_progress(
22+
task_name=task.name,
23+
task_id=task.request.id,
24+
report=ProgressReport(actual_value=n / len(files) * 100),
25+
)
26+
time.sleep(10)
27+
return "done"
Lines changed: 8 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,18 @@
11
"""Main application to be deployed in for example uvicorn."""
22

3-
import asyncio
43
import logging
5-
import threading
6-
from typing import Final
74

8-
from asgi_lifespan import LifespanManager
9-
from celery import Celery
5+
from celery.contrib.abortable import AbortableTask
106
from celery.signals import worker_init, worker_shutdown
11-
from fastapi import FastAPI
12-
from servicelib.background_task import cancel_wait_task
137
from servicelib.logging_utils import config_all_loggers
14-
from simcore_service_storage.modules.celery import get_event_loop, set_event_loop
15-
from simcore_service_storage.modules.celery.utils import (
16-
CeleryTaskQueueWorker,
17-
get_fastapi_app,
18-
set_celery_worker,
19-
set_fastapi_app,
8+
from simcore_service_storage.modules.celery.signals import (
9+
on_worker_init,
10+
on_worker_shutdown,
2011
)
2112

22-
from ...core.application import create_app
2313
from ...core.settings import ApplicationSettings
2414
from ._common import create_app as create_celery_app
15+
from .tasks import export_data
2516

2617
_settings = ApplicationSettings.create_from_envs()
2718

@@ -35,58 +26,8 @@
3526

3627
_logger = logging.getLogger(__name__)
3728

38-
_LIFESPAN_TIMEOUT: Final[int] = 10
39-
40-
41-
@worker_init.connect
42-
def on_worker_init(sender, **_kwargs):
43-
def _init_fastapi():
44-
loop = asyncio.new_event_loop()
45-
asyncio.set_event_loop(loop)
46-
shutdown_event = asyncio.Event()
47-
48-
fastapi_app = create_app(_settings)
49-
50-
async def lifespan():
51-
async with LifespanManager(
52-
fastapi_app,
53-
startup_timeout=_LIFESPAN_TIMEOUT,
54-
shutdown_timeout=_LIFESPAN_TIMEOUT,
55-
):
56-
try:
57-
await shutdown_event.wait()
58-
except asyncio.CancelledError:
59-
_logger.warning("Lifespan task cancelled")
60-
61-
lifespan_task = loop.create_task(lifespan())
62-
fastapi_app.state.lifespan_task = lifespan_task
63-
fastapi_app.state.shutdown_event = shutdown_event
64-
set_event_loop(fastapi_app, loop)
65-
66-
set_fastapi_app(sender.app, fastapi_app)
67-
set_celery_worker(sender.app, CeleryTaskQueueWorker(sender.app))
68-
69-
loop.run_forever()
70-
71-
thread = threading.Thread(target=_init_fastapi, daemon=True)
72-
thread.start()
73-
74-
75-
@worker_shutdown.connect
76-
def on_worker_shutdown(sender, **_kwargs):
77-
assert isinstance(sender.app, Celery)
78-
79-
fastapi_app = get_fastapi_app(sender.app)
80-
assert isinstance(fastapi_app, FastAPI)
81-
event_loop = get_event_loop(fastapi_app)
82-
83-
async def shutdown():
84-
fastapi_app.state.shutdown_event.set()
85-
86-
await cancel_wait_task(fastapi_app.state.lifespan_task, max_delay=5)
87-
88-
asyncio.run_coroutine_threadsafe(shutdown(), event_loop)
89-
90-
9129
assert _settings.STORAGE_CELERY
9230
app = create_celery_app(_settings.STORAGE_CELERY)
31+
worker_init.connect(on_worker_init)
32+
worker_shutdown.connect(on_worker_shutdown)
33+
app.task(name="export_data", bind=True, base=AbortableTask)(export_data)

0 commit comments

Comments
 (0)