Skip to content

Commit 4b5534e

Browse files
LBWhite55HaiHui886
authored and committed
fix the half bug of #100. Operator task status bug.
1 parent c2ac0a3 commit 4b5534e

File tree

3 files changed

+9
-17
lines changed

3 files changed

+9
-17
lines changed

data_celery/job/tasks.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ def run_pipline_job(job_uuid,user_id, user_name, user_token):
3333
try:
3434
db_session = get_sync_session()
3535
job_celery_uuid = ""
36-
for i in range(10):
36+
for i in range(15):
3737
job_obj = get_pipline_job_by_uid(db_session, job_uuid)
3838
if job_obj is not None and job_obj.job_celery_uuid is not None and job_obj.job_celery_uuid != "":
3939
job_celery_uuid = job_obj.job_celery_uuid
4040
break
41+
db_session.expire_all()
4142
time.sleep(1)
4243
if job_celery_uuid == "":
4344
insert_pipline_job_run_task_log_error(job_uuid, f"not found job celery uuid : {job_uuid}")
@@ -55,9 +56,6 @@ def run_pipline_job(job_uuid,user_id, user_name, user_token):
5556
return False
5657
current_process_id = os.getpid()
5758
add_process_to_redis(job_uuid,current_process_id, current_ip,work_name)
58-
# for i in range(100000):
59-
# print(f"任务进行中:{i}")
60-
# time.sleep(1)
6159

6260
job_obj.task_run_host = current_ip
6361
job_obj.job_celery_work_name = work_name
@@ -94,16 +92,14 @@ def run_pipline_job(job_uuid,user_id, user_name, user_token):
9492
except Exception as e:
9593
if job_obj is not None and db_session:
9694
try:
97-
# 确保任务状态被正确更新为 FAILED
9895
job_obj.status = JOB_STATUS.FAILED.value
9996
job_obj.date_finish = get_current_time()
100-
db_session.flush() # 先 flush 确保更改被记录
101-
db_session.commit() # 然后 commit 确保持久化
97+
db_session.flush()
98+
db_session.commit()
10299
except Exception as db_error:
103100
logger.error(f"Failed to update job status to FAILED in outer exception handler: {db_error}")
104101
try:
105102
db_session.rollback()
106-
# 重新查询 job 对象
107103
job_obj_refreshed = get_pipline_job_by_uid(db_session, job_uuid)
108104
if job_obj_refreshed:
109105
job_obj_refreshed.status = JOB_STATUS.FAILED.value
@@ -117,10 +113,8 @@ def run_pipline_job(job_uuid,user_id, user_name, user_token):
117113
finally:
118114
if job_obj and db_session:
119115
try:
120-
# 确保完成时间被设置(如果之前没有设置)
121116
if not job_obj.date_finish:
122117
job_obj.date_finish = get_current_time()
123-
# 如果状态还是 PROCESSING,说明异常处理可能没有正确执行,设置为 FAILED
124118
if job_obj.status == JOB_STATUS.PROCESSING.value:
125119
job_obj.status = JOB_STATUS.FAILED.value
126120
db_session.commit()
@@ -228,17 +222,13 @@ def run_pipline_job_task(config,job,session,user_id, user_name, user_token):
228222
job.export_branch_name = branch_name
229223
session.commit()
230224
except Exception as e:
231-
# 当操作符执行失败时,确保任务状态被正确更新为 FAILED
232225
try:
233-
# 使用 flush 和 commit 确保状态被正确保存
234226
job.status = JOB_STATUS.FAILED.value
235227
job.date_finish = get_current_time()
236-
session.flush() # 先 flush 确保更改被记录
237-
session.commit() # 然后 commit 确保持久化
228+
session.flush()
229+
session.commit()
238230
except Exception as db_error:
239-
# 如果更新失败,记录错误但继续抛出原始异常
240231
logger.error(f"Failed to update job status to FAILED: {db_error}")
241-
# 尝试回滚并重新提交
242232
try:
243233
session.rollback()
244234
from data_celery.db.JobsManager import get_pipline_job_by_uid

data_server/job/JobTask.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def run_pipline_task(task_uuid: str,user_id:int,user_name:str,user_token:str,tas
1212
eta=parse_shanghai_datetime(task_run_time))
1313
return task_celery.id
1414
else:
15-
task_celery = run_pipline_job.delay(task_uuid, user_id,user_name, user_token)
15+
task_celery = run_pipline_job.apply_async(args=[task_uuid, user_id,user_name, user_token], countdown=1)
1616
return task_celery.id
1717

1818

data_server/job/JobsManager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ def run_pipline_job_only(job, user_id, user_name, user_token,session,task_run_ti
272272
run_time = None
273273
job_celery_uid = run_pipline_task(job.uuid, user_id, user_name, user_token,run_time)
274274
job.job_celery_uuid = job_celery_uid
275+
job.status = responses.JOB_STATUS.PROCESSING.value
276+
session.flush()
275277
session.commit()
276278
return True
277279

0 commit comments

Comments (0)