Skip to content

Commit 431ab07

Browse files
author
maxim-lixakov
committed
[DOP-22267] - add removing orphan transfers from scheduler
1 parent da40fb5 commit 431ab07

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

syncmaster/scheduler/__main__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ async def main():
2020

2121
while True:
2222
logger.info("Looking at the transfer table...")
23+
24+
await transfer_job_manager.remove_orphan_jobs()
2325
transfers = await transfer_fetcher.fetch_updated_jobs()
2426

2527
if transfers:
@@ -29,6 +31,7 @@ async def main():
2931
", ".join(str(t.id) for t in transfers),
3032
)
3133
transfer_job_manager.update_jobs(transfers)
34+
3235
transfer_fetcher.last_updated_at = max(t.updated_at for t in transfers)
3336
logger.info("Scheduler state has been updated. Last updated at: %s", transfer_fetcher.last_updated_at)
3437

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
from apscheduler.schedulers.asyncio import AsyncIOScheduler
66
from apscheduler.triggers.cron import CronTrigger
77
from kombu.exceptions import KombuError
8+
from sqlalchemy import select
89

910
from syncmaster.backend.services.unit_of_work import UnitOfWork
1011
from syncmaster.db.models import RunType, Status, Transfer
1112
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
13+
from syncmaster.exceptions.transfer import TransferNotFoundError
1214
from syncmaster.scheduler.celery import app as celery
1315
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1416
from syncmaster.scheduler.utils import get_async_session
@@ -47,6 +49,18 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
4749
args=(transfer.id,),
4850
)
4951

52+
async def remove_orphan_jobs(self) -> None:
53+
all_jobs = self.scheduler.get_jobs()
54+
settings = self.settings
55+
56+
async with get_async_session(settings) as session:
57+
transfer_ids = {t.id for t in (await session.execute(select(Transfer))).scalars().all()}
58+
59+
for job in all_jobs:
60+
transfer_id = int(job.id)
61+
if transfer_id not in transfer_ids:
62+
self.scheduler.remove_job(job.id)
63+
5064
@staticmethod
5165
async def send_job_to_celery(transfer_id: int) -> None:
5266
"""
@@ -61,7 +75,11 @@ async def send_job_to_celery(transfer_id: int) -> None:
6175
async with get_async_session(settings) as session:
6276
unit_of_work = UnitOfWork(session=session, settings=settings)
6377

64-
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
78+
try:
79+
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
80+
except TransferNotFoundError:
81+
return
82+
6583
credentials_source = await unit_of_work.credentials.read(transfer.source_connection_id)
6684
credentials_target = await unit_of_work.credentials.read(transfer.target_connection_id)
6785

0 commit comments

Comments
 (0)