Skip to content

Commit 5a995c1

Browse files
author
maxim-lixakov
committed
[DOP-22267] - add removing orphan transfers from scheduler
1 parent da40fb5 commit 5a995c1

File tree

5 files changed

+48
-20
lines changed

5 files changed

+48
-20
lines changed

syncmaster/scheduler/__main__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ async def main():
2020

2121
while True:
2222
logger.info("Looking at the transfer table...")
23+
24+
await transfer_job_manager.remove_orphan_jobs()
2325
transfers = await transfer_fetcher.fetch_updated_jobs()
2426

2527
if transfers:
@@ -29,6 +31,7 @@ async def main():
2931
", ".join(str(t.id) for t in transfers),
3032
)
3133
transfer_job_manager.update_jobs(transfers)
34+
3235
transfer_fetcher.last_updated_at = max(t.updated_at for t in transfers)
3336
logger.info("Scheduler state has been updated. Last updated at: %s", transfer_fetcher.last_updated_at)
3437

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
from apscheduler.schedulers.asyncio import AsyncIOScheduler
66
from apscheduler.triggers.cron import CronTrigger
77
from kombu.exceptions import KombuError
8+
from sqlalchemy import any_, select
89

910
from syncmaster.backend.services.unit_of_work import UnitOfWork
1011
from syncmaster.db.models import RunType, Status, Transfer
1112
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
13+
from syncmaster.exceptions.transfer import TransferNotFoundError
1214
from syncmaster.scheduler.celery import app as celery
1315
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1416
from syncmaster.scheduler.utils import get_async_session
@@ -47,6 +49,22 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
4749
args=(transfer.id,),
4850
)
4951

52+
async def remove_orphan_jobs(self) -> None:
53+
all_jobs = self.scheduler.get_jobs()
54+
settings = self.settings
55+
job_transfer_ids = [int(job.id) for job in all_jobs]
56+
57+
async with get_async_session(settings) as session:
58+
result = await session.execute(
59+
select(Transfer).where(Transfer.id == any_(job_transfer_ids)), # type: ignore[arg-type]
60+
)
61+
existing_transfers = result.scalars().all()
62+
existing_transfer_ids = {t.id for t in existing_transfers}
63+
64+
missing_job_ids = set(job_transfer_ids) - existing_transfer_ids
65+
for job_id in missing_job_ids:
66+
self.scheduler.remove_job(str(job_id))
67+
5068
@staticmethod
5169
async def send_job_to_celery(transfer_id: int) -> None:
5270
"""
@@ -61,7 +79,11 @@ async def send_job_to_celery(transfer_id: int) -> None:
6179
async with get_async_session(settings) as session:
6280
unit_of_work = UnitOfWork(session=session, settings=settings)
6381

64-
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
82+
try:
83+
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
84+
except TransferNotFoundError:
85+
return
86+
6587
credentials_source = await unit_of_work.credentials.read(transfer.source_connection_id)
6688
credentials_target = await unit_of_work.credentials.read(transfer.target_connection_id)
6789

tests/test_unit/test_groups/test_delete_group_by_id.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import pytest
22
from httpx import AsyncClient
3-
from sqlalchemy import select
43
from sqlalchemy.ext.asyncio import AsyncSession
54

65
from syncmaster.db.models import Group
@@ -16,7 +15,6 @@ async def test_only_superuser_can_delete_group(
1615
session: AsyncSession,
1716
):
1817
# Arrange
19-
await session.get(Group, empty_group.group.id)
2018
g_id = empty_group.group.id
2119

2220
# Act
@@ -35,9 +33,8 @@ async def test_only_superuser_can_delete_group(
3533
}
3634

3735
# Assert group was deleted
38-
stmt = select(Group).where(Group.id == g_id)
39-
res = await session.execute(stmt)
40-
group_in_db = res.scalars().first()
36+
session.expunge_all()
37+
group_in_db = await session.get(Group, g_id)
4138
assert group_in_db is None
4239

4340

tests/test_unit/test_queue/test_delete_queue.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import pytest
22
from httpx import AsyncClient
3-
from sqlalchemy import select
43
from sqlalchemy.ext.asyncio import AsyncSession
54

65
from syncmaster.db.models import Queue
@@ -18,6 +17,7 @@ async def test_maintainer_plus_can_delete_queue(
1817
):
1918
# Arrange
2019
user = mock_group.get_member_of_role(role_maintainer_plus)
20+
q_id = group_queue.id
2121

2222
# Act
2323
result = await client.delete(
@@ -32,6 +32,11 @@ async def test_maintainer_plus_can_delete_queue(
3232
}
3333
assert result.status_code == 200
3434

35+
# Assert queue was deleted
36+
session.expunge_all()
37+
queue_in_db = await session.get(Queue, q_id)
38+
assert queue_in_db is None
39+
3540

3641
async def test_superuser_can_delete_queue(
3742
client: AsyncClient,
@@ -56,9 +61,8 @@ async def test_superuser_can_delete_queue(
5661
assert result.status_code == 200
5762

5863
# Assert queue was deleted
59-
stmt = select(Queue).where(Queue.id == q_id)
60-
res = await session.execute(stmt)
61-
queue_in_db = res.scalars().first()
64+
session.expunge_all()
65+
queue_in_db = await session.get(Queue, q_id)
6266
assert queue_in_db is None
6367

6468

tests/test_unit/test_transfers/test_delete_transfer.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import pytest
22
from httpx import AsyncClient
3-
from sqlalchemy import select
43
from sqlalchemy.ext.asyncio import AsyncSession
54

65
from syncmaster.db.models import Transfer
@@ -17,8 +16,7 @@ async def test_maintainer_plus_can_delete_group_transfer(
1716
):
1817
# Arrange
1918
user = group_transfer.owner_group.get_member_of_role(role_maintainer_plus)
20-
t_id = group_transfer.id
21-
await session.get(Transfer, group_transfer.id)
19+
transfer = await session.get(Transfer, group_transfer.id)
2220

2321
# Act
2422
result = await client.delete(
@@ -35,9 +33,8 @@ async def test_maintainer_plus_can_delete_group_transfer(
3533
}
3634

3735
# Assert transfer was deleted
38-
stmt = select(Transfer).where(Transfer.id == t_id)
39-
res = await session.execute(stmt)
40-
transfer_in_db = res.scalars().first()
36+
session.expunge(transfer)
37+
transfer_in_db = await session.get(Transfer, transfer.id)
4138
assert transfer_in_db is None
4239

4340

@@ -47,7 +44,7 @@ async def test_superuser_can_delete_transfer(
4744
superuser: MockUser,
4845
session: AsyncSession,
4946
):
50-
t_id = group_transfer.id
47+
transfer = await session.get(Transfer, group_transfer.id)
5148

5249
# Act
5350
result = await client.delete(
@@ -64,9 +61,8 @@ async def test_superuser_can_delete_transfer(
6461
}
6562

6663
# Assert transfer was deleted
67-
stmt = select(Transfer).where(Transfer.id == t_id)
68-
res = await session.execute(stmt)
69-
transfer_in_db = res.scalars().first()
64+
session.expunge(transfer)
65+
transfer_in_db = await session.get(Transfer, transfer.id)
7066
assert transfer_in_db is None
7167

7268

@@ -99,6 +95,7 @@ async def test_developer_or_below_cannot_delete_transfer(
9995
role_developer_or_below: UserTestRoles,
10096
):
10197
# Act
98+
transfer = await session.get(Transfer, group_transfer.id)
10299
user = group_transfer.owner_group.get_member_of_role(role_developer_or_below)
103100

104101
# Assert
@@ -117,6 +114,11 @@ async def test_developer_or_below_cannot_delete_transfer(
117114
}
118115
assert result.status_code == 403
119116

117+
# Assert transfer was not deleted
118+
session.expunge(transfer)
119+
transfer_in_db = await session.get(Transfer, transfer.id)
120+
assert transfer_in_db is not None
121+
120122

121123
async def test_group_member_cannot_delete_other_group_transfer(
122124
client: AsyncClient,

0 commit comments

Comments
 (0)