Skip to content

Commit f8b36bc

Browse files
Ilyas GasanovIlyasDevelopment
authored andcommitted
[DOP-20962] Use test celery instance, add separate transfer fixture
1 parent 8144b46 commit f8b36bc

File tree

11 files changed

+207
-32
lines changed

11 files changed

+207
-32
lines changed

docker-compose.test.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ services:
7878
context: .
7979
target: test
8080
command: --loglevel=info -Q test_queue
81+
entrypoint: [python, -m, celery, -A, tests.test_integration.celery_test, worker, --max-tasks-per-child=1]
8182
env_file: .env.docker
8283
volumes:
8384
- ./syncmaster:/app/syncmaster
@@ -90,7 +91,7 @@ services:
9091
condition: service_healthy
9192
rabbitmq:
9293
condition: service_healthy
93-
profiles: [worker, s3, oracle, hdfs, hive, all]
94+
profiles: [worker, scheduler, s3, oracle, hdfs, hive, all]
9495

9596
test-postgres:
9697
image: postgres

syncmaster/settings/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ class Settings(BaseSettings):
5353
SCHEDULER_TRANSFER_FETCHING_TIMEOUT: int = 180 # seconds
5454
SCHEDULER_MISFIRE_GRACE_TIME: int = 300 # seconds
5555

56-
CORRELATION_CELERY_HEADER_ID: str = "CORRELATION_CELERY_HEADER_ID"
57-
5856
TOKEN_EXPIRED_TIME: int = 60 * 60 * 10 # 10 hours
5957
CREATE_SPARK_SESSION_FUNCTION: ImportString = "syncmaster.worker.spark.get_worker_spark_session"
6058

syncmaster/worker/transfer.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
4-
import time
54
from datetime import datetime, timezone
65

76
import onetl
@@ -104,23 +103,3 @@ def load_correlation_id(task, *args, **kwargs) -> None:
104103
# Here we're able to load the correlation ID from the headers
105104
id_value = task.request.get(CORRELATION_CELERY_HEADER_ID)
106105
correlation_id.set(id_value)
107-
108-
109-
@celery.task(name="tick", bind=True, track_started=True)
110-
def tick(self: WorkerTask, run_id: int) -> None:
111-
"""This task can be used for testing."""
112-
with Session(self.engine) as session:
113-
run = session.get(Run, run_id)
114-
if run is None:
115-
raise RunNotFoundError
116-
117-
run.started_at = datetime.now(tz=timezone.utc)
118-
run.status = Status.STARTED
119-
session.add(run)
120-
session.commit()
121-
122-
time.sleep(2) # to make sure that previous status is handled in test
123-
run.status = Status.FINISHED
124-
run.ended_at = datetime.now(tz=timezone.utc)
125-
session.add(run)
126-
session.commit()

tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"tests.test_unit.test_runs.run_fixtures",
3333
"tests.test_unit.test_connections.connection_fixtures",
3434
"tests.test_unit.test_scheduler.scheduler_fixtures",
35+
"tests.test_integration.test_scheduler.scheduler_fixtures",
3536
]
3637

3738

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from celery import Celery
2+
3+
from syncmaster.settings import Settings
4+
from syncmaster.worker.base import WorkerTask
5+
6+
# TODO: remove settings object creating during import
7+
settings = Settings()
8+
9+
celery = Celery(
10+
__name__,
11+
broker=settings.broker.url,
12+
backend="db+" + settings.database.sync_url,
13+
task_cls=WorkerTask,
14+
imports=[
15+
"syncmaster.worker.transfer",
16+
"tests.test_integration.test_scheduler.test_task",
17+
],
18+
)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from tests.test_integration.test_scheduler.scheduler_fixtures.mocker_fixtures import (
2+
mock_add_job,
3+
mock_send_task_to_tick,
4+
)
5+
from tests.test_integration.test_scheduler.scheduler_fixtures.transfer_fixture import (
6+
group_transfer_integration_mock,
7+
)

tests/test_integration/test_scheduler/conftest.py renamed to tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
import pytest
44
from apscheduler.triggers.cron import CronTrigger
5-
from pytest_mock import MockerFixture
5+
from pytest_mock import MockerFixture, MockType
66

77
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
88
from syncmaster.worker.config import celery
99

1010

1111
@pytest.fixture
12-
def mock_send_task_to_tick(mocker: MockerFixture):
12+
def mock_send_task_to_tick(mocker: MockerFixture) -> MockType:
1313
original_to_thread = asyncio.to_thread
1414
return mocker.patch(
1515
"asyncio.to_thread",
@@ -18,7 +18,7 @@ def mock_send_task_to_tick(mocker: MockerFixture):
1818

1919

2020
@pytest.fixture
21-
def mock_add_job(mocker: MockerFixture, transfer_job_manager: TransferJobManager):
21+
def mock_add_job(mocker: MockerFixture, transfer_job_manager: TransferJobManager) -> MockType:
2222
original_add_job = transfer_job_manager.scheduler.add_job
2323
return mocker.patch.object(
2424
transfer_job_manager.scheduler,
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
from collections.abc import AsyncGenerator
2+
3+
import pytest_asyncio
4+
from sqlalchemy.ext.asyncio import AsyncSession
5+
6+
from syncmaster.backend.api.v1.auth.utils import sign_jwt
7+
from syncmaster.db.repositories.utils import decrypt_auth_data
8+
from syncmaster.settings import Settings
9+
from tests.mocks import (
10+
MockConnection,
11+
MockCredentials,
12+
MockGroup,
13+
MockTransfer,
14+
MockUser,
15+
UserTestRoles,
16+
)
17+
from tests.test_unit.conftest import create_group_member
18+
from tests.test_unit.utils import (
19+
create_connection,
20+
create_credentials,
21+
create_group,
22+
create_queue,
23+
create_transfer,
24+
create_user,
25+
)
26+
27+
28+
@pytest_asyncio.fixture
29+
async def group_transfer_integration_mock(
30+
session: AsyncSession,
31+
settings: Settings,
32+
create_connection_data: dict | None,
33+
create_transfer_data: dict | None,
34+
) -> AsyncGenerator[MockTransfer, None]:
35+
group_owner = await create_user(
36+
session=session,
37+
username="group_transfer_owner",
38+
is_active=True,
39+
)
40+
group = await create_group(
41+
session=session,
42+
name="group_for_group_transfer",
43+
owner_id=group_owner.id,
44+
)
45+
46+
queue = await create_queue(
47+
session=session,
48+
name="test_queue",
49+
group_id=group.id,
50+
)
51+
52+
members: list[MockUser] = []
53+
for username in (
54+
"transfer_group_member_maintainer",
55+
"transfer_group_member_developer",
56+
"transfer_group_member_guest",
57+
):
58+
members.append(
59+
await create_group_member(
60+
username=username,
61+
group_id=group.id,
62+
session=session,
63+
settings=settings,
64+
),
65+
)
66+
67+
await session.commit()
68+
mock_group = MockGroup(
69+
group=group,
70+
owner=MockUser(
71+
user=group_owner,
72+
auth_token=sign_jwt(group_owner.id, settings),
73+
role=UserTestRoles.Owner,
74+
),
75+
members=members,
76+
)
77+
78+
source_connection = await create_connection(
79+
session=session,
80+
name="group_transfer_source_connection",
81+
group_id=group.id,
82+
data=create_connection_data,
83+
)
84+
source_connection_creds = await create_credentials(
85+
session=session,
86+
settings=settings,
87+
connection_id=source_connection.id,
88+
)
89+
target_connection = await create_connection(
90+
session=session,
91+
name="group_transfer_target_connection",
92+
group_id=group.id,
93+
data=create_connection_data,
94+
)
95+
target_connection_creds = await create_credentials(
96+
session=session,
97+
settings=settings,
98+
connection_id=target_connection.id,
99+
)
100+
101+
transfer = await create_transfer(
102+
session=session,
103+
name="group_transfer",
104+
group_id=group.id,
105+
source_connection_id=source_connection.id,
106+
target_connection_id=target_connection.id,
107+
queue_id=queue.id,
108+
source_params=create_transfer_data,
109+
target_params=create_transfer_data,
110+
)
111+
112+
yield MockTransfer(
113+
transfer=transfer,
114+
source_connection=MockConnection(
115+
connection=source_connection,
116+
owner_group=mock_group,
117+
credentials=MockCredentials(
118+
value=decrypt_auth_data(source_connection_creds.value, settings=settings),
119+
connection_id=source_connection.id,
120+
),
121+
),
122+
target_connection=MockConnection(
123+
connection=target_connection,
124+
owner_group=mock_group,
125+
credentials=MockCredentials(
126+
value=decrypt_auth_data(target_connection_creds.value, settings=settings),
127+
connection_id=target_connection.id,
128+
),
129+
),
130+
owner_group=mock_group,
131+
)
132+
await session.delete(transfer)
133+
await session.delete(source_connection)
134+
await session.delete(target_connection)
135+
await session.delete(group)
136+
await session.delete(group_owner)
137+
await session.delete(queue)
138+
for member in members:
139+
await session.delete(member.user)
140+
await session.commit()

tests/test_integration/test_scheduler/test_scheduler.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22

33
import pytest
4+
from pytest_mock import MockType
45
from sqlalchemy import select
56
from sqlalchemy.ext.asyncio import AsyncSession
67

@@ -15,11 +16,12 @@
1516
async def test_scheduler(
1617
session: AsyncSession,
1718
settings: Settings,
18-
group_transfer: MockTransfer,
19+
group_transfer_integration_mock: MockTransfer,
1920
transfer_job_manager: TransferJobManager,
20-
mock_send_task_to_tick,
21-
mock_add_job,
21+
mock_send_task_to_tick: MockType,
22+
mock_add_job: MockType,
2223
):
24+
group_transfer = group_transfer_integration_mock
2325
transfer_fetcher = TransferFetcher(settings)
2426
transfers = await transfer_fetcher.fetch_updated_jobs()
2527
assert transfers
@@ -30,7 +32,7 @@ async def test_scheduler(
3032
job = transfer_job_manager.scheduler.get_job(str(group_transfer.id))
3133
assert job is not None
3234

33-
await asyncio.sleep(1) # make sure that created job with every-second cron worked
35+
await asyncio.sleep(1.5) # make sure that created job with every-second cron worked
3436

3537
run = await session.scalar(
3638
select(Run).filter_by(transfer_id=group_transfer.id).order_by(Run.created_at.desc()),
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import time
2+
from datetime import datetime, timezone
3+
4+
from sqlalchemy.orm import Session
5+
6+
from syncmaster.db.models.run import Run, Status
7+
from syncmaster.exceptions.run import RunNotFoundError
8+
from syncmaster.worker.base import WorkerTask
9+
from syncmaster.worker.config import celery
10+
11+
12+
@celery.task(name="tick", bind=True, track_started=True)
13+
def tick(self: WorkerTask, run_id: int) -> None:
14+
with Session(self.engine) as session:
15+
run = session.get(Run, run_id)
16+
if run is None:
17+
raise RunNotFoundError
18+
19+
run.started_at = datetime.now(tz=timezone.utc)
20+
run.status = Status.STARTED
21+
session.add(run)
22+
session.commit()
23+
24+
time.sleep(2) # to make sure that previous status is handled in test
25+
run.status = Status.FINISHED
26+
run.ended_at = datetime.now(tz=timezone.utc)
27+
session.add(run)
28+
session.commit()

0 commit comments

Comments
 (0)