Skip to content

Commit e8683a9

Browse files
committed
add separate rabbitmq queue for api-worker
1 parent 329354f commit e8683a9

File tree

6 files changed

+35
-28
lines changed

6 files changed

+35
-28
lines changed

packages/service-library/src/servicelib/celery/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class TaskState(StrEnum):
2828
class TasksQueue(StrEnum):
2929
CPU_BOUND = "cpu_bound"
3030
DEFAULT = "default"
31+
API_WORKER_QUEUE = "api_worker_queue"
3132

3233

3334
class TaskMetadata(BaseModel):

services/api-server/src/simcore_service_api_server/api/routes/functions_routes.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from models_library.projects import ProjectID
2222
from models_library.projects_nodes_io import NodeID
2323
from models_library.users import UserID
24-
from servicelib.celery.models import TaskFilter, TaskMetadata
24+
from servicelib.celery.models import TaskFilter, TaskMetadata, TasksQueue
2525
from servicelib.fastapi.dependencies import get_reverse_url_mapper
2626
from servicelib.long_running_tasks.models import TaskGet
2727

@@ -356,6 +356,8 @@ async def run_function( # noqa: PLR0913
356356
task_uuid = await task_manager.submit_task(
357357
TaskMetadata(
358358
name=task_name,
359+
ephemeral=True,
360+
queue=TasksQueue.API_WORKER_QUEUE,
359361
),
360362
task_filter=task_filter,
361363
user_identity=user_identity,

services/api-server/src/simcore_service_api_server/celery/worker_main.py

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,33 +16,34 @@
1616
from .worker_tasks.tasks import setup_worker_tasks
1717

1818

19-
def app_factory():
20-
_settings = ApplicationSettings.create_from_envs()
21-
22-
setup_loggers(
23-
log_format_local_dev_enabled=_settings.API_SERVER_LOG_FORMAT_LOCAL_DEV_ENABLED,
24-
logger_filter_mapping=_settings.API_SERVER_LOG_FILTER_MAPPING,
25-
tracing_settings=_settings.API_SERVER_TRACING,
26-
log_base_level=_settings.log_level,
27-
noisy_loggers=None,
28-
)
19+
def _get_settings() -> ApplicationSettings:
20+
return ApplicationSettings.create_from_envs()
2921

30-
assert _settings.API_SERVER_CELERY # nosec
31-
app = create_celery_app(_settings.API_SERVER_CELERY)
3222

33-
app_server = FastAPIAppServer(app=create_app(_settings))
23+
_settings = _get_settings()
24+
25+
setup_loggers(
26+
log_format_local_dev_enabled=_settings.API_SERVER_LOG_FORMAT_LOCAL_DEV_ENABLED,
27+
logger_filter_mapping=_settings.API_SERVER_LOG_FILTER_MAPPING,
28+
tracing_settings=_settings.API_SERVER_TRACING,
29+
log_base_level=_settings.log_level,
30+
noisy_loggers=None,
31+
)
32+
33+
assert _settings.API_SERVER_CELERY # nosec
34+
app = create_celery_app(_settings.API_SERVER_CELERY)
35+
36+
app_server = FastAPIAppServer(app=create_app(_settings))
3437

35-
def worker_init_wrapper(sender, **_kwargs):
36-
assert _settings.API_SERVER_CELERY # nosec
37-
return partial(on_worker_init, app_server, _settings.API_SERVER_CELERY)(
38-
sender, **_kwargs
39-
)
4038

41-
worker_init.connect(worker_init_wrapper)
42-
worker_shutdown.connect(on_worker_shutdown)
39+
def worker_init_wrapper(sender, **_kwargs):
40+
assert _settings.API_SERVER_CELERY # nosec
41+
return partial(on_worker_init, app_server, _settings.API_SERVER_CELERY)(
42+
sender, **_kwargs
43+
)
4344

44-
setup_worker_tasks(app)
45-
return app
4645

46+
worker_init.connect(worker_init_wrapper)
47+
worker_shutdown.connect(on_worker_shutdown)
4748

48-
app = app_factory()
49+
setup_worker_tasks(app)

services/api-server/tests/unit/celery/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,6 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs):
135135
concurrency=1,
136136
loglevel="info",
137137
perform_ping_check=False,
138-
queues="default",
138+
queues="api_worker_queue",
139139
) as worker:
140140
yield worker

services/api-server/tests/unit/celery/test_functions.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
RegisteredProjectFunctionJob,
2626
)
2727
from models_library.projects import ProjectID
28-
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata
28+
from servicelib.celery.models import TaskFilter, TaskID, TaskMetadata, TasksQueue
2929
from servicelib.common_headers import (
3030
X_SIMCORE_PARENT_NODE_ID,
3131
X_SIMCORE_PARENT_PROJECT_UUID,
@@ -37,7 +37,7 @@
3737
get_task_manager,
3838
)
3939
from simcore_service_api_server.api.routes.functions_routes import get_function
40-
from simcore_service_api_server.celery._worker_tasks._functions_tasks import (
40+
from simcore_service_api_server.celery.worker_tasks.functions_tasks import (
4141
run_function as run_function_task,
4242
)
4343
from simcore_service_api_server.exceptions.backend_errors import BaseBackEndError
@@ -211,7 +211,9 @@ async def test_celery_error_propagation(
211211
)
212212
task_manager = get_task_manager(app=app)
213213
task_uuid = await task_manager.submit_task(
214-
task_metadata=TaskMetadata(name="exception_task"),
214+
task_metadata=TaskMetadata(
215+
name="exception_task", queue=TasksQueue.API_WORKER_QUEUE
216+
),
215217
task_filter=TaskFilter.model_validate(job_filter.model_dump()),
216218
)
217219

services/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ services:
9595
API_SERVER_WORKER_NAME: "api-worker-{{.Node.Hostname}}-{{.Task.Slot}}-{{.Task.ID}}"
9696
API_SERVER_WORKER_MODE: "true"
9797
CELERY_CONCURRENCY: 100
98+
CELERY_QUEUES: "api_worker_queue"
9899
networks: *api_server_networks
99100

100101

0 commit comments

Comments
 (0)