Skip to content

Commit f4ce8a7

Browse files
author
maxim-lixakov
committed
[DOP-22267] - add removing orphan transfers from scheduler
1 parent da40fb5 commit f4ce8a7

File tree

4 files changed

+26
-3
lines changed

4 files changed

+26
-3
lines changed

syncmaster/scheduler/__main__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ async def main():
2020

2121
while True:
2222
logger.info("Looking at the transfer table...")
23+
24+
await transfer_job_manager.remove_orphan_jobs()
2325
transfers = await transfer_fetcher.fetch_updated_jobs()
2426

2527
if transfers:
@@ -29,6 +31,7 @@ async def main():
2931
", ".join(str(t.id) for t in transfers),
3032
)
3133
transfer_job_manager.update_jobs(transfers)
34+
3235
transfer_fetcher.last_updated_at = max(t.updated_at for t in transfers)
3336
logger.info("Scheduler state has been updated. Last updated at: %s", transfer_fetcher.last_updated_at)
3437

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
from apscheduler.schedulers.asyncio import AsyncIOScheduler
66
from apscheduler.triggers.cron import CronTrigger
77
from kombu.exceptions import KombuError
8+
from sqlalchemy import select
89

910
from syncmaster.backend.services.unit_of_work import UnitOfWork
1011
from syncmaster.db.models import RunType, Status, Transfer
1112
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
13+
from syncmaster.exceptions.transfer import TransferNotFoundError
1214
from syncmaster.scheduler.celery import app as celery
1315
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1416
from syncmaster.scheduler.utils import get_async_session
@@ -47,6 +49,22 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
4749
args=(transfer.id,),
4850
)
4951

52+
async def remove_orphan_jobs(self) -> None:
53+
all_jobs = self.scheduler.get_jobs()
54+
settings = self.settings
55+
job_transfer_ids = [int(job.id) for job in all_jobs]
56+
57+
async with get_async_session(settings) as session:
58+
result = await session.execute(
59+
select(Transfer).where(Transfer.id.in_(job_transfer_ids)),
60+
)
61+
existing_transfers = result.scalars().all()
62+
existing_transfer_ids = {t.id for t in existing_transfers}
63+
64+
for job in all_jobs:
65+
if int(job.id) not in existing_transfer_ids:
66+
self.scheduler.remove_job(job.id)
67+
5068
@staticmethod
5169
async def send_job_to_celery(transfer_id: int) -> None:
5270
"""
@@ -61,7 +79,11 @@ async def send_job_to_celery(transfer_id: int) -> None:
6179
async with get_async_session(settings) as session:
6280
unit_of_work = UnitOfWork(session=session, settings=settings)
6381

64-
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
82+
try:
83+
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
84+
except TransferNotFoundError:
85+
return
86+
6587
credentials_source = await unit_of_work.credentials.read(transfer.source_connection_id)
6688
credentials_target = await unit_of_work.credentials.read(transfer.target_connection_id)
6789

tests/test_unit/test_groups/test_delete_group_by_id.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ async def test_only_superuser_can_delete_group(
1616
session: AsyncSession,
1717
):
1818
# Arrange
19-
await session.get(Group, empty_group.group.id)
2019
g_id = empty_group.group.id
2120

2221
# Act

tests/test_unit/test_transfers/test_delete_transfer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ async def test_maintainer_plus_can_delete_group_transfer(
1818
# Arrange
1919
user = group_transfer.owner_group.get_member_of_role(role_maintainer_plus)
2020
t_id = group_transfer.id
21-
await session.get(Transfer, group_transfer.id)
2221

2322
# Act
2423
result = await client.delete(

0 commit comments

Comments
 (0)