Skip to content

Commit e504f0e

Browse files
Ilyas GasanovIlyasDevelopment
authored andcommitted
[DOP-20962] Add scheduler integration test
1 parent c1e27dd commit e504f0e

File tree

10 files changed

+194
-3
lines changed

10 files changed

+194
-3
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
name: Scheduler Tests
2+
on:
3+
workflow_call:
4+
5+
env:
6+
DEFAULT_PYTHON: '3.12'
7+
8+
jobs:
9+
tests:
10+
name: Run Scheduler tests
11+
runs-on: ubuntu-latest
12+
13+
steps:
14+
- name: Checkout code
15+
uses: actions/checkout@v4
16+
17+
- name: Set up QEMU
18+
uses: docker/setup-qemu-action@v3
19+
20+
- name: Set up Docker Buildx
21+
uses: docker/setup-buildx-action@v3
22+
23+
- name: Cache jars
24+
uses: actions/cache@v4
25+
with:
26+
path: ./cached_jars
27+
key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler
28+
restore-keys: |
29+
${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler
30+
${{ runner.os }}-python-
31+
32+
- name: Build Worker Image
33+
uses: docker/build-push-action@v6
34+
with:
35+
context: .
36+
tags: mtsrus/syncmaster-worker:${{ github.sha }}
37+
target: test
38+
file: docker/Dockerfile.worker
39+
load: true
40+
cache-from: mtsrus/syncmaster-worker:develop
41+
42+
- name: Docker compose up
43+
run: |
44+
docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
45+
docker compose -f docker-compose.test.yml --profile worker up -d --wait --wait-timeout 200
46+
env:
47+
WORKER_IMAGE_TAG: ${{ github.sha }}
48+
49+
# This is important, as coverage is exported after receiving SIGTERM
50+
- name: Run Scheduler Tests
51+
run: |
52+
docker compose -f ./docker-compose.test.yml --profile worker exec -T worker coverage run -m pytest -vvv -s -m "scheduler"
53+
54+
- name: Dump worker logs on failure
55+
if: failure()
56+
uses: jwalton/gh-docker-logs@v2
57+
with:
58+
images: mtsrus/syncmaster-worker
59+
dest: ./logs
60+
61+
- name: Shutdown
62+
if: always()
63+
run: |
64+
docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
65+
66+
- name: Upload worker logs
67+
uses: actions/upload-artifact@v4
68+
if: failure()
69+
with:
70+
name: worker-logs-scheduler
71+
path: logs/*
72+
73+
- name: Upload coverage results
74+
uses: actions/upload-artifact@v4
75+
with:
76+
name: coverage-scheduler
77+
path: reports/*
78+
# https://github.com/actions/upload-artifact/issues/602
79+
include-hidden-files: true

.github/workflows/tests.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ jobs:
3232
name: S3 tests
3333
uses: ./.github/workflows/s3-tests.yml
3434

35+
scheduler_tests:
36+
name: Scheduler tests
37+
uses: ./.github/workflows/scheduler-tests.yml
38+
3539
unit_tests:
3640
name: Unit tests
3741
uses: ./.github/workflows/unit-test.yml

syncmaster/scheduler/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
4+
from syncmaster.scheduler.transfer_job_manager import TransferJobManager

syncmaster/worker/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
task_cls=WorkerTask,
1616
imports=[
1717
"syncmaster.worker.transfer",
18+
"tests.test_integration.test_scheduler.test_task",
1819
],
1920
)

tests/test_integration/test_scheduler/__init__.py

Whitespace-only changes.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
3+
import pytest
4+
from apscheduler.triggers.cron import CronTrigger
5+
from pytest_mock import MockerFixture
6+
7+
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
8+
from syncmaster.worker.config import celery
9+
10+
11+
@pytest.fixture
12+
def mock_send_task_to_tick(mocker: MockerFixture):
13+
original_to_thread = asyncio.to_thread
14+
return mocker.patch(
15+
"asyncio.to_thread",
16+
new=lambda func, *args, **kwargs: original_to_thread(celery.send_task, "tick", *args[1:], **kwargs),
17+
)
18+
19+
20+
@pytest.fixture
21+
def mock_add_job(mocker: MockerFixture, transfer_job_manager: TransferJobManager):
22+
original_add_job = transfer_job_manager.scheduler.add_job
23+
return mocker.patch.object(
24+
transfer_job_manager.scheduler,
25+
"add_job",
26+
side_effect=lambda func, id, trigger, misfire_grace_time, args: original_add_job(
27+
func=func,
28+
id=id,
29+
trigger=CronTrigger(second="*"),
30+
misfire_grace_time=misfire_grace_time,
31+
args=args,
32+
),
33+
)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
3+
import pytest
4+
from sqlalchemy import select
5+
from sqlalchemy.ext.asyncio import AsyncSession
6+
7+
from syncmaster.db.models import Run, Status
8+
from syncmaster.scheduler import TransferFetcher, TransferJobManager
9+
from syncmaster.settings import Settings
10+
from tests.mocks import MockTransfer
11+
12+
pytestmark = [pytest.mark.asyncio, pytest.mark.worker]
13+
14+
15+
async def test_scheduler(
16+
session: AsyncSession,
17+
settings: Settings,
18+
group_transfer: MockTransfer,
19+
transfer_job_manager: TransferJobManager,
20+
mock_send_task_to_tick,
21+
mock_add_job,
22+
):
23+
transfer_fetcher = TransferFetcher(settings)
24+
transfers = await transfer_fetcher.fetch_updated_jobs()
25+
assert transfers
26+
assert group_transfer.transfer.id in [t.id for t in transfers]
27+
28+
transfer_job_manager.update_jobs(transfers)
29+
30+
job = transfer_job_manager.scheduler.get_job(str(group_transfer.id))
31+
assert job is not None
32+
33+
await asyncio.sleep(1) # make sure that created job with every-second cron worked
34+
35+
run = await session.scalar(
36+
select(Run).filter_by(transfer_id=group_transfer.id).order_by(Run.created_at.desc()),
37+
)
38+
assert run is not None
39+
assert run.status in [Status.CREATED, Status.STARTED]
40+
41+
await asyncio.sleep(2)
42+
await session.refresh(run)
43+
completed_run = await session.scalar(select(Run).filter_by(id=run.id))
44+
assert completed_run.status == Status.FINISHED
45+
assert completed_run.ended_at is not None
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import time
2+
from datetime import datetime, timezone
3+
4+
from sqlalchemy.orm import Session
5+
6+
from syncmaster.db.models.run import Run, Status
7+
from syncmaster.exceptions.run import RunNotFoundError
8+
from syncmaster.worker.base import WorkerTask
9+
from syncmaster.worker.config import celery
10+
11+
12+
@celery.task(name="tick", bind=True, track_started=True)
13+
def tick(self: WorkerTask, run_id: int) -> None:
14+
with Session(self.engine) as session:
15+
run = session.get(Run, run_id)
16+
if run is None:
17+
raise RunNotFoundError
18+
19+
run.started_at = datetime.now(tz=timezone.utc)
20+
run.status = Status.STARTED
21+
session.add(run)
22+
session.commit()
23+
24+
time.sleep(2) # to make sure that previous status is handled in test
25+
run.status = Status.FINISHED
26+
run.ended_at = datetime.now(tz=timezone.utc)
27+
session.add(run)
28+
session.commit()

tests/test_unit/test_transfers/transfer_fixtures/transfer_fixture.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import secrets
21
from collections.abc import AsyncGenerator
32

43
import pytest_asyncio
@@ -46,7 +45,7 @@ async def group_transfer(
4645

4746
queue = await create_queue(
4847
session=session,
49-
name=f"{secrets.token_hex(5)}_test_queue",
48+
name=f"test_queue",
5049
group_id=group.id,
5150
)
5251

tests/test_unit/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ async def create_transfer(
154154
source_params: dict | None = None,
155155
target_params: dict | None = None,
156156
is_scheduled: bool = True,
157-
schedule: str = "0 0 * * *",
157+
schedule: str = "* * * * *",
158158
strategy_params: dict | None = None,
159159
description: str = "",
160160
) -> Transfer:

0 commit comments

Comments
 (0)