Skip to content

Commit 94f4511

Browse files
committed
[DOP-25451] Return error status to Celery if run failed
1 parent fc062cd commit 94f4511

File tree

1 file changed

+23
-20
lines changed

1 file changed

+23
-20
lines changed

syncmaster/worker/transfer.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,28 @@ def run_transfer_task(self: WorkerTask, run_id: int) -> None:
3333

3434

3535
def run_transfer(run_id: int, engine: Engine, settings: WorkerAppSettings):
36-
run_query = (
37-
select(Run)
38-
.where(
39-
Run.id == run_id,
40-
Run.status == Status.CREATED,
41-
)
42-
.options(
43-
selectinload(Run.transfer),
44-
selectinload(Run.transfer).selectinload(Transfer.group),
45-
selectinload(Run.transfer).selectinload(Transfer.source_connection),
46-
selectinload(Run.transfer).selectinload(Transfer.target_connection),
47-
)
48-
)
49-
5036
with Session(engine) as session:
51-
run = session.scalar(run_query)
37+
run = session.scalar(
38+
select(Run)
39+
.where(
40+
Run.id == run_id,
41+
Run.status == Status.CREATED,
42+
)
43+
.options(
44+
selectinload(Run.transfer),
45+
selectinload(Run.transfer).selectinload(Transfer.group),
46+
selectinload(Run.transfer).selectinload(Transfer.source_connection),
47+
selectinload(Run.transfer).selectinload(Transfer.target_connection),
48+
),
49+
)
5250
if run is None:
5351
raise RunNotFoundError
5452

5553
q_source_auth_data = select(AuthData).where(AuthData.connection_id == run.transfer.source_connection.id)
5654
q_target_auth_data = select(AuthData).where(AuthData.connection_id == run.transfer.target_connection.id)
5755

58-
target_auth_data = decrypt_auth_data(session.scalars(q_target_auth_data).one().value, settings)
59-
source_auth_data = decrypt_auth_data(session.scalars(q_source_auth_data).one().value, settings)
56+
target_auth_data = decrypt_auth_data(session.scalar(q_target_auth_data).value, settings)
57+
source_auth_data = decrypt_auth_data(session.scalar(q_source_auth_data).value, settings)
6058

6159
logger.info("Starting run")
6260
run.status = Status.STARTED
@@ -76,12 +74,14 @@ def run_transfer(run_id: int, engine: Engine, settings: WorkerAppSettings):
7674
target_auth_data=target_auth_data,
7775
)
7876
controller.perform_transfer()
79-
except Exception:
77+
except Exception as e:
8078
status = Status.FAILED
81-
logger.exception("Run %r was failed", run.id)
79+
logger.exception("Run %r failed", run.id)
80+
exception = e
8281
else:
8382
status = Status.FINISHED
84-
logger.warning("Run %r was successful", run.id)
83+
logger.info("Run %r was successful", run.id)
84+
exception = None
8585

8686
with Session(engine) as session:
8787
run = session.get(Run, run_id)
@@ -94,6 +94,9 @@ def run_transfer(run_id: int, engine: Engine, settings: WorkerAppSettings):
9494
session.add(run)
9595
session.commit()
9696

97+
if exception is not None:
98+
raise exception
99+
97100

98101
@after_setup_task_logger.connect
99102
def setup_loggers(*args, **kwargs):

0 commit comments

Comments
 (0)