11import shutil
22from data_celery .main import celery_app
3- import time ,os ,json
3+ import time , os , json
44from data_server .database .session import get_sync_session
55from sqlalchemy .orm import Session
6- from data_server .datasource .DatasourceModels import CollectionTask ,DataSourceTaskStatusEnum ,DataSourceTypeEnum
6+ from data_server .datasource .DatasourceModels import CollectionTask , DataSourceTaskStatusEnum , DataSourceTypeEnum
77from data_celery .db .DatasourceManager import get_collection_task_by_uid
88from data_celery .utils import (ensure_directory_exists ,
99 get_current_ip , get_current_time , get_datasource_temp_parquet_dir ,
1010 ensure_directory_exists_remove , get_datasource_csg_hub_server_dir )
1111from data_server .datasource .services .datasource import get_datasource_connector
12- from data_celery .mongo_tools .tools import insert_datasource_run_task_log_info ,insert_datasource_run_task_log_error
12+ from data_celery .mongo_tools .tools import insert_datasource_run_task_log_info , insert_datasource_run_task_log_error
1313from data_engine .exporter .load import load_exporter
1414from pathlib import Path
1515import pandas as pd
1616from loguru import logger
1717
18+ # Import BSON types for MongoDB ObjectId conversion
19+ from datetime import datetime , date
20+
# Optional BSON support: pymongo may not be installed in every worker image,
# so probe for it once at import time instead of failing hard.
try:
    from bson import ObjectId
    from bson.errors import InvalidId  # imported to verify bson.errors is present
except ImportError:
    ObjectId = None
    BSON_AVAILABLE = False
else:
    BSON_AVAILABLE = True


def convert_mongo_document(doc):
    """
    Recursively convert a MongoDB document into JSON-serializable data.

    ObjectId values become their string form, datetime/date values become
    ISO-8601 strings, dicts and lists are walked recursively, and every
    other value is returned unchanged.
    """
    if isinstance(doc, dict):
        converted = {}
        for key, value in doc.items():
            converted[key] = convert_mongo_document(value)
        return converted
    if isinstance(doc, list):
        return [convert_mongo_document(item) for item in doc]
    # Only attempt the ObjectId check when bson is importable.
    if BSON_AVAILABLE and isinstance(doc, ObjectId):
        return str(doc)
    if isinstance(doc, (datetime, date)):
        return doc.isoformat()
    return doc
46+
1847
1948@celery_app .task (name = "collection_mongo_task" )
20- def collection_mongo_task (task_uid : str ,user_name : str ,user_token : str ):
49+ def collection_mongo_task (task_uid : str , user_name : str , user_token : str ):
2150 """
2251 Collection task
2352 Args:
@@ -64,7 +93,7 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
6493 collection_task .start_run_at = get_current_time ()
6594 db_session .commit ()
6695 # Read data source
67- extra_config = collection_task .datasource .extra_config
96+ extra_config = json . loads ( collection_task .datasource .extra_config )
6897 if not extra_config :
6998 collection_task .task_status = DataSourceTaskStatusEnum .ERROR .value
7099 insert_datasource_run_task_log_error (task_uid , f"Task with UID { task_uid } has no extra configuration." )
@@ -76,13 +105,19 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
76105 return False
77106 mongo_config = extra_config ["mongo" ]
78107 max_line = 10000
79- csg_hub_dataset_id = 0
108+ csg_hub_dataset_id = ''
80109 csg_hub_dataset_default_branch = "main"
81110 if "csg_hub_dataset_default_branch" in extra_config :
82111 csg_hub_dataset_default_branch = extra_config ["csg_hub_dataset_default_branch" ]
83- if "csg_hub_dataset_id" in extra_config and isinstance ( extra_config [ 'csg_hub_dataset_id' ], int ) :
112+ if "csg_hub_dataset_id" in extra_config :
84113 csg_hub_dataset_id = extra_config ["csg_hub_dataset_id" ]
85- if csg_hub_dataset_id <= 0 :
114+ # Read csg_hub_dataset_name if provided, otherwise use default branch
115+ csg_hub_dataset_name = None
116+ if "csg_hub_dataset_name" in extra_config and extra_config ['csg_hub_dataset_name' ] != '' :
117+ csg_hub_dataset_name = extra_config ["csg_hub_dataset_name" ]
118+ else :
119+ csg_hub_dataset_name = csg_hub_dataset_default_branch
120+ if csg_hub_dataset_id is None or csg_hub_dataset_id == '' :
86121 collection_task .task_status = DataSourceTaskStatusEnum .ERROR .value
87122 insert_datasource_run_task_log_error (task_uid , f"Task with UID { task_uid } has no CSG Hub Dataset ID." )
88123 return False
@@ -96,7 +131,8 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
96131 connector = get_datasource_connector (collection_task .datasource )
97132 if not connector .test_connection ():
98133 collection_task .task_status = DataSourceTaskStatusEnum .ERROR .value
99- insert_datasource_run_task_log_error (task_uid , f"Task with UID { task_uid } failed to connect to the database." )
134+ insert_datasource_run_task_log_error (task_uid ,
135+ f"Task with UID { task_uid } failed to connect to the database." )
100136 return False
101137
102138 total_count = 0
@@ -117,11 +153,22 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
117153 while True :
118154 # Execute pagination query (specific implementation depends on connector details)
119155 rows = connector .query_collection (collection_name , offset = (page - 1 ) * page_size ,
120- limit = page_size )
156+ limit = page_size )
121157
122158 if not rows :
123159 break # If there is no more data, exit the loop
124160
161+ # Add rows to buffer, converting MongoDB types to JSON-serializable format
162+ if isinstance (rows , list ):
163+ # Convert each document to handle ObjectId and other BSON types
164+ converted_rows = [convert_mongo_document (row ) for row in rows ]
165+ rows_buffer .extend (converted_rows )
166+ else :
167+ # If rows is a generator or iterator, convert to list first
168+ rows_list = list (rows )
169+ converted_rows = [convert_mongo_document (row ) for row in rows_list ]
170+ rows_buffer .extend (converted_rows )
171+
125172 # If the number of rows in the buffer list reaches or exceeds the maximum number of rows, write to the file and clear the buffer list
126173 if len (rows_buffer ) >= max_line :
127174 file_path = os .path .join (table_dir , f"data_{ file_index :04d} .parquet" )
@@ -130,7 +177,8 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
130177 current_file_row_count += len (rows_buffer )
131178 records_count += len (rows_buffer )
132179 collection_task .records_count = records_count
133- insert_datasource_run_task_log_info (task_uid , f"Task with UID { task_uid } get data count { records_count } ..." )
180+ insert_datasource_run_task_log_info (task_uid ,
181+ f"Task with UID { task_uid } get data count { records_count } ..." )
134182 db_session .commit ()
135183 file_index += 1
136184 rows_buffer = [] # Clear the buffer list
@@ -143,16 +191,18 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
143191 current_file_row_count += len (rows_buffer )
144192 records_count += len (rows_buffer )
145193 collection_task .records_count = records_count
146- insert_datasource_run_task_log_info (task_uid , f"Task with UID { task_uid } get data count { records_count } ..." )
194+ insert_datasource_run_task_log_info (task_uid ,
195+ f"Task with UID { task_uid } get data count { records_count } ..." )
147196 db_session .commit ()
148197
149198 except Exception as e :
150- insert_datasource_run_task_log_error (task_uid , f"Task with UID { task_uid } failed to get collection document { collection_name } : { e } " )
199+ insert_datasource_run_task_log_error (task_uid ,
200+ f"Task with UID { task_uid } failed to get collection document { collection_name } : { e } " )
151201 collection_task .records_count = total_count
152202 collection_task .total_count = total_count
153203 db_session .commit ()
154204 upload_to_csg_hub_server (csg_hub_dataset_id ,
155- csg_hub_dataset_default_branch ,
205+ csg_hub_dataset_name ,
156206 user_name , user_token , db_session ,
157207 collection_task , datasource_temp_parquet_dir ,
158208 datasource_csg_hub_server_dir )
@@ -176,15 +226,15 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
176226 return True
177227
178228
179- def upload_to_csg_hub_server (csg_hub_dataset_id : int ,
229+ def upload_to_csg_hub_server (csg_hub_dataset_id : str ,
180230 csg_hub_dataset_default_branch : str ,
181- user_name : str ,user_token : str ,db_session : Session ,
182- collection_task : CollectionTask ,datasource_temp_json_dir : str ,
231+ user_name : str , user_token : str , db_session : Session ,
232+ collection_task : CollectionTask , datasource_temp_json_dir : str ,
183233 datasource_csg_hub_server_dir : str ):
184234 """
185235 Upload to CSG Hub server
186236 Args:
187- csg_hub_dataset_id (int ): CSG Hub dataset ID
237+ csg_hub_dataset_id (str ): CSG Hub dataset ID
188238 csg_hub_dataset_default_branch (str): CSG Hub dataset default branch
189239 user_name (str): User name
190240 user_token (str): User token
@@ -198,26 +248,54 @@ def upload_to_csg_hub_server(csg_hub_dataset_id: int,
198248 # Upload to CSG Hub server
199249
200250 ensure_directory_exists_remove (datasource_csg_hub_server_dir )
201- insert_datasource_run_task_log_info (collection_task .task_uid , f"Starting upload csg hub-server the task[{ collection_task .task_uid } ]..." )
251+ insert_datasource_run_task_log_info (collection_task .task_uid ,
252+ f"Starting upload csg hub-server the task[{ collection_task .task_uid } ]..." )
202253 exporter = load_exporter (
203254 export_path = datasource_temp_json_dir ,
204- repo_id = str ( csg_hub_dataset_id ) ,
255+ repo_id = csg_hub_dataset_id ,
205256 branch = csg_hub_dataset_default_branch ,
206257 user_name = user_name ,
207258 user_token = user_token ,
208259 work_dir = datasource_csg_hub_server_dir
209260 )
210- upload_path : Path = Path (datasource_csg_hub_server_dir )
261+ upload_path : Path = Path (datasource_temp_json_dir )
262+ # Check whether the uploaded directory exists and is not empty
263+ if not os .path .exists (upload_path ):
264+ insert_datasource_run_task_log_error (collection_task .task_uid ,
265+ f"the task[{ collection_task .task_uid } ] upload csg hub-server fail: upload path { upload_path } does not exist" )
266+ return False
267+
268+ # List all files in the upload directory for debugging
269+ file_list = []
270+ for root , dirs , files in os .walk (upload_path ):
271+ for file in files :
272+ file_list .append (os .path .join (root , file ))
273+ insert_datasource_run_task_log_info (collection_task .task_uid ,
274+ f"Files to upload: { len (file_list )} files found in { upload_path } " )
275+ if len (file_list ) == 0 :
276+ insert_datasource_run_task_log_error (collection_task .task_uid ,
277+ f"the task[{ collection_task .task_uid } ] upload csg hub-server fail: upload path { upload_path } is empty" )
278+ return False
279+
211280 output_branch_name = exporter .export_from_files (upload_path )
212281
213282 if output_branch_name :
214283 collection_task .csg_hub_branch = output_branch_name
215284 db_session .commit ()
216- insert_datasource_run_task_log_info (collection_task .task_uid , f"the task[{ collection_task .task_uid } ] upload csg hub-server success..." )
285+ insert_datasource_run_task_log_info (collection_task .task_uid ,
286+ f"the task[{ collection_task .task_uid } ] upload csg hub-server success..." )
217287 else :
218- insert_datasource_run_task_log_error (collection_task .task_uid , f"the task[{ collection_task .task_uid } ] upload csg hub-server fail..." )
288+ insert_datasource_run_task_log_error (collection_task .task_uid ,
289+ f"the task[{ collection_task .task_uid } ] upload csg hub-server fail: export_from_files returned None" )
219290 except Exception as e :
220291 logger .error (e )
221- insert_datasource_run_task_log_error (collection_task .task_uid ,f"Task UID { collection_task .task_uid } Error occurred while uploading to CSG Hub server: { e } " )
292+ error_msg = str (e )
293+ # Check if this is a "nothing to commit" error
294+ if "nothing to commit" in error_msg .lower () or "working tree clean" in error_msg .lower ():
295+ insert_datasource_run_task_log_error (collection_task .task_uid ,
296+ f"the task[{ collection_task .task_uid } ] upload csg hub-server fail: No files to commit. This may happen if: 1) Files are already committed in the branch, 2) Files are ignored by .gitignore, 3) File paths are incorrect. Error: { error_msg } " )
297+ else :
298+ insert_datasource_run_task_log_error (collection_task .task_uid ,
299+ f"Task UID { collection_task .task_uid } Error occurred while uploading to CSG Hub server: { error_msg } " )
222300 return False
223301 return True
0 commit comments