Skip to content

Commit 511c51a

Browse files
727854320HaiHui886
authored andcommitted
新增格式转换 1.创建meta的参数skip_meta 2.将meta.json改成meta.log
1 parent 9d3cb4d commit 511c51a

File tree

7 files changed

+243
-135
lines changed

7 files changed

+243
-135
lines changed

README_zh.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pip install -r docker/requirements.txt
1010
### 启动api-server
1111
```cmd
1212
uvicorn data_server.main:app --reload
13+
uvicorn data_server.main:app --host 0.0.0.0 --port 8001
1314
```
1415

1516
### 启动celery

data_celery/formatify/tasks.py

Lines changed: 176 additions & 130 deletions
Large diffs are not rendered by default.

data_server/api/endpoints/formatify.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def get_progress_from_mongodb_logs(task_uid: str) -> Optional[dict]:
222222
collection = get_formatity_collection(client, task_uid)
223223

224224
# Find logs containing progress information (sorted by time descending)
225-
# Match format: Updated and uploaded meta.json (total: X, success: Y, failure: Z)
225+
# Match format: Updated and uploaded meta.log (total: X, success: Y, failure: Z)
226226
# Or: All files processed. Total: X, Success: Y, Failure: Z
227227
progress_patterns = [
228228
r'\(total:\s*(\d+),\s*success:\s*(\d+),\s*failure:\s*(\d+)\)', # (total: X, success: Y, failure: Z)

data_server/database/session.py

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,31 @@ def add_mineru_backend_column():
185185
logger.info("Column 'mineru_backend' added successfully to data_format_tasks table")
186186

187187

188+
def add_skip_meta_column():
189+
"""Add skip_meta column to data_format_tasks table"""
190+
try:
191+
logger.info("Checking if skip_meta column exists in data_format_tasks table...")
192+
with get_sync_session() as session:
193+
with session.begin():
194+
result = session.execute(text("""
195+
SELECT column_name
196+
FROM information_schema.columns
197+
WHERE table_name = 'data_format_tasks' AND column_name = 'skip_meta';
198+
"""))
199+
200+
if not result.fetchone():
201+
logger.info("skip_meta column does not exist, adding it...")
202+
session.execute(text("ALTER TABLE data_format_tasks ADD COLUMN skip_meta BOOLEAN DEFAULT FALSE;"))
203+
logger.info("Column 'skip_meta' added successfully to data_format_tasks table")
204+
else:
205+
logger.info("Column 'skip_meta' already exists in data_format_tasks table")
206+
except Exception as e:
207+
logger.error(f"Error adding skip_meta column: {e}")
208+
import traceback
209+
logger.error(traceback.format_exc())
210+
logger.warning("Continuing despite error...")
211+
212+
188213
_initialized = False
189214
from data_server.database.bean.work import Worker
190215
from data_server.job.JobModels import Job
@@ -215,9 +240,28 @@ def create_tables():
215240

216241
_initialized = True
217242

218-
add_first_op_column()
219-
add_mineru_api_url_column()
220-
add_mineru_backend_column()
243+
logger.info("Starting database column migrations...")
244+
try:
245+
add_first_op_column()
246+
except Exception as e:
247+
logger.error(f"Error in add_first_op_column: {e}")
248+
249+
try:
250+
add_mineru_api_url_column()
251+
except Exception as e:
252+
logger.error(f"Error in add_mineru_api_url_column: {e}")
253+
254+
try:
255+
add_mineru_backend_column()
256+
except Exception as e:
257+
logger.error(f"Error in add_mineru_backend_column: {e}")
258+
259+
try:
260+
add_skip_meta_column()
261+
except Exception as e:
262+
logger.error(f"Error in add_skip_meta_column: {e}")
263+
264+
logger.info("Database column migrations completed")
221265
def is_table_initialized(table_name: str) -> bool:
222266
"""
223267
Check if a specific table contains any data.
@@ -236,6 +280,12 @@ def initialize_database():
236280
Automatically executes one-time deletion on first startup (tracked in deletion_status table).
237281
This should be called once when the application starts.
238282
"""
283+
# Ensure database schema migrations are applied (check on every startup)
284+
try:
285+
add_skip_meta_column()
286+
except Exception as e:
287+
logger.warning(f"Could not check/add skip_meta column: {e}")
288+
239289
tables_to_initialize = [
240290
'operator_info',
241291
'operator_config',

data_server/formatify/FormatifyManager.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import uuid, os
55
from typing import List, Tuple, Optional
66
from data_server.formatify.FormatifyTask import run_format_task, stop_celery_task
7+
from loguru import logger
78

89

910
def greate_task_uid():
@@ -16,6 +17,10 @@ def create_formatify_task(db_session: Session, dataFormatTask: DataFormatTaskReq
1617

1718
# create db model
1819
task_uid = greate_task_uid()
20+
21+
# Prepare skip_meta value (use provided value or default to False)
22+
skip_meta_value = dataFormatTask.skip_meta if dataFormatTask.skip_meta is not None else False
23+
1924
data_format_task_db = DataFormatTask(name=dataFormatTask.name,
2025
des=dataFormatTask.des,
2126
from_csg_hub_dataset_name=dataFormatTask.from_csg_hub_dataset_name,
@@ -30,9 +35,12 @@ def create_formatify_task(db_session: Session, dataFormatTask: DataFormatTaskReq
3035
to_data_type=dataFormatTask.to_data_type,
3136
mineru_api_url=dataFormatTask.mineru_api_url,
3237
mineru_backend=dataFormatTask.mineru_backend,
38+
skip_meta=skip_meta_value,
3339
task_uid=task_uid,
3440
task_status=DataFormatTaskStatusEnum.WAITING.value,
3541
owner_id=user_id)
42+
43+
logger.info(f"Created task with skip_meta={skip_meta_value} for task {task_uid}")
3644

3745
db_session.add(data_format_task_db)
3846
db_session.commit()
@@ -76,7 +84,7 @@ def update_formatify_task(db_session: Session, formatify_id: int, dataFormatTask
7684
'name', 'des', 'from_csg_hub_dataset_name', 'from_csg_hub_dataset_id',
7785
'from_csg_hub_dataset_branch', 'from_data_type', 'to_csg_hub_dataset_name',
7886
'to_csg_hub_dataset_id', 'to_csg_hub_dataset_default_branch', 'to_data_type',
79-
'mineru_api_url', 'mineru_backend'
87+
'mineru_api_url', 'mineru_backend' # 'skip_meta' temporarily removed
8088
}
8189
for field in updatable_fields:
8290
value = getattr(dataFormatTaskRequest, field, None)

data_server/formatify/FormatifyModels.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class DataFormatTask(Base):
5555
owner_id = Column(Integer, comment="所属用户")
5656
mineru_api_url = Column(String(500), comment="MinerU API 地址")
5757
mineru_backend = Column(String(100), comment="MinerU 后端类型")
58+
skip_meta = Column(Boolean, default=False, comment="如果为 True,则生成并上传 meta.log 文件;如果为 False,则不生成 meta.log 文件")
5859
start_run_at = Column(DateTime, comment='运行开始时间')
5960
end_run_at = Column(DateTime, comment='运行结束时间')
6061
created_at = Column(DateTime, default=datetime.datetime.now, comment='任务创建时间')
@@ -81,6 +82,7 @@ def to_dict(self):
8182
"owner_id": self.owner_id,
8283
"mineru_api_url": self.mineru_api_url,
8384
"mineru_backend": self.mineru_backend,
85+
"skip_meta": getattr(self, 'skip_meta', False), # Use getattr to handle missing column
8486
"start_run_at": self.start_run_at,
8587
"end_run_at": self.end_run_at,
8688
"created_at": self.created_at.strftime("%Y-%m-%d %H:%M:%S") if self.created_at else None,

data_server/formatify/schemas.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ class DataFormatTaskRequest(BaseModel):
1717
to_data_type: Optional[int] = None
1818
mineru_api_url: Optional[str] = None
1919
mineru_backend: Optional[str] = None
20+
skip_meta: Optional[bool] = False # 如果为 True,则生成并上传 meta.log 文件;如果为 False,则不生成 meta.log 文件

0 commit comments

Comments
 (0)