
Commit 53314d9

Merge branch 'main' into main

2 parents: 2c1d6df + a8c5bbc

File tree: 16 files changed, +423 -201 lines


data_celery/datasource/hive/tasks.py

Lines changed: 58 additions & 12 deletions
```diff
@@ -12,6 +12,7 @@
                                      ensure_directory_exists_remove, get_datasource_csg_hub_server_dir)
 from data_celery.mongo_tools.tools import insert_datasource_run_task_log_info,insert_datasource_run_task_log_error
 from data_server.datasource.services.datasource import get_datasource_connector
+from data_server.datasource.schemas import DataSourceCreate
 from data_engine.exporter.load import load_exporter
 from pathlib import Path
 import pandas as pd
@@ -103,13 +104,32 @@ def collection_hive_task(task_uid: str,user_name: str,user_token: str):
         max_line = extra_config["max_line_json"]
         if use_type == "sql":
             if use_sql:
-                connector = get_datasource_connector(collection_task.datasource)
-                if not connector.test_connection():
+                try:
+                    # Convert the database model object into a DataSourceCreate object
+                    datasource_create = DataSourceCreate(
+                        name=collection_task.datasource.name,
+                        des=collection_task.datasource.des,
+                        source_type=collection_task.datasource.source_type,
+                        host=collection_task.datasource.host,
+                        port=collection_task.datasource.port,
+                        username=collection_task.datasource.username,
+                        password=collection_task.datasource.password,
+                        database=collection_task.datasource.database,
+                        auth_type=collection_task.datasource.auth_type
+                    )
+                    connector = get_datasource_connector(datasource_create)
+                    test_result = connector.test_connection()
+                    if not test_result or not test_result.get("success", False):
+                        collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
+                        error_msg = test_result.get("message", "Connection failed") if test_result else "Connection test returned None"
+                        insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} failed to connect to the database: {error_msg}")
+                        return False
+                    get_table_dataset_by_sql(connector, task_uid, use_sql, db_session, collection_task,
+                                             datasource_temp_parquet_dir, max_line=max_line)
+                except Exception as e:
                     collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
-                    insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} failed to connect to the database.")
+                    insert_datasource_run_task_log_error(task_uid, f"Error occurred while executing the task: {str(e)}")
                     return False
-                get_table_dataset_by_sql(connector, task_uid, use_sql, db_session, collection_task,
-                                         datasource_temp_parquet_dir, max_line=max_line)
                 upload_path = datasource_temp_parquet_dir.join('run_sql')
                 upload_to_csg_hub_server(csg_hub_dataset_id,
                                          csg_hub_dataset_name,
@@ -125,14 +145,34 @@ def collection_hive_task(task_uid: str,user_name: str,user_token: str):
             source = hive_config["source"]
             total_count = 0
             records_count = 0
-            connector = get_datasource_connector(collection_task.datasource)
-            if not connector.test_connection():
+            try:
+                # Convert the database model object into a DataSourceCreate object
+                datasource_create = DataSourceCreate(
+                    name=collection_task.datasource.name,
+                    des=collection_task.datasource.des,
+                    source_type=collection_task.datasource.source_type,
+                    host=collection_task.datasource.host,
+                    port=collection_task.datasource.port,
+                    username=collection_task.datasource.username,
+                    password=collection_task.datasource.password,
+                    database=collection_task.datasource.database,
+                    auth_type=collection_task.datasource.auth_type
+                )
+                connector = get_datasource_connector(datasource_create)
+                test_result = connector.test_connection()
+                if not test_result or not test_result.get("success", False):
+                    collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
+                    error_msg = test_result.get("message", "Connection failed") if test_result else "Connection test returned None"
+                    insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} failed to connect to the database: {error_msg}")
+                    return False
+                for table_name in source.keys():
+                    table_total = connector.get_table_total_count_hive(table_name)
+                    total_count += table_total
+            except Exception as e:
                 collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
-                insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} failed to connect to the database.")
+                insert_datasource_run_task_log_error(task_uid, f"Error occurred while executing the task: {str(e)}")
                 return False
-            for table_name in source.keys():
-                table_total = connector.get_table_total_count_hive(table_name)
-                total_count += table_total
+
             collection_task.total_count = total_count
             collection_task.records_count = records_count
             db_session.commit()
@@ -165,8 +205,14 @@ def collection_hive_task(task_uid: str,user_name: str,user_token: str):
     except Exception as e:
         if collection_task:
             collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
+        error_type = type(e).__name__
+        error_msg = str(e)
+        error_traceback = traceback.format_exc()
+        logger.error(f"Task {task_uid} error: {error_type}: {error_msg}")
+        logger.error(f"Full traceback:\n{error_traceback}")
        traceback.print_exc()
-        insert_datasource_run_task_log_error(task_uid, f"Error occurred while executing the task: {e}")
+        insert_datasource_run_task_log_error(task_uid, f"Error occurred while executing the task: {error_type}: {error_msg}")
+        insert_datasource_run_task_log_error(task_uid, f"Traceback: {error_traceback}")
         return False
     finally:
         if collection_task:
```
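
The schema-conversion and connection-test block is duplicated verbatim in the SQL and table branches above. As a refactoring sketch only: the helper names `build_datasource_create` and `connect_or_fail` are hypothetical, and it assumes `DataSourceCreate` accepts these fields as keyword arguments and that `test_connection()` returns a dict with `success`/`message` keys, as the diff implies.

```python
from data_server.datasource.schemas import DataSourceCreate
from data_server.datasource.services.datasource import get_datasource_connector

# The fields copied one-by-one in both branches of the diff above.
_DATASOURCE_FIELDS = ("name", "des", "source_type", "host", "port",
                      "username", "password", "database", "auth_type")

def build_datasource_create(datasource) -> DataSourceCreate:
    """Copy the connection fields from the ORM datasource row into the schema."""
    return DataSourceCreate(**{f: getattr(datasource, f) for f in _DATASOURCE_FIELDS})

def connect_or_fail(datasource):
    """Open a connector and normalize the dict returned by test_connection()
    into (connector, error_message); error_message is None on success."""
    connector = get_datasource_connector(build_datasource_create(datasource))
    result = connector.test_connection()
    if not result or not result.get("success", False):
        msg = result.get("message", "Connection failed") if result else "Connection test returned None"
        return None, msg
    return connector, None
```

Both call sites could then reduce to `connector, error_msg = connect_or_fail(collection_task.datasource)` followed by the existing error logging.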

data_celery/datasource/mongo/tasks.py

Lines changed: 8 additions & 34 deletions
```diff
@@ -11,7 +11,6 @@
 from data_server.datasource.services.datasource import get_datasource_connector
 from data_celery.mongo_tools.tools import insert_datasource_run_task_log_info, insert_datasource_run_task_log_error
 from data_engine.exporter.load import load_exporter
-from pathlib import Path
 import pandas as pd
 from loguru import logger
 
@@ -256,46 +255,21 @@ def upload_to_csg_hub_server(csg_hub_dataset_id: str,
             branch=csg_hub_dataset_default_branch,
             user_name=user_name,
             user_token=user_token,
-            work_dir=datasource_csg_hub_server_dir
+            work_dir=datasource_csg_hub_server_dir,
+            path_is_dir=True
         )
-        upload_path: Path = Path(datasource_temp_json_dir)
-        # Check whether the uploaded directory exists and is not empty
-        if not os.path.exists(upload_path):
-            insert_datasource_run_task_log_error(collection_task.task_uid,
-                                                 f"the task[{collection_task.task_uid}] upload csg hub-server fail: upload path {upload_path} does not exist")
-            return False
-
-        # List all files in the upload directory for debugging
-        file_list = []
-        for root, dirs, files in os.walk(upload_path):
-            for file in files:
-                file_list.append(os.path.join(root, file))
-        insert_datasource_run_task_log_info(collection_task.task_uid,
-                                            f"Files to upload: {len(file_list)} files found in {upload_path}")
-        if len(file_list) == 0:
-            insert_datasource_run_task_log_error(collection_task.task_uid,
-                                                 f"the task[{collection_task.task_uid}] upload csg hub-server fail: upload path {upload_path} is empty")
-            return False
-
-        output_branch_name = exporter.export_from_files(upload_path)
-
-        if output_branch_name:
-            collection_task.csg_hub_branch = output_branch_name
+        exporter.export_large_folder()
+        if csg_hub_dataset_default_branch:
+            collection_task.csg_hub_branch = csg_hub_dataset_default_branch
             db_session.commit()
             insert_datasource_run_task_log_info(collection_task.task_uid,
                                                 f"the task[{collection_task.task_uid}] upload csg hub-server success...")
         else:
             insert_datasource_run_task_log_error(collection_task.task_uid,
-                                                 f"the task[{collection_task.task_uid}] upload csg hub-server fail: export_from_files returned None")
+                                                 f"the task[{collection_task.task_uid}] upload csg hub-server fail...")
     except Exception as e:
         logger.error(e)
-        error_msg = str(e)
-        # Check if this is a "nothing to commit" error
-        if "nothing to commit" in error_msg.lower() or "working tree clean" in error_msg.lower():
-            insert_datasource_run_task_log_error(collection_task.task_uid,
-                                                 f"the task[{collection_task.task_uid}] upload csg hub-server fail: No files to commit. This may happen if: 1) Files are already committed in the branch, 2) Files are ignored by .gitignore, 3) File paths are incorrect. Error: {error_msg}")
-        else:
-            insert_datasource_run_task_log_error(collection_task.task_uid,
-                                                 f"Task UID {collection_task.task_uid} Error occurred while uploading to CSG Hub server: {error_msg}")
+        insert_datasource_run_task_log_error(collection_task.task_uid,
+                                             f"Task UID {collection_task.task_uid} Error occurred while uploading to CSG Hub server: {e}")
         return False
     return True
```

data_celery/datasource/mysql/tasks.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -210,7 +210,7 @@ def upload_to_csg_hub_server(csg_hub_dataset_id: str,
     try:
         # Upload to CSG Hub server
         ensure_directory_exists_remove(datasource_csg_hub_server_dir)
-        insert_datasource_run_task_log_error(collection_task.task_uid,
+        insert_datasource_run_task_log_info(collection_task.task_uid,
                                             f"Starting upload csg hub-server the task[{collection_task.task_uid}]...")
         exporter = load_exporter(
             export_path=datasource_temp_json_dir,
```

data_celery/formatify/tasks.py

Lines changed: 44 additions & 12 deletions
```diff
@@ -26,9 +26,10 @@
     get_endpoint,
     REPO_TYPE_DATASET)
 from data_engine.utils.env import GetHubEndpoint
+
+
 @celery_app.task
 def format_task(task_id: int, user_name: str, user_token: str):
-
     tmp_path: str = None
     db_session: Session = None
     format_task: DataFormatTask = None
@@ -49,9 +50,10 @@ def format_task(task_id: int, user_name: str, user_token: str):
             user_token=user_token,
         )
         ingester_result = ingesterCSGHUB.ingest()
-        insert_formatity_task_log_info(format_task.task_uid, f"Download directory completed... Directory address:{ingester_result}")
+        insert_formatity_task_log_info(format_task.task_uid,
+                                       f"Download directory completed... Directory address:{ingester_result}")
         work_dir = Path(tmp_path).joinpath('work')
-        file_bool = search_files(tmp_path,[format_task.from_data_type])
+        file_bool = search_files(tmp_path, [format_task.from_data_type])
 
         if not file_bool:
             insert_formatity_task_log_info(format_task.task_uid, f"file not found. task ended....")
@@ -78,10 +80,44 @@ def format_task(task_id: int, user_name: str, user_token: str):
             path_is_dir=True,
             work_dir=str(work_dir)
         )
-        exporter.export_large_folder()
-        insert_formatity_task_log_info(format_task.task_uid, 'Upload completed...')
-        format_task.task_status = DataFormatTaskStatusEnum.COMPLETED.value
-        db_session.commit()
+
+        # Upload retry logic: at most 3 attempts
+        max_retry_count = 3
+        upload_success = False
+        retry_count = 0
+
+        for attempt in range(max_retry_count):
+            try:
+                if attempt == 0:
+                    insert_formatity_task_log_info(format_task.task_uid, f'Starting file upload (attempt 1)...')
+                else:
+                    retry_count += 1
+                    insert_formatity_task_log_info(format_task.task_uid,
+                                                   f'Retrying file upload (attempt {retry_count + 1} of {max_retry_count})...')
+
+                exporter.export_large_folder()
+                upload_success = True
+                insert_formatity_task_log_info(format_task.task_uid, 'Upload completed...')
+                break
+            except Exception as e:
+                error_msg = f'File upload failed (attempt {attempt + 1}): {str(e)}'
+                insert_formatity_task_log_error(format_task.task_uid, error_msg)
+                logger.error(f"Task {format_task.task_uid} upload attempt {attempt + 1} failed: {error_msg}")
+
+                # If this was the last attempt, abort the task
+                if attempt == max_retry_count - 1:
+                    final_error_msg = f'File upload failed after {max_retry_count} attempts; aborting task. Error: {str(e)}'
+                    insert_formatity_task_log_error(format_task.task_uid, final_error_msg)
+                    logger.error(
+                        f"Task {format_task.task_uid} upload failed after {max_retry_count} attempts: {final_error_msg}")
+                    format_task.task_status = DataFormatTaskStatusEnum.ERROR.value
+                    db_session.commit()
+                    raise RuntimeError(final_error_msg)
+
+        # If the upload succeeded, mark the task as completed
+        if upload_success:
+            format_task.task_status = DataFormatTaskStatusEnum.COMPLETED.value
+            db_session.commit()
         pass
     except Exception as e:
         traceback.print_exc()
@@ -245,22 +281,20 @@ def convert_ppt_to_markdown(file_path: str, task_uid):
 
 from typing import List, Dict, Tuple
 
-def search_files(folder_path: str, types: List[int]) -> Tuple[bool, List[str]]:
 
+def search_files(folder_path: str, types: List[int]) -> Tuple[bool, List[str]]:
     type_map: Dict[int, List[str]] = {
         0: ['.ppt', '.pptx'],  # PPT
         1: ['.doc', '.docx'],  # Word
         3: ['.xls', '.xlsx']   # Excel
     }
 
-
     target_extensions = set()
     for file_type in types:
         if file_type in type_map:
             for ext in type_map[file_type]:
                 target_extensions.add(ext.lower())
 
-
     found_files: List[str] = []
 
     def traverse(current_path: str) -> None:
@@ -286,8 +320,6 @@ def traverse(current_path: str) -> None:
         except Exception as e:
             print(f"Processing path {current_path} error: {str(e)}")
 
-
     traverse(folder_path)
 
-
     return bool(len(found_files) > 0)
```
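
The inline retry loop above tracks both `attempt` and a redundant `retry_count`. (Note also that `search_files` is annotated as returning `Tuple[bool, List[str]]` but returns only a bool; callers here use it as a truthy flag.) A minimal generic sketch of the same bounded-retry pattern follows; the name `retry_call` and the optional backoff are assumptions, not part of this repo.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def retry_call(fn: Callable[[], T],
               max_attempts: int = 3,
               backoff_seconds: float = 0.0,
               on_error: Optional[Callable[[int, Exception], None]] = None) -> T:
    """Run fn up to max_attempts times; re-raise the last exception on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if on_error:
                on_error(attempt, exc)                  # log each failed attempt
            if attempt == max_attempts:
                raise                                   # out of attempts: propagate
            if backoff_seconds:
                time.sleep(backoff_seconds * attempt)   # optional linear backoff
```

With such a helper the task body would collapse to a single `retry_call(exporter.export_large_folder, max_attempts=3, on_error=...)` call, with the ERROR status and final log message set in the surrounding `except` block.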

data_engine/core/executor.py

Lines changed: 3 additions & 2 deletions
```diff
@@ -74,7 +74,7 @@ def __init__(
 
         # Check if this is the specific output_only tool by tool name
         tool_name = getattr(self.cfg, 'tool_name', '')
-        is_specific_output_only = (tool_name == 'template_executor_06_common_internal')
+        is_specific_output_only = (tool_name == 'smoltalk_chinese_common_internal')
 
         # normal_logic
         if not is_specific_output_only:
@@ -127,7 +127,8 @@
             branch = self.cfg.branch,
             user_name=self.user_name,
             user_token=self.user_token,
-            work_dir=self.work_dir
+            work_dir=self.work_dir,
+            auto_version=True  # Pipeline jobs use auto versioning
         )
 
         # setup tracer
```

data_engine/core/ray_executor.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -101,7 +101,8 @@ def __init__(self, cfg=None):
             branch=self.cfg.branch,
             user_name=self.user_name,
             user_token=self.user_token,
-            work_dir=self.work_dir
+            work_dir=self.work_dir,
+            auto_version=True  # Pipeline jobs use auto versioning
        )
 
         # setup tracer
```
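
Both executors now pass the same flag to `load_exporter`, and the mongo task above likewise gained `path_is_dir=True`. For reference, a sketch of the call shape these diffs converge on; the argument names are taken verbatim from the diffs, while the surrounding variables are placeholders.

```python
# Call shape shared by executor.py and ray_executor.py after this commit.
# Argument names come from the diffs; everything else here is a placeholder.
exporter = load_exporter(
    export_path=export_path,    # directory holding the processed output
    branch=cfg.branch,          # target CSG Hub branch
    user_name=user_name,
    user_token=user_token,
    work_dir=work_dir,
    auto_version=True,          # pipeline jobs opt in to automatic versioning
)
exporter.export_large_folder()  # upload, as in the mongo/formatify tasks
```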
