Skip to content

Commit fc062cd

Browse files
committed
[DOP-25451] Update run status in separated commit
1 parent f7a928f commit fc062cd

File tree

2 files changed

+53
-32
lines changed

2 files changed

+53
-32
lines changed

docs/changelog/0.2.3.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
0.2.2 (2025-04-11)
2+
==================
3+
4+
Bug fixes
5+
---------
6+
7+
- Fix Worker not updating Run ``status`` and ``ended_at`` fields after executing a very long ETL process.
8+

syncmaster/worker/transfer.py

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@
44

55
from asgi_correlation_id import correlation_id
66
from asgi_correlation_id.extensions.celery import load_correlation_ids
7-
from celery import Celery
87
from celery.signals import after_setup_task_logger
98
from celery.utils.log import get_task_logger
109
from jinja2 import Template
11-
from sqlalchemy import select
10+
from sqlalchemy import Engine, select
1211
from sqlalchemy.orm import Session, selectinload
1312

1413
from syncmaster.db.models import AuthData, Run, Status, Transfer
1514
from syncmaster.db.repositories.utils import decrypt_auth_data
1615
from syncmaster.exceptions.run import RunNotFoundError
1716
from syncmaster.settings.log import setup_logging
17+
from syncmaster.worker.base import WorkerTask
1818
from syncmaster.worker.celery import app as celery
1919
from syncmaster.worker.controller import TransferController
2020
from syncmaster.worker.settings import WorkerAppSettings
@@ -24,41 +24,47 @@
2424

2525

2626
@celery.task(name="run_transfer_task", bind=True, track_started=True)
27-
def run_transfer_task(self: Celery, run_id: int) -> None:
28-
with Session(self.engine) as session:
29-
run_transfer(
30-
session=session,
31-
run_id=run_id,
32-
settings=self.settings,
33-
)
27+
def run_transfer_task(self: WorkerTask, run_id: int) -> None:
28+
run_transfer(
29+
run_id=run_id,
30+
engine=self.engine,
31+
settings=self.settings,
32+
)
3433

3534

36-
def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
37-
logger.info("Start transfer")
38-
run = session.get(
39-
Run,
40-
run_id,
41-
options=(
35+
def run_transfer(run_id: int, engine: Engine, settings: WorkerAppSettings):
36+
run_query = (
37+
select(Run)
38+
.where(
39+
Run.id == run_id,
40+
Run.status == Status.CREATED,
41+
)
42+
.options(
4243
selectinload(Run.transfer),
4344
selectinload(Run.transfer).selectinload(Transfer.group),
4445
selectinload(Run.transfer).selectinload(Transfer.source_connection),
4546
selectinload(Run.transfer).selectinload(Transfer.target_connection),
46-
),
47+
)
4748
)
48-
if run is None:
49-
raise RunNotFoundError
5049

51-
run.status = Status.STARTED
52-
run.started_at = datetime.now(tz=timezone.utc)
53-
run.log_url = Template(settings.worker.log_url_template).render(run=run, correlation_id=correlation_id.get())
54-
session.add(run)
55-
session.commit()
50+
with Session(engine) as session:
51+
run = session.scalar(run_query)
52+
if run is None:
53+
raise RunNotFoundError
54+
55+
q_source_auth_data = select(AuthData).where(AuthData.connection_id == run.transfer.source_connection.id)
56+
q_target_auth_data = select(AuthData).where(AuthData.connection_id == run.transfer.target_connection.id)
5657

57-
q_source_auth_data = select(AuthData).where(AuthData.connection_id == run.transfer.source_connection.id)
58-
q_target_auth_data = select(AuthData).where(AuthData.connection_id == run.transfer.target_connection.id)
58+
target_auth_data = decrypt_auth_data(session.scalars(q_target_auth_data).one().value, settings)
59+
source_auth_data = decrypt_auth_data(session.scalars(q_source_auth_data).one().value, settings)
5960

60-
target_auth_data = decrypt_auth_data(session.scalars(q_target_auth_data).one().value, settings)
61-
source_auth_data = decrypt_auth_data(session.scalars(q_source_auth_data).one().value, settings)
61+
logger.info("Starting run")
62+
run.status = Status.STARTED
63+
run.started_at = datetime.now(tz=timezone.utc)
64+
run.log_url = Template(settings.worker.log_url_template).render(run=run, correlation_id=correlation_id.get())
65+
session.add(run)
66+
session.commit()
67+
session.expunge_all()
6268

6369
try:
6470
controller = TransferController(
@@ -71,15 +77,22 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
7177
)
7278
controller.perform_transfer()
7379
except Exception:
74-
run.status = Status.FAILED
80+
status = Status.FAILED
7581
logger.exception("Run %r was failed", run.id)
7682
else:
77-
run.status = Status.FINISHED
83+
status = Status.FINISHED
7884
logger.warning("Run %r was successful", run.id)
7985

80-
run.ended_at = datetime.now(tz=timezone.utc)
81-
session.add(run)
82-
session.commit()
86+
with Session(engine) as session:
87+
run = session.get(Run, run_id)
88+
if run is None:
89+
raise RunNotFoundError
90+
91+
logger.info("Updating run status")
92+
run.status = status
93+
run.ended_at = datetime.now(tz=timezone.utc)
94+
session.add(run)
95+
session.commit()
8396

8497

8598
@after_setup_task_logger.connect

0 commit comments

Comments
 (0)