Skip to content

Commit d465bdf

Browse files
[DOP-19790] Implement scheduler (#114)
1 parent 890b312 commit d465bdf

32 files changed

+587
-17
lines changed

.github/workflows/unit-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
run: |
5757
source .env.local
5858
poetry run python -m syncmaster.db.migrations upgrade head
59-
poetry run coverage run -m pytest -vvv -s -m backend
59+
poetry run coverage run -m pytest -vvv -s -m "backend or scheduler"
6060
6161
- name: Shutdown
6262
if: always()

docker-compose.test.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,25 @@ services:
5151
condition: service_healthy
5252
profiles: [backend, all]
5353

54+
scheduler:
55+
image: mtsrus/syncmaster-backend:${BACKEND_IMAGE_TAG:-test}
56+
restart: unless-stopped
57+
build:
58+
dockerfile: docker/Dockerfile.backend
59+
context: .
60+
target: test
61+
env_file: .env.docker
62+
volumes:
63+
- ./syncmaster:/app/syncmaster
64+
- ./pyproject.toml:/app/pyproject.toml
65+
depends_on:
66+
db:
67+
condition: service_healthy
68+
rabbitmq:
69+
condition: service_healthy
70+
entrypoint: [python, -m, syncmaster.scheduler]
71+
profiles: [scheduler, all]
72+
5473
worker:
5574
image: mtsrus/syncmaster-worker:${WORKER_IMAGE_TAG:-test}
5675
restart: unless-stopped
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement a scheduler to run celery tasks on a schedule. This can be done via the `Transfer` table by fields `is_scheduled` and `schedule` (cron-like expression). The Run model now has a `type` field with options `MANUAL` and `SCHEDULED`.

poetry.lock

Lines changed: 65 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ uuid6 = "^2024.7.10"
6666
coloredlogs = {version = "*", optional = true}
6767
python-json-logger = {version = "*", optional = true}
6868
asyncpg = { version = ">=0.29,<0.31", optional = true }
69+
apscheduler = { version = "^3.10.4", optional = true }
6970

7071
[tool.poetry.extras]
7172
backend = [
@@ -86,6 +87,7 @@ backend = [
8687
"asyncpg",
8788
# migrations only
8889
"celery",
90+
"apscheduler",
8991
]
9092

9193
worker = [
@@ -101,6 +103,18 @@ worker = [
101103
"coloredlogs",
102104
"python-json-logger",
103105
]
106+
107+
scheduler = [
108+
"apscheduler",
109+
"pydantic-settings",
110+
"sqlalchemy",
111+
"sqlalchemy-utils",
112+
"asyncpg",
113+
"python-multipart",
114+
"python-jose",
115+
"celery",
116+
]
117+
104118
[tool.poetry.group.test.dependencies]
105119
pandas-stubs = "^2.2.2.240909"
106120
pytest = "^8.3.3"
@@ -204,6 +218,7 @@ ignore_missing_imports = true
204218
[tool.pytest.ini_options]
205219
markers = [
206220
"backend: tests for backend (require running database)",
221+
"scheduler: tests for scheduler (require running database)",
207222
"worker: tests for syncmaster worker",
208223
"hive: tests for Hive",
209224
"postgres: tests on Postgres",

syncmaster/backend/api/v1/runs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from syncmaster.backend.dependencies import Stub
1212
from syncmaster.backend.services import UnitOfWork, get_user
13-
from syncmaster.db.models import Status, User
13+
from syncmaster.db.models import RunType, Status, User
1414
from syncmaster.db.utils import Permission
1515
from syncmaster.errors.registration import get_error_responses
1616
from syncmaster.exceptions.base import ActionNotAllowedError
@@ -116,6 +116,7 @@ async def start_run(
116116
# the work of checking fields and removing passwords is delegated to the ReadAuthDataSchema class
117117
source_creds=ReadAuthDataSchema(auth_data=credentials_source).dict(),
118118
target_creds=ReadAuthDataSchema(auth_data=credentials_target).dict(),
119+
type=RunType.MANUAL,
119120
)
120121

121122
log_url = Template(settings.worker.LOG_URL_TEMPLATE).render(

syncmaster/backend/api/v1/transfers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ async def create_transfer(
131131
target_params=transfer_data.target_params.dict(),
132132
strategy_params=transfer_data.strategy_params.dict(),
133133
queue_id=transfer_data.queue_id,
134+
is_scheduled=transfer_data.is_scheduled,
135+
schedule=transfer_data.schedule,
134136
)
135137
return ReadTransferSchema.from_orm(transfer)
136138

syncmaster/db/migrations/versions/2023-11-23_0008_create_run_table.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def upgrade():
2525
sa.Column("started_at", sa.DateTime(), nullable=True),
2626
sa.Column("ended_at", sa.DateTime(), nullable=True),
2727
sa.Column("status", sa.String(255), nullable=False),
28+
sa.Column("type", sa.String(64), nullable=False),
2829
sa.Column("log_url", sa.String(length=512), nullable=True),
2930
sa.Column("transfer_dump", sa.JSON(), nullable=False),
3031
sa.Column("created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
@@ -37,11 +38,13 @@ def upgrade():
3738
),
3839
sa.PrimaryKeyConstraint("id", name=op.f("pk__run")),
3940
)
41+
op.create_index(op.f("ix__run__type"), "run", ["type"], unique=False)
4042
op.create_index(op.f("ix__run__status"), "run", ["status"], unique=False)
4143
op.create_index(op.f("ix__run__transfer_id"), "run", ["transfer_id"], unique=False)
4244

4345

4446
def downgrade():
4547
op.drop_index(op.f("ix__run__transfer_id"), table_name="run")
4648
op.drop_index(op.f("ix__run__status"), table_name="run")
49+
op.drop_index(op.f("ix__run__type"), table_name="run")
4750
op.drop_table("run")
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Create apscheduler table
4+
5+
Revision ID: 0011
6+
Revises: 0010
7+
Create Date: 2024-11-01 08:37:47.078657
8+
9+
"""
10+
import sqlalchemy as sa
11+
from alembic import op
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "0011"
15+
down_revision = "0010"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def exists_table(table_name: str) -> bool:
21+
conn = op.get_bind()
22+
inspector = sa.inspect(conn)
23+
tables = inspector.get_table_names()
24+
return table_name in tables
25+
26+
27+
def upgrade() -> None:
28+
if exists_table("apscheduler_jobs"):
29+
return
30+
31+
op.create_table(
32+
"apscheduler_jobs",
33+
sa.Column("id", sa.VARCHAR(length=191), autoincrement=False, nullable=False),
34+
sa.Column("next_run_time", sa.Float(25), autoincrement=False, nullable=True),
35+
sa.Column("job_state", sa.LargeBinary(), autoincrement=False, nullable=False),
36+
sa.PrimaryKeyConstraint("id", name="apscheduler_jobs_pkey"),
37+
)
38+
39+
op.create_index("ix_apscheduler_jobs_next_run_time", "apscheduler_jobs", ["next_run_time"], unique=False)
40+
41+
42+
def downgrade() -> None:
43+
op.drop_index("ix_apscheduler_jobs_next_run_time", table_name="apscheduler_jobs")
44+
op.drop_table("apscheduler_jobs")

syncmaster/db/models/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.db.models.apscheduler_job import APSchedulerJob
34
from syncmaster.db.models.auth_data import AuthData
45
from syncmaster.db.models.base import Base
56
from syncmaster.db.models.connection import Connection
67
from syncmaster.db.models.group import Group, GroupMemberRole, UserGroup
78
from syncmaster.db.models.queue import Queue
8-
from syncmaster.db.models.run import Run, Status
9+
from syncmaster.db.models.run import Run, RunType, Status
910
from syncmaster.db.models.transfer import Transfer
1011
from syncmaster.db.models.user import User

0 commit comments

Comments
 (0)