
Commit fac4682

Merge branch 'develop_930' of github.com:ModelEngine-Group/data-platform into develop_930
2 parents: 800834c + 968fed0

File tree

5 files changed, +21 -21 lines

backend/services/data-cleaning-service/src/main/java/com/dataengine/cleaning/application/service/CleaningTaskService.java

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ private void executeTask(CleaningTask task, CreateCleaningTaskRequest request, S
     private void prepareTask(CleaningTask task, List<OperatorInstance> instances) {
         TaskProcess process = new TaskProcess();
         process.setInstanceId(task.getId());
-        process.setDatasetPath(DATASET_PATH + "/" + task.getSrcDatasetId());
+        process.setDatasetPath(FLOW_PATH + "/" + task.getId() + "/dataset.jsonl");
         process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
         process.setExecutorType(ExecutorType.DATA_PLATFORM.getValue());
         process.setProcess(instances.stream()
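
The change above points the executor's input at a per-task staging file, FLOW_PATH + "/" + task.getId() + "/dataset.jsonl", instead of the source dataset directory, so each cleaning run reads its own copy keyed by the task id. Below is a minimal sketch, not code from this commit, of how the executor side could resolve that file; the "/flow" value for FLOW_PATH and the helper name load_task_dataset are assumptions for illustration only.

import json
from pathlib import Path

FLOW_PATH = "/flow"  # assumed to mirror the Java constant; not confirmed by this diff

def load_task_dataset(task_id: str) -> list[dict]:
    # Hypothetical helper: read the staged dataset.jsonl written for one cleaning task.
    dataset_file = Path(FLOW_PATH) / task_id / "dataset.jsonl"
    with dataset_file.open(encoding="utf-8") as f:
        # JSON Lines: one record per non-empty line
        return [json.loads(line) for line in f if line.strip()]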

runtime/python-executor/data_platform/core/constant.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@


 class Fields(object):
     result = 'execute_result'
-    flow_id = 'flow_id'
+    instance_id = 'instance_id'
     export_path = 'export_path'

runtime/python-executor/data_platform/core/dataset.py

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ def preprocess_dataset(dataset: rd.Dataset, cfg) -> rd.Dataset:

     def process_batch_arrow(table: pa.Table, names_list=None) -> pa.Table:
         name2value_table = {
-            Fields.flow_id: cfg.flow_id,
+            Fields.instance_id: cfg.instance_id,
             Fields.export_path: cfg.export_path
         }
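
With the renamed field, process_batch_arrow stamps every Arrow batch with the task's instance_id and export_path taken from the config, so downstream operators and the persistence layer can attribute each row to its run. A minimal sketch of that stamping pattern with plain pyarrow is below; the standalone function name and sample data are illustrative, not taken from the repository.

import pyarrow as pa

def stamp_metadata(table: pa.Table, instance_id: str, export_path: str) -> pa.Table:
    # Append one constant-valued column per metadata field, repeated across all rows.
    name2value = {
        'instance_id': instance_id,
        'export_path': export_path,
    }
    for name, value in name2value.items():
        table = table.append_column(name, pa.array([value] * table.num_rows))
    return table

batch = pa.table({'text': ['a', 'b', 'c']})
print(stamp_metadata(batch, 'task-123', '/dataset/out').column_names)
# ['text', 'instance_id', 'export_path']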

runtime/python-executor/data_platform/sqlite_manager/persistence_atction.py

Lines changed: 14 additions & 14 deletions
@@ -11,12 +11,12 @@
 from data_platform.sqlite_manager.sqlite_manager import SQLiteManager


-def delete_task_info(flow_id: str):
-    db_path = f"/flow/{flow_id}.db"
+def delete_task_info(instance_id: str):
+    db_path = f"/flow/{instance_id}.db"
     try:
         os.remove(db_path)
     except Exception as e:
-        logger.warning(f"delete database for flow:{flow_id} error", e)
+        logger.warning(f"delete database for flow:{instance_id} error", e)


 class TaskInfoPersistence:
@@ -31,7 +31,7 @@ def load_sql_dict():
             return json.load(f)

     def persistence_task_info(self, sample: Dict[str, Any]):
-        flow_id = str(sample.get("flow_id"))
+        instance_id = str(sample.get("instance_id"))
         meta_file_name = str(sample.get("sourceFileName"))
         meta_file_type = str(sample.get("sourceFileType"))
         meta_file_id = int(sample.get("sourceFileId"))
@@ -49,17 +49,17 @@ def persistence_task_info(self, sample: Dict[str, Any]):
         incremental = str(sample.get("incremental") if sample.get("incremental") else '')
         child_id = sample.get("childId")
         slice_num = sample.get('slice_num', 0)
-        insert_data = [flow_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size,
+        insert_data = [instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size,
                        file_type, file_name, file_path, source_file_modify_time, status, operator_id, error_code,
                        incremental, child_id, slice_num]
-        self.insert_clean_result(insert_data, flow_id)
+        self.insert_clean_result(insert_data, instance_id)

-    def insert_clean_result(self, insert_data, flow_id):
+    def insert_clean_result(self, insert_data, instance_id):
         retries = 0
         max_retries = 20
         retry_delay = 1
         while retries <= max_retries:
-            db_path = f"/flow/{flow_id}.db"
+            db_path = f"/flow/{instance_id}.db"
             try:
                 insert_sql = str(self.sql_dict.get("insert_sql"))
                 create_tables_sql = str(self.sql_dict.get("create_tables_sql"))
@@ -78,21 +78,21 @@ def insert_clean_result(self, insert_data, flow_id):
                 raise RuntimeError(82000, str(e)) from None
         raise Exception("Max retries exceeded")

-    def query_task_info(self, flow_ids: list[str]):
+    def query_task_info(self, instance_ids: list[str]):
         result = {}
         create_tables_sql = self.sql_dict.get("create_tables_sql")
         query_sql = self.sql_dict.get("query_sql")
         current_result = None
-        for flow_id in flow_ids:
-            db_path = f"/flow/{flow_id}.db"
+        for instance_id in instance_ids:
+            db_path = f"/flow/{instance_id}.db"
             try:
                 with SQLiteManager.create_connect(db_path) as conn:
                     conn.execute("PRAGMA journal_mode=WAL")
                     conn.execute(create_tables_sql)
-                    cursor = conn.execute(query_sql, [flow_id])
+                    cursor = conn.execute(query_sql, [instance_id])
                     current_result = cursor.fetchall()
             except Exception as e:
-                logger.warning("flow_id: {}, query job result error: {}", flow_id, str(e))
+                logger.warning("instance_id: {}, query job result error: {}", instance_id, str(e))
             if current_result:
-                result[flow_id] = current_result
+                result[instance_id] = current_result
         return result
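
The persistence layer keeps one SQLite database per task instance at /flow/{instance_id}.db, switches it to WAL journal mode so readers and the writer can overlap, and retries inserts because parallel executor workers can hit "database is locked" errors. A minimal sketch of that write pattern using the standard sqlite3 module (instead of the project's SQLiteManager wrapper) is below; the two-column table and retry count are simplified for illustration.

import sqlite3
import time

CREATE_SQL = "CREATE TABLE IF NOT EXISTS task_instance_info (instance_id TEXT, file_name TEXT)"
INSERT_SQL = "INSERT INTO task_instance_info (instance_id, file_name) VALUES (?, ?)"

def insert_with_retry(instance_id: str, file_name: str, max_retries: int = 5) -> None:
    db_path = f"/flow/{instance_id}.db"  # one database file per task instance
    for attempt in range(max_retries):
        try:
            with sqlite3.connect(db_path, timeout=5) as conn:
                conn.execute("PRAGMA journal_mode=WAL")  # readers no longer block the writer
                conn.execute(CREATE_SQL)
                conn.execute(INSERT_SQL, (instance_id, file_name))
            return
        except sqlite3.OperationalError:
            # typically "database is locked" under concurrent writers; back off and retry
            time.sleep(1)
    raise RuntimeError("Max retries exceeded")
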
Lines changed: 4 additions & 4 deletions
@@ -1,7 +1,7 @@
 {
-    "query_sql": "SELECT * FROM task_instance_info WHERE flow_id = ?",
+    "query_sql": "SELECT * FROM task_instance_info WHERE instance_id = ?",
     "db_path": "/flow/sqlite.db",
-    "insert_sql": "INSERT INTO task_instance_info (flow_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size, file_type, file_name, file_path, source_file_modify_time, status, operator_id, error_code, incremental, childId, slice_num) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
-    "create_tables_sql": "CREATE TABLE IF NOT EXISTS task_instance_info (flow_id TEXT, meta_file_name TEXT, meta_file_type TEXT, meta_file_id INTEGER, meta_file_size TEXT, file_id INTEGER, file_size TEXT, file_type TEXT, file_name TEXT, file_path TEXT, source_file_modify_time INTEGER, status INTEGER, operator_id TEXT, error_code TEXT, incremental TEXT, childId INTEGER, slice_num INTEGER DEFAULT 0)",
-    "delete_task_instance_sql": "DELETE FROM task_instance_info WHERE flow_id = ?"
+    "insert_sql": "INSERT INTO task_instance_info (instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size, file_type, file_name, file_path, source_file_modify_time, status, operator_id, error_code, incremental, childId, slice_num) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+    "create_tables_sql": "CREATE TABLE IF NOT EXISTS task_instance_info (instance_id TEXT, meta_file_name TEXT, meta_file_type TEXT, meta_file_id INTEGER, meta_file_size TEXT, file_id INTEGER, file_size TEXT, file_type TEXT, file_name TEXT, file_path TEXT, source_file_modify_time INTEGER, status INTEGER, operator_id TEXT, error_code TEXT, incremental TEXT, childId INTEGER, slice_num INTEGER DEFAULT 0)",
+    "delete_task_instance_sql": "DELETE FROM task_instance_info WHERE instance_id = ?"
 }
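
Because the column name appears in the CREATE TABLE, INSERT, SELECT, and DELETE statements, the rename to instance_id has to land in this SQL dictionary and in the Python code in the same commit; otherwise the parameterized queries would bind against a column that no longer exists. A minimal sketch of how the dictionary is consumed is below; the JSON file name and the example id are placeholders, not taken from this diff.

import json
import sqlite3

with open("sql_dict.json", encoding="utf-8") as f:  # placeholder file name
    sql_dict = json.load(f)

instance_id = "task-123"  # placeholder id
with sqlite3.connect(f"/flow/{instance_id}.db") as conn:
    conn.execute(sql_dict["create_tables_sql"])
    rows = conn.execute(sql_dict["query_sql"], [instance_id]).fetchall()
print(rows)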
