Skip to content

Commit 994b68a

Browse files
author
Ilyas Gasanov
committed
[DOP-21799] Refactor celery initialization
1 parent a093ada commit 994b68a

File tree

9 files changed

+77
-61
lines changed

9 files changed

+77
-61
lines changed

syncmaster/backend/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from celery import Celery
34
from fastapi import FastAPI, HTTPException
45
from fastapi.exceptions import RequestValidationError
56
from pydantic import ValidationError
@@ -20,6 +21,15 @@
2021
from syncmaster.exceptions import SyncmasterError
2122

2223

24+
def celery_factory(settings: Settings) -> Celery:
25+
app = Celery(
26+
__name__,
27+
broker=settings.broker.url,
28+
backend="db+" + settings.database.sync_url,
29+
)
30+
return app
31+
32+
2333
def application_factory(settings: Settings) -> FastAPI:
2434
application = FastAPI(
2535
title="Syncmaster",
@@ -30,6 +40,7 @@ def application_factory(settings: Settings) -> FastAPI:
3040
redoc_url=None,
3141
)
3242
application.state.settings = settings
43+
application.state.celery = celery_factory(settings)
3344
application.include_router(api_router)
3445
application.exception_handler(RequestValidationError)(validation_exception_handler)
3546
application.exception_handler(ValidationError)(validation_exception_handler)
@@ -44,6 +55,7 @@ def application_factory(settings: Settings) -> FastAPI:
4455
{
4556
Settings: lambda: settings,
4657
UnitOfWork: get_uow(session_factory, settings=settings),
58+
Celery: lambda: application.state.celery,
4759
},
4860
)
4961

syncmaster/backend/api/v1/runs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
from typing import Annotated
66

77
from asgi_correlation_id import correlation_id
8+
from celery import Celery
89
from fastapi import APIRouter, Depends, Query
910
from jinja2 import Template
1011
from kombu.exceptions import KombuError
1112

12-
from syncmaster.backend.celery import app as celery
1313
from syncmaster.backend.dependencies import Stub
1414
from syncmaster.backend.services import UnitOfWork, get_user
1515
from syncmaster.backend.settings import ServerAppSettings as Settings
@@ -84,6 +84,7 @@ async def read_run(
8484
async def start_run(
8585
create_run_data: CreateRunSchema,
8686
settings: Annotated[Settings, Depends(Stub(Settings))],
87+
celery: Annotated[Celery, Depends(Stub(Celery))],
8788
unit_of_work: UnitOfWork = Depends(UnitOfWork),
8889
current_user: User = Depends(get_user(is_active=True)),
8990
) -> ReadRunSchema:

syncmaster/backend/celery.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

syncmaster/scheduler/__init__.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
4-
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
3+
from celery import Celery
4+
5+
6+
def celery_factory(settings) -> Celery:
7+
app = Celery(
8+
__name__,
9+
broker=settings.broker.url,
10+
backend="db+" + settings.database.sync_url,
11+
)
12+
return app

syncmaster/scheduler/celery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.scheduler import celery_factory
34
from syncmaster.scheduler.settings import SchedulerAppSettings
4-
from syncmaster.worker import celery_factory
55

66
app = celery_factory(SchedulerAppSettings())

tests/conftest.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import time
55
from collections.abc import AsyncGenerator, Callable
66
from pathlib import Path
7+
from unittest.mock import AsyncMock, Mock
78

89
import pytest
910
import pytest_asyncio
1011
from alembic.config import Config as AlembicConfig
1112
from celery import Celery
13+
from fastapi import FastAPI
1214
from httpx import AsyncClient
1315
from sqlalchemy.ext.asyncio import (
1416
AsyncEngine,
@@ -129,6 +131,26 @@ async def session(sessionmaker: async_sessionmaker[AsyncSession]):
129131
await session.close()
130132

131133

134+
@pytest.fixture(scope="session")
135+
def mocked_celery() -> Celery:
136+
celery_app = Mock(Celery)
137+
celery_app.send_task = AsyncMock()
138+
return celery_app
139+
140+
141+
@pytest_asyncio.fixture(scope="session")
142+
async def app(settings: Settings, mocked_celery: Celery) -> FastAPI:
143+
app = application_factory(settings=settings)
144+
app.dependency_overrides[Celery] = lambda: mocked_celery
145+
return app
146+
147+
148+
@pytest_asyncio.fixture(scope="session")
149+
async def client_with_mocked_celery(app: FastAPI) -> AsyncGenerator:
150+
async with AsyncClient(app=app, base_url="http://testserver") as client:
151+
yield client
152+
153+
132154
@pytest_asyncio.fixture(scope="session")
133155
async def client(settings: Settings) -> AsyncGenerator:
134156
logger.info("START CLIENT FIXTURE")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from syncmaster.scheduler.celery import app as celery
1+
from syncmaster.worker.celery import app as celery
22

33
celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"])

tests/test_integration/test_scheduler/test_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
from syncmaster.backend.settings import ServerAppSettings as Settings
99
from syncmaster.db.models import Run, Status
10-
from syncmaster.scheduler import TransferFetcher, TransferJobManager
10+
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
11+
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
1112
from tests.mocks import MockTransfer
1213

1314
pytestmark = [pytest.mark.asyncio, pytest.mark.worker, pytest.mark.scheduler_integration]

tests/test_unit/test_runs/test_create_run.py

Lines changed: 27 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from unittest.mock import AsyncMock
22

33
import pytest
4+
from celery import Celery
45
from httpx import AsyncClient
6+
from pytest_mock import MockerFixture
57
from sqlalchemy import desc, select
68
from sqlalchemy.ext.asyncio import AsyncSession
79

@@ -12,15 +14,15 @@
1214

1315

1416
async def test_developer_plus_can_create_run_of_transfer_his_group(
15-
client: AsyncClient,
17+
client_with_mocked_celery: AsyncClient,
18+
mocked_celery: Celery,
1619
group_transfer: MockTransfer,
1720
session: AsyncSession,
18-
mocker,
21+
mocker: MockerFixture,
1922
role_developer_plus: UserTestRoles,
2023
) -> None:
21-
# Arrange
2224
user = group_transfer.owner_group.get_member_of_role(role_developer_plus)
23-
mock_send_task = mocker.patch("syncmaster.backend.celery.app.send_task")
25+
mock_send_task = mocked_celery.send_task
2426
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
2527

2628
run = (
@@ -31,14 +33,12 @@ async def test_developer_plus_can_create_run_of_transfer_his_group(
3133

3234
assert not run
3335

34-
# Act
35-
result = await client.post(
36+
result = await client_with_mocked_celery.post(
3637
"v1/runs",
3738
headers={"Authorization": f"Bearer {user.token}"},
3839
json={"transfer_id": group_transfer.id},
3940
)
4041

41-
# Assert
4242
run = (
4343
await session.scalars(
4444
select(Run).filter_by(transfer_id=group_transfer.id, status=Status.CREATED).order_by(desc(Run.created_at)),
@@ -66,24 +66,20 @@ async def test_developer_plus_can_create_run_of_transfer_his_group(
6666

6767

6868
async def test_groupless_user_cannot_create_run(
69-
client: AsyncClient,
69+
client_with_mocked_celery: AsyncClient,
7070
simple_user: MockUser,
7171
group_transfer: MockTransfer,
7272
session: AsyncSession,
73-
mocker,
73+
mocker: MockerFixture,
7474
) -> None:
75-
# Arrange
76-
mocker.patch("syncmaster.backend.celery.app.send_task")
7775
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
7876

79-
# Act
80-
result = await client.post(
77+
result = await client_with_mocked_celery.post(
8178
"v1/runs",
8279
headers={"Authorization": f"Bearer {simple_user.token}"},
8380
json={"transfer_id": group_transfer.id},
8481
)
8582

86-
# Assert
8783
assert result.json() == {
8884
"error": {
8985
"code": "not_found",
@@ -95,26 +91,22 @@ async def test_groupless_user_cannot_create_run(
9591

9692

9793
async def test_group_member_cannot_create_run_of_other_group_transfer(
98-
client: AsyncClient,
94+
client_with_mocked_celery: AsyncClient,
9995
group_transfer: MockTransfer,
10096
group: MockGroup,
10197
session: AsyncSession,
102-
mocker,
98+
mocker: MockerFixture,
10399
role_guest_plus: UserTestRoles,
104100
):
105-
# Arrange
106-
mocker.patch("syncmaster.backend.celery.app.send_task")
107101
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
108102
user = group.get_member_of_role(role_guest_plus)
109103

110-
# Act
111-
result = await client.post(
104+
result = await client_with_mocked_celery.post(
112105
"v1/runs",
113106
headers={"Authorization": f"Bearer {user.token}"},
114107
json={"transfer_id": group_transfer.id},
115108
)
116109

117-
# Assert
118110
assert result.json() == {
119111
"error": {
120112
"code": "not_found",
@@ -132,18 +124,17 @@ async def test_group_member_cannot_create_run_of_other_group_transfer(
132124

133125

134126
async def test_superuser_can_create_run(
135-
client: AsyncClient,
127+
client_with_mocked_celery: AsyncClient,
128+
mocked_celery: Celery,
136129
superuser: MockUser,
137130
group_transfer: MockTransfer,
138131
session: AsyncSession,
139-
mocker,
132+
mocker: MockerFixture,
140133
) -> None:
141-
# Arrange
142-
mock_send_task = mocker.patch("syncmaster.backend.celery.app.send_task")
134+
mock_send_task = mocked_celery.send_task
143135
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
144136

145-
# Act
146-
result = await client.post(
137+
result = await client_with_mocked_celery.post(
147138
"v1/runs",
148139
headers={"Authorization": f"Bearer {superuser.token}"},
149140
json={"transfer_id": group_transfer.id},
@@ -154,7 +145,6 @@ async def test_superuser_can_create_run(
154145
)
155146
).first()
156147

157-
# Assert
158148
response = result.json()
159149
assert response == {
160150
"id": run.id,
@@ -178,21 +168,17 @@ async def test_superuser_can_create_run(
178168

179169

180170
async def test_unauthorized_user_cannot_create_run(
181-
client: AsyncClient,
171+
client_with_mocked_celery: AsyncClient,
182172
group_transfer: MockTransfer,
183-
mocker,
173+
mocker: MockerFixture,
184174
) -> None:
185-
# Arrange
186-
mocker.patch("syncmaster.backend.celery.app.send_task")
187175
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
188176

189-
# Act
190-
result = await client.post(
177+
result = await client_with_mocked_celery.post(
191178
"v1/runs",
192179
json={"transfer_id": group_transfer.id},
193180
)
194181

195-
# Assert
196182
assert result.json() == {
197183
"error": {
198184
"code": "unauthorized",
@@ -204,25 +190,21 @@ async def test_unauthorized_user_cannot_create_run(
204190

205191

206192
async def test_group_member_cannot_create_run_of_unknown_transfer_error(
207-
client: AsyncClient,
193+
client_with_mocked_celery: AsyncClient,
208194
group_transfer: MockTransfer,
209195
session: AsyncSession,
210-
mocker,
196+
mocker: MockerFixture,
211197
role_guest_plus: UserTestRoles,
212198
) -> None:
213-
# Arrange
214199
user = group_transfer.owner_group.get_member_of_role(role_guest_plus)
215-
mocker.patch("syncmaster.backend.celery.app.send_task")
216200
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
217201

218-
# Act
219-
result = await client.post(
202+
result = await client_with_mocked_celery.post(
220203
"v1/runs",
221204
headers={"Authorization": f"Bearer {user.token}"},
222205
json={"transfer_id": -1},
223206
)
224207

225-
# Assert
226208
assert result.json() == {
227209
"error": {
228210
"code": "not_found",
@@ -233,24 +215,20 @@ async def test_group_member_cannot_create_run_of_unknown_transfer_error(
233215

234216

235217
async def test_superuser_cannot_create_run_of_unknown_transfer_error(
236-
client: AsyncClient,
218+
client_with_mocked_celery: AsyncClient,
237219
superuser: MockUser,
238220
group_transfer: MockTransfer,
239221
session: AsyncSession,
240-
mocker,
222+
mocker: MockerFixture,
241223
) -> None:
242-
# Arrange
243-
mocker.patch("syncmaster.backend.celery.app.send_task")
244224
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
245225

246-
# Act
247-
result = await client.post(
226+
result = await client_with_mocked_celery.post(
248227
"v1/runs",
249228
headers={"Authorization": f"Bearer {superuser.token}"},
250229
json={"transfer_id": -1},
251230
)
252231

253-
# Assert
254232
assert result.json() == {
255233
"error": {
256234
"code": "not_found",

0 commit comments

Comments
 (0)