Skip to content

Commit 14dab44

Browse files
author
zhanglongbin
committed
Fix the bug of dataflow with ID #49
1 parent d28704b commit 14dab44

File tree

3 files changed

+48
-14
lines changed

3 files changed

+48
-14
lines changed

data_celery/datasource/mysql/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def upload_to_csg_hub_server(csg_hub_dataset_id: str,
210210
try:
211211
# Upload to CSG Hub server
212212
ensure_directory_exists_remove(datasource_csg_hub_server_dir)
213-
insert_datasource_run_task_log_error(collection_task.task_uid,
213+
insert_datasource_run_task_log_info(collection_task.task_uid,
214214
f"Starting upload csg hub-server the task[{collection_task.task_uid}]...")
215215
exporter = load_exporter(
216216
export_path=datasource_temp_json_dir,

data_celery/formatify/tasks.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626
get_endpoint,
2727
REPO_TYPE_DATASET)
2828
from data_engine.utils.env import GetHubEndpoint
29+
30+
2931
@celery_app.task
3032
def format_task(task_id: int, user_name: str, user_token: str):
31-
3233
tmp_path: str = None
3334
db_session: Session = None
3435
format_task: DataFormatTask = None
@@ -49,9 +50,10 @@ def format_task(task_id: int, user_name: str, user_token: str):
4950
user_token=user_token,
5051
)
5152
ingester_result = ingesterCSGHUB.ingest()
52-
insert_formatity_task_log_info(format_task.task_uid, f"Download directory completed... Directory address:{ingester_result}")
53+
insert_formatity_task_log_info(format_task.task_uid,
54+
f"Download directory completed... Directory address:{ingester_result}")
5355
work_dir = Path(tmp_path).joinpath('work')
54-
file_bool = search_files(tmp_path,[format_task.from_data_type])
56+
file_bool = search_files(tmp_path, [format_task.from_data_type])
5557

5658
if not file_bool:
5759
insert_formatity_task_log_info(format_task.task_uid, f"file not found. task ended....")
@@ -78,10 +80,44 @@ def format_task(task_id: int, user_name: str, user_token: str):
7880
path_is_dir=True,
7981
work_dir=str(work_dir)
8082
)
81-
exporter.export_large_folder()
82-
insert_formatity_task_log_info(format_task.task_uid, 'Upload completed...')
83-
format_task.task_status = DataFormatTaskStatusEnum.COMPLETED.value
84-
db_session.commit()
83+
84+
# 上传文件重试逻辑:最多重试3次
85+
max_retry_count = 3
86+
upload_success = False
87+
retry_count = 0
88+
89+
for attempt in range(max_retry_count):
90+
try:
91+
if attempt == 0:
92+
insert_formatity_task_log_info(format_task.task_uid, f'开始上传文件(第1次尝试)...')
93+
else:
94+
retry_count += 1
95+
insert_formatity_task_log_info(format_task.task_uid,
96+
f'重新尝试上传文件(第{retry_count + 1}次尝试,共{max_retry_count}次)...')
97+
98+
exporter.export_large_folder()
99+
upload_success = True
100+
insert_formatity_task_log_info(format_task.task_uid, 'Upload completed...')
101+
break
102+
except Exception as e:
103+
error_msg = f'上传文件失败(第{attempt + 1}次尝试): {str(e)}'
104+
insert_formatity_task_log_error(format_task.task_uid, error_msg)
105+
logger.error(f"Task {format_task.task_uid} upload attempt {attempt + 1} failed: {error_msg}")
106+
107+
# 如果已经是最后一次尝试,终止任务
108+
if attempt == max_retry_count - 1:
109+
final_error_msg = f'上传文件失败,已重试{max_retry_count}次,任务终止。错误信息: {str(e)}'
110+
insert_formatity_task_log_error(format_task.task_uid, final_error_msg)
111+
logger.error(
112+
f"Task {format_task.task_uid} upload failed after {max_retry_count} attempts: {final_error_msg}")
113+
format_task.task_status = DataFormatTaskStatusEnum.ERROR.value
114+
db_session.commit()
115+
raise RuntimeError(final_error_msg)
116+
117+
# 如果上传成功,更新任务状态为完成
118+
if upload_success:
119+
format_task.task_status = DataFormatTaskStatusEnum.COMPLETED.value
120+
db_session.commit()
85121
pass
86122
except Exception as e:
87123
traceback.print_exc()
@@ -245,22 +281,20 @@ def convert_ppt_to_markdown(file_path: str, task_uid):
245281

246282
from typing import List, Dict, Tuple
247283

248-
def search_files(folder_path: str, types: List[int]) -> Tuple[bool, List[str]]:
249284

285+
def search_files(folder_path: str, types: List[int]) -> Tuple[bool, List[str]]:
250286
type_map: Dict[int, List[str]] = {
251287
0: ['.ppt', '.pptx'], # PPT
252288
1: ['.doc', '.docx'], # Word
253289
3: ['.xls', '.xlsx'] # Excel
254290
}
255291

256-
257292
target_extensions = set()
258293
for file_type in types:
259294
if file_type in type_map:
260295
for ext in type_map[file_type]:
261296
target_extensions.add(ext.lower())
262297

263-
264298
found_files: List[str] = []
265299

266300
def traverse(current_path: str) -> None:
@@ -286,8 +320,6 @@ def traverse(current_path: str) -> None:
286320
except Exception as e:
287321
print(f"Processing path {current_path} error: {str(e)}")
288322

289-
290323
traverse(folder_path)
291324

292-
293325
return bool(len(found_files) > 0)

data_engine/exporter/csghub_exporter.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ def export_large_folder(self):
120120
)
121121
except Exception as e:
122122
traceback.print_exc()
123-
123+
logger.error(f'Failed to upload folder to {self.repo_id}: {str(e)}')
124+
# 重新抛出异常,让调用者知道上传失败
125+
raise
124126
def export(self, dataset):
125127
"""
126128
Export method for a dataset.

0 commit comments

Comments
 (0)