Skip to content

Commit fb0958c

Browse files
author
zhanglongbin
committed
修改数据流向显示错误问题
1 parent 0ecc413 commit fb0958c

File tree

7 files changed

+101
-56
lines changed

7 files changed

+101
-56
lines changed

data_celery/datasource/mongo/tasks.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from data_server.datasource.services.datasource import get_datasource_connector
1212
from data_celery.mongo_tools.tools import insert_datasource_run_task_log_info, insert_datasource_run_task_log_error
1313
from data_engine.exporter.load import load_exporter
14-
from pathlib import Path
1514
import pandas as pd
1615
from loguru import logger
1716

@@ -256,46 +255,21 @@ def upload_to_csg_hub_server(csg_hub_dataset_id: str,
256255
branch=csg_hub_dataset_default_branch,
257256
user_name=user_name,
258257
user_token=user_token,
259-
work_dir=datasource_csg_hub_server_dir
258+
work_dir=datasource_csg_hub_server_dir,
259+
path_is_dir=True
260260
)
261-
upload_path: Path = Path(datasource_temp_json_dir)
262-
# Check whether the uploaded directory exists and is not empty
263-
if not os.path.exists(upload_path):
264-
insert_datasource_run_task_log_error(collection_task.task_uid,
265-
f"the task[{collection_task.task_uid}] upload csg hub-server fail: upload path {upload_path} does not exist")
266-
return False
267-
268-
# List all files in the upload directory for debugging
269-
file_list = []
270-
for root, dirs, files in os.walk(upload_path):
271-
for file in files:
272-
file_list.append(os.path.join(root, file))
273-
insert_datasource_run_task_log_info(collection_task.task_uid,
274-
f"Files to upload: {len(file_list)} files found in {upload_path}")
275-
if len(file_list) == 0:
276-
insert_datasource_run_task_log_error(collection_task.task_uid,
277-
f"the task[{collection_task.task_uid}] upload csg hub-server fail: upload path {upload_path} is empty")
278-
return False
279-
280-
output_branch_name = exporter.export_from_files(upload_path)
281-
282-
if output_branch_name:
283-
collection_task.csg_hub_branch = output_branch_name
261+
exporter.export_large_folder()
262+
if csg_hub_dataset_default_branch:
263+
collection_task.csg_hub_branch = csg_hub_dataset_default_branch
284264
db_session.commit()
285265
insert_datasource_run_task_log_info(collection_task.task_uid,
286266
f"the task[{collection_task.task_uid}] upload csg hub-server success...")
287267
else:
288268
insert_datasource_run_task_log_error(collection_task.task_uid,
289-
f"the task[{collection_task.task_uid}] upload csg hub-server fail: export_from_files returned None")
269+
f"the task[{collection_task.task_uid}] upload csg hub-server fail...")
290270
except Exception as e:
291271
logger.error(e)
292-
error_msg = str(e)
293-
# Check if this is a "nothing to commit" error
294-
if "nothing to commit" in error_msg.lower() or "working tree clean" in error_msg.lower():
295-
insert_datasource_run_task_log_error(collection_task.task_uid,
296-
f"the task[{collection_task.task_uid}] upload csg hub-server fail: No files to commit. This may happen if: 1) Files are already committed in the branch, 2) Files are ignored by .gitignore, 3) File paths are incorrect. Error: {error_msg}")
297-
else:
298-
insert_datasource_run_task_log_error(collection_task.task_uid,
299-
f"Task UID {collection_task.task_uid} Error occurred while uploading to CSG Hub server: {error_msg}")
272+
insert_datasource_run_task_log_error(collection_task.task_uid,
273+
f"Task UID {collection_task.task_uid} Error occurred while uploading to CSG Hub server: {e}")
300274
return False
301275
return True

data_engine/core/executor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def __init__(
7474

7575
# Check if this is the specific output_only tool by tool name
7676
tool_name = getattr(self.cfg, 'tool_name', '')
77-
is_specific_output_only = (tool_name == 'template_executor_06_common_internal')
77+
is_specific_output_only = (tool_name == 'smoltalk_chinese_common_internal')
7878

7979
# normal_logic
8080
if not is_specific_output_only:
@@ -127,7 +127,8 @@ def __init__(
127127
branch = self.cfg.branch,
128128
user_name=self.user_name,
129129
user_token=self.user_token,
130-
work_dir=self.work_dir
130+
work_dir=self.work_dir,
131+
auto_version=True # Pipeline jobs use auto versioning
131132
)
132133

133134
# setup tracer

data_engine/core/ray_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ def __init__(self, cfg=None):
101101
branch=self.cfg.branch,
102102
user_name=self.user_name,
103103
user_token=self.user_token,
104-
work_dir=self.work_dir
104+
work_dir=self.work_dir,
105+
auto_version=True # Pipeline jobs use auto versioning
105106
)
106107

107108
# setup tracer

data_engine/exporter/csghub_exporter.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import traceback
22
from typing import List
3+
import os
4+
import re
35

46
from pycsghub.cmd.repo_types import RepoType
57
from pycsghub.upload_large_folder.main import upload_large_folder_internal
68

79
from data_engine.exporter.base_exporter import Exporter
8-
import os
9-
import re
1010
from loguru import logger
1111
from pycsghub.repository import Repository
1212
from pycsghub.utils import (build_csg_headers,
@@ -38,6 +38,7 @@ def __init__(
3838
user_token: str = None,
3939
work_dir: str = None,
4040
path_is_dir: bool = False,
41+
auto_version: bool = False,
4142
):
4243
"""
4344
Initialization method.
@@ -64,6 +65,7 @@ def __init__(
6465
self.user_name = user_name
6566
self.user_token = user_token
6667
self.work_dir = work_dir
68+
self.auto_version = auto_version # True for pipeline jobs, False for formatify tasks
6769
super().__init__(
6870
export_path=export_path,
6971
export_shard_size=export_shard_size,
@@ -193,26 +195,42 @@ def get_avai_branch(self, origin_branch: str) -> str:
193195
# valid_branches = ["main", "v3.1", "v1.11", "v1.5", "v2", "v1", "v1.2", "v1.11.2"]
194196
valid_branches.sort()
195197
logger.info(f'repo {self.repo_id} all branches: {valid_branches}')
196-
return self.find_next_version(origin_branch=origin_branch, valid_branches=valid_branches)
198+
result = self.find_next_version(origin_branch=origin_branch, valid_branches=valid_branches)
199+
return result
197200

198201
def find_next_version(self, origin_branch: str, valid_branches: List):
202+
"""
203+
根据任务类型选择版本生成逻辑:
204+
- auto_version=False (文件转换任务): 直接返回用户指定的分支名
205+
- auto_version=True (算子执行任务): 自动生成版本号 (v1, v2, ...)
206+
"""
207+
if not self.auto_version:
208+
# 文件转换任务:直接返回用户指定的分支名
209+
if origin_branch in valid_branches:
210+
return origin_branch
211+
return origin_branch
212+
213+
# 算子执行任务:自动生成版本号
199214
latestNum = 0
215+
200216
for b in valid_branches:
201217
if origin_branch == "main" and re.match(r"^v\d+", b):
218+
# 处理 main 分支的情况,查找 v1, v2, v3 等
202219
numStr = b.split(".")[0][1:]
203220
if not numStr.isdigit():
204221
continue
205222
num = int(numStr)
206223
latestNum = max(latestNum, num)
207224
elif b.startswith(origin_branch) and len(b) > len(origin_branch):
225+
# 处理其他分支的情况,查找 origin_branch.1, origin_branch.2 等
208226
numStr = b[len(origin_branch) + 1:]
209227
if not numStr.isdigit():
210228
continue
229+
num = int(numStr)
230+
latestNum = max(latestNum, num)
211231
else:
212232
continue
213-
num = int(numStr)
214-
latestNum = max(latestNum, num)
215-
233+
216234
if origin_branch == "main":
217235
if latestNum > 0:
218236
return "v" + str(latestNum + 1)

data_engine/exporter/load.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ def load_exporter(
1515
user_name: str=None,
1616
user_token: str=None,
1717
work_dir: str=None,
18-
path_is_dir: bool=False
18+
path_is_dir: bool=False,
19+
auto_version: bool=False
1920
) -> Exporter:
2021
return ExporterCSGHUB(
2122
export_path=export_path,
@@ -31,6 +32,7 @@ def load_exporter(
3132
user_name=user_name,
3233
user_token=user_token,
3334
work_dir=work_dir,
34-
path_is_dir=path_is_dir
35+
path_is_dir=path_is_dir,
36+
auto_version=auto_version
3537
)
3638

data_server/datasource/DatasourceModels.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from sqlalchemy import create_engine, Column, Integer, String, Text, Boolean, ForeignKey, DateTime, JSON
22
from sqlalchemy.orm import relationship
33
import datetime
4+
import json
45
from enum import Enum
56
from data_server.database.bean.base import Base
67

@@ -47,6 +48,38 @@ class DataSource(Base):
4748
updated_at = Column(DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now, comment='更新时间')
4849

4950
def to_json(self):
51+
# 处理 extra_config,确保正确解析并返回
52+
extra_config_raw = self.extra_config
53+
extra_config_dict = None
54+
55+
# 解析 extra_config
56+
if isinstance(extra_config_raw, str):
57+
try:
58+
extra_config_dict = json.loads(extra_config_raw)
59+
except (json.JSONDecodeError, TypeError):
60+
extra_config_dict = None
61+
elif isinstance(extra_config_raw, dict):
62+
extra_config_dict = extra_config_raw.copy()
63+
64+
# 处理分支字段:保持 csg_hub_dataset_default_branch 原样(从数据库读取,不做修改),同时添加 csg_hub_dataset_branch
65+
if extra_config_dict is not None:
66+
# 获取 csg_hub_dataset_default_branch 的值(如果存在),用于设置 csg_hub_dataset_branch
67+
# 保持 csg_hub_dataset_default_branch 原样,不做任何修改
68+
branch_value = extra_config_dict.get("csg_hub_dataset_default_branch")
69+
if not branch_value or (isinstance(branch_value, str) and branch_value.strip() == ""):
70+
branch_value = "main"
71+
72+
# 添加 csg_hub_dataset_branch 字段(从 csg_hub_dataset_default_branch 获取值,如果不存在则使用 main)
73+
extra_config_dict["csg_hub_dataset_branch"] = branch_value
74+
75+
# csg_hub_dataset_default_branch 保持原样,不做修改(如果数据库中有就返回,如果没有就不添加)
76+
77+
# 将更新后的字典转换回 JSON 字符串格式返回
78+
extra_config = json.dumps(extra_config_dict, ensure_ascii=False, indent=4)
79+
else:
80+
# 如果 extra_config 为空,保持原样返回空配置
81+
extra_config = extra_config_raw if extra_config_raw else "{}"
82+
5083
return {
5184
"id": self.id,
5285
"name": self.name,
@@ -57,7 +90,7 @@ def to_json(self):
5790
"username": self.username,
5891
"password": self.password,
5992
"database": self.database,
60-
"extra_config": self.extra_config,
93+
"extra_config": extra_config,
6194
"source_status": self.source_status,
6295
"owner_id": self.owner_id,
6396
"created_at": self.created_at.strftime("%Y-%m-%d %H:%M:%S") if self.created_at else None,

data_server/job/branch_tool.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,37 @@ def get_avai_branch(self, origin_branch: str) -> str:
3737
raise
3838

3939
def find_next_version(self, origin_branch: str, valid_branches: List) -> str:
40+
"""
41+
算子执行任务的版本生成逻辑:自动生成 v1, v2 等版本号
42+
"""
4043
latestNum = 0
44+
4145
for b in valid_branches:
4246
if origin_branch == "main" and re.match(r"^v\d+", b):
47+
# 处理 main 分支的情况,查找 v1, v2, v3 等
4348
numStr = b.split(".")[0][1:]
44-
if numStr.isdigit():
45-
num = int(numStr)
46-
latestNum = max(latestNum, num)
49+
if not numStr.isdigit():
50+
continue
51+
num = int(numStr)
52+
latestNum = max(latestNum, num)
4753
elif b.startswith(origin_branch) and len(b) > len(origin_branch):
48-
numStr = b[len(origin_branch)+1:]
49-
if numStr.isdigit():
50-
num = int(numStr)
51-
latestNum = max(latestNum, num)
52-
54+
# 处理其他分支的情况,查找 origin_branch.1, origin_branch.2 等
55+
numStr = b[len(origin_branch) + 1:]
56+
if not numStr.isdigit():
57+
continue
58+
num = int(numStr)
59+
latestNum = max(latestNum, num)
60+
else:
61+
continue
62+
5363
if origin_branch == "main":
54-
return f"v{latestNum + 1}" if latestNum > 0 else "v1"
64+
if latestNum > 0:
65+
return "v" + str(latestNum + 1)
66+
else:
67+
return "v1"
5568
else:
56-
return f"{origin_branch}.{latestNum + 1}" if latestNum > 0 else f"{origin_branch}.1"
69+
if latestNum > 0:
70+
return origin_branch + "." + str(latestNum + 1)
71+
else:
72+
return origin_branch + ".1"
5773

0 commit comments

Comments
 (0)