55from apscheduler .schedulers .asyncio import AsyncIOScheduler
66from apscheduler .triggers .cron import CronTrigger
77from kombu .exceptions import KombuError
8+ from sqlalchemy import select
89
910from syncmaster .backend .services .unit_of_work import UnitOfWork
1011from syncmaster .db .models import RunType , Status , Transfer
1112from syncmaster .exceptions .run import CannotConnectToTaskQueueError
13+ from syncmaster .exceptions .transfer import TransferNotFoundError
1214from syncmaster .scheduler .celery import app as celery
1315from syncmaster .scheduler .settings import SchedulerAppSettings as Settings
1416from syncmaster .scheduler .utils import get_async_session
@@ -47,6 +49,22 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
4749 args = (transfer .id ,),
4850 )
4951
52+ async def remove_orphan_jobs (self ) -> None :
53+ all_jobs = self .scheduler .get_jobs ()
54+ settings = self .settings
55+ job_transfer_ids = [int (job .id ) for job in all_jobs ]
56+
57+ async with get_async_session (settings ) as session :
58+ result = await session .execute (
59+ select (Transfer ).where (Transfer .id .in_ (job_transfer_ids )),
60+ )
61+ existing_transfers = result .scalars ().all ()
62+ existing_transfer_ids = {t .id for t in existing_transfers }
63+
64+ for job in all_jobs :
65+ if int (job .id ) not in existing_transfer_ids :
66+ self .scheduler .remove_job (job .id )
67+
5068 @staticmethod
5169 async def send_job_to_celery (transfer_id : int ) -> None :
5270 """
@@ -61,7 +79,11 @@ async def send_job_to_celery(transfer_id: int) -> None:
6179 async with get_async_session (settings ) as session :
6280 unit_of_work = UnitOfWork (session = session , settings = settings )
6381
64- transfer = await unit_of_work .transfer .read_by_id (transfer_id )
82+ try :
83+ transfer = await unit_of_work .transfer .read_by_id (transfer_id )
84+ except TransferNotFoundError :
85+ return
86+
6587 credentials_source = await unit_of_work .credentials .read (transfer .source_connection_id )
6688 credentials_target = await unit_of_work .credentials .read (transfer .target_connection_id )
6789
0 commit comments