
Commit e711b6a

z275748353zhanglongbinLBWhite55 authored

Change the model download address for the md2json tool (#76)

* Fix the bug of dataflow with ID #34
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug where tasks get stuck when the trunk parameter configuration is unreasonable.
* Fix the bug of dataflow with ID #69
* Change the model download address for the md2json tool
* Fix the bug of 66: cannot delete a trunk tool job which is on-going

---------

Co-authored-by: zhanglongbin <[email protected]>
Co-authored-by: JingTY <[email protected]>
1 parent 1cf66d9 commit e711b6a

4 files changed: +246 additions, -9 deletions

data_engine/utils/model_utils.py

Lines changed: 4 additions & 1 deletion
@@ -726,7 +726,9 @@ def get_opencsg_model_path(model_name: str, cache_dir: Optional[str] = None) ->
         return model_cache_path
 
     # Get model information from OpenCSG API
-    api_url = 'https://hub.opencsg.com/api/v1/models'
+    # Use environment variable for endpoint, consistent with model_validator.py
+    csghub_endpoint = os.getenv('CSGHUB_ENDPOINT', 'https://hub.opencsg.com')
+    api_url = f'{csghub_endpoint}/api/v1/models'
     params = {
         'page': 1,
         'per': 1,
@@ -736,6 +738,7 @@ def get_opencsg_model_path(model_name: str, cache_dir: Optional[str] = None) ->
     }
 
     try:
+        logger.info(f'Using endpoint: {csghub_endpoint}')
         logger.info(f'Searching for model from OpenCSG Hub API: {model_name}')
         response = requests.get(api_url, params=params, timeout=30)
         response.raise_for_status()
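A minimal, self-contained sketch of the endpoint-override pattern this change adopts: the API base URL is built from the CSGHUB_ENDPOINT environment variable, falling back to the public hub. The helper name build_models_api_url and the example override URL are illustrative only, not part of the repository:

    import os

    def build_models_api_url() -> str:
        # Fall back to the public OpenCSG hub when no override is configured,
        # mirroring the default used in the diff above.
        csghub_endpoint = os.getenv('CSGHUB_ENDPOINT', 'https://hub.opencsg.com')
        return f'{csghub_endpoint}/api/v1/models'

    if __name__ == '__main__':
        # e.g. export CSGHUB_ENDPOINT=https://my-private-hub.example.com
        print(build_models_api_url())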

data_server/job/JobExecutor.py

Lines changed: 31 additions & 8 deletions
@@ -100,19 +100,42 @@ def run_executor(config, job_id, job_name, user_id, user_name, user_token):
                 setattr(job, 'work_dir', work_dir)
                 setattr(job, 'status', JOB_STATUS.PROCESSING.value)
         _, branch_name = executor.run()
+
+        # write data to db after pipeline finish
+        date_finish = datetime.datetime.now()
     except Exception as e:
-        logger.info(f'executor.run error with: {str(e)}')
+        logger.error(f'Job {job_id} execution failed with error: {str(e)}')
         # write data to db after pipeline failed
         date_finish = datetime.datetime.now()
-        with get_sync_session() as session:
-            with session.begin():
-                job = session.query(Job).filter(Job.job_id == job_id).first()
-                if job:
-                    setattr(job, 'status', JOB_STATUS.FAILED.value)
-                    setattr(job, 'date_finish', date_finish)
+        try:
+            with get_sync_session() as session:
+                with session.begin():
+                    job = session.query(Job).filter(Job.job_id == job_id).first()
+                    if job:
+                        setattr(job, 'status', JOB_STATUS.FAILED.value)
+                        setattr(job, 'date_finish', date_finish)
+                        logger.info(f'Job {job_id} marked as FAILED')
+        except Exception as db_error:
+            logger.error(f'Failed to update job {job_id} status to FAILED: {str(db_error)}')
         return
+    finally:
+        # Final safety check: ensure job status is not stuck in PROCESSING
+        try:
+            with get_sync_session() as session:
+                job = session.query(Job).filter(Job.job_id == job_id).first()
+                if job and job.status == JOB_STATUS.PROCESSING.value:
+                    logger.warning(
+                        f'Job {job_id} still in PROCESSING state in finally block, '
+                        f'marking as FAILED'
+                    )
+                    job.status = JOB_STATUS.FAILED.value
+                    if not job.date_finish:
+                        job.date_finish = datetime.datetime.now()
+                    session.commit()
+        except Exception as final_error:
+            logger.error(f'Failed to update job {job_id} in finally block: {str(final_error)}')
 
-    # write data to db after pipeline finish
+    # Continue with success path
     date_finish = datetime.datetime.now()
     if config.job_source == "tool":
         with get_sync_session() as session:
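The shape of the new error handling, boiled down: the failure write is wrapped in its own try/except so a database error cannot escape the handler, and a finally block acts as a last-resort guard against jobs left stuck in PROCESSING. A small runnable sketch of that pattern follows, with an in-memory dict standing in for the real session and Job model; all names here are placeholders, not the project's helpers:

    import datetime

    jobs = {42: {'status': 'Processing', 'date_finish': None}}

    def run_executor_sketch(job_id, pipeline):
        try:
            pipeline()
            date_finish = datetime.datetime.now()
        except Exception as exc:
            print(f'Job {job_id} execution failed with error: {exc}')
            try:
                # Persist the failure; swallow DB errors so they cannot propagate.
                jobs[job_id]['status'] = 'Failed'
                jobs[job_id]['date_finish'] = datetime.datetime.now()
            except Exception as db_error:
                print(f'Failed to update job {job_id}: {db_error}')
            return
        finally:
            # Last-resort guard: never leave the job stuck in Processing.
            job = jobs.get(job_id)
            if job and job['status'] == 'Processing':
                job['status'] = 'Failed'
                job['date_finish'] = job['date_finish'] or datetime.datetime.now()

        # Success path continues here (write the finished state).
        jobs[job_id]['status'] = 'Finished'
        jobs[job_id]['date_finish'] = date_finish

    def _boom():
        raise RuntimeError('boom')

    run_executor_sketch(42, _boom)
    print(jobs[42]['status'])  # -> Failed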

data_server/job/JobHealthCheck.py

Lines changed: 198 additions & 0 deletions
@@ -0,0 +1,198 @@
+"""
+Job Health Check Module
+
+This module provides functionality to detect and fix orphaned jobs
+that remain in 'Processing' state after service restart or unexpected interruptions.
+"""
+
+import datetime
+from typing import Dict, List
+from sqlalchemy.orm import Session
+from celery.result import AsyncResult
+from loguru import logger
+
+from data_server.database.session import get_sync_session
+from data_celery.main import celery_app
+from data_server.job.JobModels import Job
+from data_server.schemas.responses import JOB_STATUS
+
+# Celery task states that indicate a task is actually running
+CELERY_RUNNING_STATES = ['PENDING', 'RECEIVED', 'STARTED', 'RETRY']
+
+
+def fix_orphaned_jobs() -> Dict[str, any]:
+    """
+    Fix orphaned jobs after service restart.
+
+    This function checks all jobs in 'Processing' state and verifies if they are
+    actually running in Celery. If not, marks them as 'Failed'.
+
+    Returns:
+        dict: Statistics about fixed jobs
+            - fixed_count: Number of jobs marked as failed
+            - skipped_count: Number of jobs still running
+            - fixed_jobs: List of fixed job details
+    """
+    logger.info("Starting orphaned jobs health check...")
+
+    fixed_count = 0
+    skipped_count = 0
+    fixed_jobs = []
+
+    try:
+        with get_sync_session() as session:
+            # Query all jobs in Processing state
+            processing_jobs = session.query(Job).filter(
+                Job.status == JOB_STATUS.PROCESSING.value
+            ).all()
+
+            total_jobs = len(processing_jobs)
+            logger.info(f"Found {total_jobs} jobs in Processing state")
+
+            if total_jobs == 0:
+                logger.info("No jobs to check, health check completed")
+                return {
+                    'fixed_count': 0,
+                    'skipped_count': 0,
+                    'fixed_jobs': []
+                }
+
+            for job in processing_jobs:
+                try:
+                    is_running = _check_job_actually_running(job)
+
+                    if not is_running:
+                        # Job is not running, mark as failed
+                        _mark_job_as_failed(session, job, "Job was interrupted by service restart")
+
+                        fixed_jobs.append({
+                            'job_id': job.job_id,
+                            'job_name': job.job_name,
+                            'job_source': job.job_source,
+                            'celery_uuid': job.job_celery_uuid or 'N/A'
+                        })
+                        fixed_count += 1
+
+                        logger.warning(
+                            f"Job {job.job_id} ({job.job_name}): "
+                            f"Not running in backend, marked as Failed"
+                        )
+                    else:
+                        skipped_count += 1
+                        logger.info(
+                            f"Job {job.job_id} ({job.job_name}): "
+                            f"Still running, skipping"
+                        )
+
+                except Exception as e:
+                    logger.error(
+                        f"Error checking job {job.job_id}: {str(e)}, "
+                        f"marking as Failed for safety"
+                    )
+                    # If we can't verify, mark as failed for safety
+                    try:
+                        _mark_job_as_failed(session, job, f"Health check failed: {str(e)}")
+                        fixed_count += 1
+                    except Exception as mark_error:
+                        logger.error(f"Failed to mark job {job.job_id} as Failed: {mark_error}")
+
+            # Commit all changes
+            session.commit()
+
+            logger.info(
+                f"Orphaned jobs health check completed: "
+                f"{fixed_count} jobs marked as Failed, "
+                f"{skipped_count} jobs still running"
+            )
+
+            return {
+                'fixed_count': fixed_count,
+                'skipped_count': skipped_count,
+                'fixed_jobs': fixed_jobs
+            }
+
+    except Exception as e:
+        logger.error(f"Failed to complete orphaned jobs health check: {str(e)}")
+        raise
+
+
+def _check_job_actually_running(job: Job) -> bool:
+    """
+    Check if a job is actually running in the backend.
+
+    For pipeline jobs (Celery-based), checks Celery task status.
+    For regular jobs, assumes not running (as they run in separate processes).
+
+    Args:
+        job: Job object to check
+
+    Returns:
+        bool: True if job is actually running, False otherwise
+    """
+    # Check if it's a pipeline job with Celery UUID
+    if job.job_celery_uuid and len(job.job_celery_uuid) > 0:
+        logger.debug(
+            f"Job {job.job_id}: Checking Celery task {job.job_celery_uuid}"
+        )
+        return _check_celery_task_running(job.job_celery_uuid)
+
+    # For regular jobs without Celery UUID, assume not running after restart
+    # (as they run in multiprocessing.Process which won't survive restart)
+    logger.debug(
+        f"Job {job.job_id}: No Celery UUID found, "
+        f"assuming not running (job_source: {job.job_source})"
+    )
+    return False
+
+
+def _check_celery_task_running(celery_uuid: str) -> bool:
+    """
+    Check if a Celery task is actually running.
+
+    Args:
+        celery_uuid: Celery task UUID
+
+    Returns:
+        bool: True if task is in running state, False otherwise
+    """
+    try:
+        task_result = AsyncResult(celery_uuid, app=celery_app)
+        task_status = task_result.status
+
+        is_running = task_status in CELERY_RUNNING_STATES
+
+        logger.debug(
+            f"Celery task {celery_uuid}: status = {task_status}, "
+            f"is_running = {is_running}"
+        )
+
+        return is_running
+
+    except Exception as e:
+        logger.warning(
+            f"Failed to check Celery task {celery_uuid}: {str(e)}, "
+            f"assuming not running"
+        )
+        return False
+
+
+def _mark_job_as_failed(session: Session, job: Job, reason: str):
+    """
+    Mark a job as failed with proper logging.
+
+    Args:
+        session: Database session
+        job: Job object to mark as failed
+        reason: Reason for marking as failed
+    """
+    old_status = job.status
+    job.status = JOB_STATUS.FAILED.value
+
+    if not job.date_finish:
+        job.date_finish = datetime.datetime.now()
+
+    logger.warning(
+        f"Marking job {job.job_id} ({job.job_name}) as Failed: "
+        f"previous_status={old_status}, reason={reason}"
+    )
+
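The health check's Celery probe rests on AsyncResult.status. A standalone sketch of that check follows; the broker/backend URLs and the task UUID are placeholders. One caveat worth noting: Celery also reports PENDING for task ids it has never seen, so treating PENDING as "running" (as CELERY_RUNNING_STATES does) errs on the side of not failing a job whose result is merely unknown.

    from celery import Celery
    from celery.result import AsyncResult

    CELERY_RUNNING_STATES = ['PENDING', 'RECEIVED', 'STARTED', 'RETRY']

    # Placeholder broker/backend; the project uses data_celery.main.celery_app.
    app = Celery('probe', broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/1')

    def is_task_running(task_uuid: str) -> bool:
        # .status is an alias of .state; compare it against the running states.
        return AsyncResult(task_uuid, app=app).status in CELERY_RUNNING_STATES

    print(is_task_running('00000000-0000-0000-0000-000000000000'))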

data_server/main.py

Lines changed: 13 additions & 0 deletions
@@ -72,6 +72,19 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     # Initialize database data
     initialize_database()
 
+    # Fix orphaned jobs after service restart
+    from data_server.job.JobHealthCheck import fix_orphaned_jobs
+    try:
+        fix_result = fix_orphaned_jobs()
+        logger.info(
+            f"Orphaned jobs check completed: "
+            f"{fix_result['fixed_count']} jobs fixed, "
+            f"{fix_result['skipped_count']} jobs still running"
+        )
+    except Exception as fix_error:
+        logger.error(f"Failed to fix orphaned jobs during startup: {str(fix_error)}")
+        # Don't interrupt startup process
+
     # Any other initialization code
     logger.info("Application startup complete")
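For context, the startup hook above lives in a FastAPI lifespan context manager. A minimal sketch of that wiring (FastAPI and contextlib only, with the health-check call wrapped so a failure cannot abort startup) looks roughly like this:

    from contextlib import asynccontextmanager
    from fastapi import FastAPI

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: run the orphaned-jobs check, but never block startup on it.
        try:
            from data_server.job.JobHealthCheck import fix_orphaned_jobs
            fix_orphaned_jobs()
        except Exception as exc:
            print(f'Failed to fix orphaned jobs during startup: {exc}')
        yield  # the application serves requests while suspended here
        # Shutdown logic, if any, would go after the yield.

    app = FastAPI(lifespan=lifespan)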
