
Commit f6747f7

z275748353 and zhanglongbin authored
Fix the bug of dataflow with ID #69 (#70)
* Fix the bug of dataflow with ID #34
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug where tasks get stuck when the trunk parameter configuration is unreasonable.
* Fix the bug of dataflow with ID #69

---------

Co-authored-by: zhanglongbin <[email protected]>
1 parent 9a3a94d commit f6747f7

5 files changed: +364 additions, -56 deletions

data_celery/job/tasks.py

Lines changed: 76 additions & 26 deletions
@@ -19,6 +19,7 @@
 import tempfile,time
 import traceback
 import io
+from loguru import logger
 
 
 @celery_app.task(name="run_pipline_job")
@@ -91,18 +92,42 @@ def run_pipline_job(job_uuid,user_id, user_name, user_token):
         # insert_pipline_job_run_task_log_error(job_uuid, f"not exists yaml config : {job_uuid}")
         # return False
     except Exception as e:
-        if job_obj is not None:
-            job_obj.status = JOB_STATUS.FAILED.value
-        # print(f"--------{job_uuid} Error occurred while executing the task: {e.__str__()}")
+        if job_obj is not None and db_session:
+            try:
+                # Ensure the job status is correctly updated to FAILED
+                job_obj.status = JOB_STATUS.FAILED.value
+                job_obj.date_finish = get_current_time()
+                db_session.flush()   # flush first so the change is recorded
+                db_session.commit()  # then commit so it is persisted
+            except Exception as db_error:
+                logger.error(f"Failed to update job status to FAILED in outer exception handler: {db_error}")
+                try:
+                    db_session.rollback()
+                    # Re-query the job object
+                    job_obj_refreshed = get_pipline_job_by_uid(db_session, job_uuid)
+                    if job_obj_refreshed:
+                        job_obj_refreshed.status = JOB_STATUS.FAILED.value
+                        job_obj_refreshed.date_finish = get_current_time()
+                        db_session.commit()
+                except Exception as e2:
+                    logger.error(f"Failed to update job status even after rollback: {e2}")
         insert_pipline_job_run_task_log_error(job_uuid, f"{job_uuid} Error occurred while executing the task: {e.__str__()}")
         traceback.print_exc()
         return False
     finally:
-        if job_obj:
-            job_obj.date_finish = get_current_time()
-        if db_session and job_obj:
-            db_session.commit()
-        db_session.close()
+        if job_obj and db_session:
+            try:
+                # Make sure the finish time is set (if it was not set earlier)
+                if not job_obj.date_finish:
+                    job_obj.date_finish = get_current_time()
+                # If the status is still PROCESSING, the exception handling may not have run; mark it FAILED
+                if job_obj.status == JOB_STATUS.PROCESSING.value:
+                    job_obj.status = JOB_STATUS.FAILED.value
+                db_session.commit()
+            except Exception as e:
+                logger.error(f"Failed to update job in finally block: {e}")
+            finally:
+                db_session.close()
         # if yaml_temp_dir and os.path.exists(yaml_temp_dir) and os.path.isdir(yaml_temp_dir):
         #     shutil.rmtree(yaml_temp_dir)
     if current_process_id > 0 and current_ip is not None and work_name is not None:
@@ -183,21 +208,46 @@ def run_pipline_job_task(config,job,session,user_id, user_name, user_token):
     job.work_dir = work_dir
     job.status = JOB_STATUS.PROCESSING.value
     session.commit()
-    _, branch_name = executor.run()
-
-    trace_dir = os.path.join(work_dir, 'trace')
-    first_op = list(cfg.process[0])[0]
-    count_filename = f"count-{first_op}.txt"
-    count_filepath = os.path.join(trace_dir, count_filename)
-    data_count = 0
-    if os.path.exists(count_filepath):
-        with open(count_filepath, 'r') as f:
-            data_lines = f.read().strip()
-            data_count = int(data_lines)
-
-    job.data_count = data_count
-    job.status = JOB_STATUS.FINISHED.value
-    job.process_count = data_count
-    job.export_repo_id = repo_id
-    job.export_branch_name = branch_name
-    session.commit()
+    try:
+        _, branch_name = executor.run()
+
+        trace_dir = os.path.join(work_dir, 'trace')
+        first_op = list(cfg.process[0])[0]
+        count_filename = f"count-{first_op}.txt"
+        count_filepath = os.path.join(trace_dir, count_filename)
+        data_count = 0
+        if os.path.exists(count_filepath):
+            with open(count_filepath, 'r') as f:
+                data_lines = f.read().strip()
+                data_count = int(data_lines) if data_lines else 0
+
+        job.data_count = data_count
+        job.status = JOB_STATUS.FINISHED.value
+        job.process_count = data_count
+        job.export_repo_id = repo_id
+        job.export_branch_name = branch_name
+        session.commit()
+    except Exception as e:
+        # When an operator fails, make sure the job status is updated to FAILED
+        try:
+            # Use flush and commit to make sure the status is saved
+            job.status = JOB_STATUS.FAILED.value
+            job.date_finish = get_current_time()
+            session.flush()   # flush first so the change is recorded
+            session.commit()  # then commit so it is persisted
+        except Exception as db_error:
+            # If the update fails, log the error but still re-raise the original exception
+            logger.error(f"Failed to update job status to FAILED: {db_error}")
+            # Try rolling back and committing again
+            try:
+                session.rollback()
+                from data_celery.db.JobsManager import get_pipline_job_by_uid
+                job_from_db = get_pipline_job_by_uid(session, job.uuid)
+                if job_from_db:
+                    job_from_db.status = JOB_STATUS.FAILED.value
+                    job_from_db.date_finish = get_current_time()
+                    session.commit()
+            except Exception as e2:
+                logger.error(f"Failed to update job status even after rollback: {e2}")
+        insert_pipline_job_run_task_log_error(job.uuid, f"{job.uuid} Error occurred during pipeline execution: {str(e)}")
+        raise
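
Both failure paths above follow the same recovery pattern: try to persist the FAILED status, and if that commit itself fails, roll back the broken transaction, re-fetch the row on the now-clean session, and retry once before giving up. Below is a minimal standalone sketch of that pattern, assuming a SQLAlchemy-style session; the Job attributes and get_job_by_uid callable are hypothetical stand-ins for the project's own models.

from datetime import datetime
from loguru import logger

def mark_job_failed(session, job, get_job_by_uid):
    """Best-effort update of a job row to FAILED, surviving a broken session."""
    try:
        job.status = "FAILED"
        job.date_finish = datetime.utcnow()
        session.flush()    # send the UPDATE now so errors surface here
        session.commit()   # persist the change
        return True
    except Exception as first_error:
        logger.error(f"First attempt to mark job FAILED failed: {first_error}")
        try:
            session.rollback()                         # clear the failed transaction
            fresh = get_job_by_uid(session, job.uuid)  # re-attach via a fresh query
            if fresh:
                fresh.status = "FAILED"
                fresh.date_finish = datetime.utcnow()
                session.commit()
                return True
        except Exception as retry_error:
            logger.error(f"Retry after rollback also failed: {retry_error}")
    return False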

data_engine/core/data.py

Lines changed: 12 additions & 2 deletions
@@ -201,10 +201,20 @@ def process(self,
                 end = time()
                 logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
                             f'Left {len(dataset)} samples.')
-            except:  # noqa: E722
+            except Exception as e:
                 logger.error(f'An error occurred during Op [{op._name}].')
+                # Also write the error to MongoDB so it shows up in the task log UI
+                if hasattr(op, 'job_uid') and op.job_uid:
+                    from data_celery.mongo_tools.tools import insert_pipline_job_run_task_log_error
+                    insert_pipline_job_run_task_log_error(
+                        op.job_uid,
+                        f'An error occurred during Op [{op._name}]: {e}',
+                        operator_name=op._name,
+                        operator_index=getattr(op, 'pipline_index', 0)
+                    )
                 traceback.print_exc()
-                # exit(1)
+                # Re-raise so the caller's exception handling can catch it and stop the job
+                raise
             finally:
                 if checkpointer and dataset is not self:
                     logger.info('Writing checkpoint of dataset processed by '
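
The behavioural change that matters here is replacing a bare except that only logged the error (exit(1) was already commented out) with one that logs, reports to MongoDB, and re-raises, so the job-level handler in tasks.py can mark the job FAILED and close the session. A tiny sketch of the difference, with a hypothetical run_op standing in for the real operator call:

from loguru import logger

def run_op(op, dataset):
    # hypothetical stand-in for executing one pipeline operator
    raise ValueError("operator blew up")

def process_swallowing(op, dataset):
    try:
        return run_op(op, dataset)
    except Exception:                  # error is logged but hidden from the caller
        logger.error("op failed")
        return dataset                 # pipeline keeps going and the job never shows FAILED

def process_reraising(op, dataset):
    try:
        return run_op(op, dataset)
    except Exception as e:             # log locally, then propagate
        logger.error(f"op failed: {e}")
        raise                          # caller's except/finally can mark the job FAILED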

data_engine/ops/load.py

Lines changed: 41 additions & 23 deletions
@@ -1,12 +1,15 @@
 import sys
-
+import traceback
 from loguru import logger
 
 from data_engine.utils.availability_utils import UNAVAILABLE_OPERATORS
 
 from .base_op import OPERATORS
 from .op_fusion import fuse_operators
-from data_celery.mongo_tools.tools import insert_pipline_job_run_task_log_error
+from data_celery.mongo_tools.tools import (insert_pipline_job_run_task_log_error,
+                                           set_pipline_job_operator_status,
+                                           OperatorStatusEnum)
+
 
 def load_ops(process_list, op_fusion=False,job_uid=""):
     """
@@ -22,24 +25,39 @@ def load_ops(process_list, op_fusion=False,job_uid=""):
     ops = []
     new_process_list = []
     index = 0
-    for process in process_list:
-        op_name, args = list(process.items())[0]
-        if op_name in UNAVAILABLE_OPERATORS:
-            logger.error(UNAVAILABLE_OPERATORS[op_name].get_warning_msg())
-            insert_pipline_job_run_task_log_error(job_uid, UNAVAILABLE_OPERATORS[op_name].get_warning_msg())
-            sys.exit(UNAVAILABLE_OPERATORS[op_name].get_warning_msg())
-        op = OPERATORS.modules[op_name](**args)
-        op.pipline_index = index
-        op.job_uid = job_uid
-        ops.append(op)
-        new_process_list.append(process)
-        index += 1
-
-    # detect filter groups
-    if op_fusion:
-        new_process_list, ops = fuse_operators(new_process_list, ops,job_uid=job_uid)
-
-    for op_cfg, op in zip(new_process_list, ops):
-        op._op_cfg = op_cfg
-
-    return ops
+    try:
+        for process in process_list:
+            op_name, args = list(process.items())[0]
+            # Check whether the OP is available
+            if op_name in UNAVAILABLE_OPERATORS:
+                # The OP is unavailable: log the error and raise an exception
+                error_msg = UNAVAILABLE_OPERATORS[op_name].get_warning_msg()
+                logger.error(error_msg)
+                logger.error("Exiting process due to operator unavailability.")
+                insert_pipline_job_run_task_log_error(job_uid, error_msg, operator_name=op_name, operator_index=index)
+                set_pipline_job_operator_status(job_uid, OperatorStatusEnum.ERROR, op_name, operator_index=index)
+                # Raise so the caller can catch it and update the job status
+                raise RuntimeError(error_msg)
+            op = OPERATORS.modules[op_name](**args)
+            op.pipline_index = index
+            op.job_uid = job_uid
+            ops.append(op)
+            new_process_list.append(process)
+            index += 1
+
+        # detect filter groups
+        if op_fusion:
+            new_process_list, ops = fuse_operators(new_process_list, ops,job_uid=job_uid)
+
+        for op_cfg, op in zip(new_process_list, ops):
+            op._op_cfg = op_cfg
+
+        return ops
+    except Exception as e:
+        # Catch the exception, log it, and re-raise
+        logger.error(f"Error occurred while loading operators: {e}")
+        traceback.print_exc()
+        if job_uid:
+            insert_pipline_job_run_task_log_error(job_uid, f"Error occurred while loading operators: {e}")
+        # Re-raise so the caller can catch it and update the job status
+        raise
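
The switch from sys.exit(...) to raise RuntimeError(...) is the core of this fix: sys.exit raises SystemExit, which inherits from BaseException rather than Exception, so the "except Exception" handlers in tasks.py never saw it and the job could be left stuck without a FAILED status. A short sketch of the difference, using hypothetical loader functions and operator names in place of the real code:

import sys

def load_with_exit(op_name, available):
    if op_name not in available:
        sys.exit(f"operator {op_name} unavailable")            # raises SystemExit (a BaseException)
    return op_name

def load_with_raise(op_name, available):
    if op_name not in available:
        raise RuntimeError(f"operator {op_name} unavailable")  # an ordinary Exception
    return op_name

def run_job(loader):
    try:
        loader("nonexistent_op", available={"clean_text"})
    except Exception as e:
        return f"job marked FAILED: {e}"   # only the RuntimeError variant lands here
    return "job FINISHED"

# run_job(load_with_raise) -> "job marked FAILED: operator nonexistent_op unavailable"
# run_job(load_with_exit)  -> SystemExit propagates past the handler; the status update never runs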
