Skip to content

Commit 5d263d9

Browse files
author
Ilyas Gasanov
committed
[DOP-20962] Add scheduler integration test
1 parent a26bb04 commit 5d263d9

File tree

4 files changed

+22
-30
lines changed

4 files changed

+22
-30
lines changed

syncmaster/worker/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,5 @@
1515
task_cls=WorkerTask,
1616
imports=[
1717
"syncmaster.worker.transfer",
18-
"tests.test_integration.test_scheduler.test_task",
1918
],
2019
)

syncmaster/worker/transfer.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
4+
import time
45
from datetime import datetime, timezone
56

67
import onetl
@@ -103,3 +104,23 @@ def load_correlation_id(task, *args, **kwargs) -> None:
103104
# Here we're able to load the correlation ID from the headers
104105
id_value = task.request.get(CORRELATION_CELERY_HEADER_ID)
105106
correlation_id.set(id_value)
107+
108+
109+
@celery.task(name="tick", bind=True, track_started=True)
110+
def tick(self: WorkerTask, run_id: int) -> None:
111+
"""This task can be used for testing."""
112+
with Session(self.engine) as session:
113+
run = session.get(Run, run_id)
114+
if run is None:
115+
raise RunNotFoundError
116+
117+
run.started_at = datetime.now(tz=timezone.utc)
118+
run.status = Status.STARTED
119+
session.add(run)
120+
session.commit()
121+
122+
time.sleep(2) # to make sure that previous status is handled in test
123+
run.status = Status.FINISHED
124+
run.ended_at = datetime.now(tz=timezone.utc)
125+
session.add(run)
126+
session.commit()

tests/test_integration/test_scheduler/test_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async def test_scheduler(
2323
transfer_fetcher = TransferFetcher(settings)
2424
transfers = await transfer_fetcher.fetch_updated_jobs()
2525
assert transfers
26-
assert group_transfer.transfer.id in [t.id for t in transfers]
26+
assert group_transfer.transfer.id in {t.id for t in transfers}
2727

2828
transfer_job_manager.update_jobs(transfers)
2929

tests/test_integration/test_scheduler/test_task.py

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)