
Commit a7c481b

feature: change sqlite to sqlalchemy+pymysql
1 parent 69cf4c9 commit a7c481b

File tree: 15 files changed (+283, -196 lines)

runtime/operators/filter/img_duplicated_images_cleaner/process.py

Lines changed: 10 additions & 9 deletions
@@ -17,8 +17,9 @@
 
 import cv2
 from Crypto.Hash import MD5
+from sqlalchemy import text
 
-from data_platform.sqlite_manager.sqlite_manager import SQLiteManager
+from data_platform.sql_manager.sql_manager import SQLManager
 from data_platform.common.utils import get_now_time
 from data_platform.common.utils import bytes_to_numpy, numpy_to_bytes
 from data_platform.core.base_op import Filter
@@ -77,24 +78,24 @@ def execute_sql(self, md5: str, file_name: str,
         query_sql = str(self.sql_dict.get("query_sql"))
         insert_sql = str(self.sql_dict.get("insert_sql"))
         create_tables_sql = str(self.sql_dict.get("create_tables_sql"))
-        db_path = str(self.sql_dict.get("db_path"))
-        query_sql_params = [self.task_uuid, md5]
-        insert_sql_params = [self.task_uuid, md5, file_name.encode("utf-8"), timestamp]
+        query_sql_params = {"task_uuid": self.task_uuid, "file_feature": md5}
+        insert_sql_params = {"task_uuid": self.task_uuid, "file_feature": md5, "file_name": file_name.encode("utf-8"),
+                             "timestamp": timestamp}
 
-        db_manager = SQLiteManager()
+        db_manager = SQLManager()
         try:
-            self.conn = db_manager.create_connect(db_path)
+            self.conn = db_manager.create_connect()
         except Exception as e:
             logger.error("fileName: %s, database connection failed: %s", file_name, str(e))
             raise RuntimeError(82000, str(e)) from None
 
         with self.conn as connection:
-            connection.execute(create_tables_sql)
+            connection.execute(text(create_tables_sql))
             # Check whether the file is a duplicate
-            result = connection.execute(query_sql, query_sql_params).fetchall()
+            result = connection.execute(text(query_sql), query_sql_params).fetchall()
             # Empty result: the image is not a duplicate, insert the new file feature
             if not result:
-                connection.execute(insert_sql, insert_sql_params)
+                connection.execute(text(insert_sql), insert_sql_params)
                 return img_bytes
         logger.info("fileName: %s, method: Duplicate ImagesCleaner. The image is duplicated and filtered ",
                     file_name)
Lines changed: 3 additions & 4 deletions
@@ -1,6 +1,5 @@
 {
-    "query_sql": "SELECT * FROM operator_duplicate_img_features WHERE task_uuid = ? AND file_feature = ?",
-    "db_path": "/flow/sqlite.db",
-    "insert_sql": "INSERT INTO operator_duplicate_img_features (task_uuid, file_feature,file_name,timestamp) VALUES (?,?,?,?)",
-    "create_tables_sql": "CREATE TABLE IF NOT EXISTS operator_duplicate_img_features (id INTEGER PRIMARY KEY AUTOINCREMENT,task_uuid TEXT,file_feature TEXT,file_name TEXT,timestamp DATETIME);"
+    "query_sql": "SELECT * FROM operator_duplicate_img_features WHERE task_uuid = :task_uuid AND file_feature = :file_feature",
+    "insert_sql": "INSERT INTO operator_duplicate_img_features (task_uuid, file_feature, file_name, timestamp) VALUES (:task_uuid, :file_feature, :file_name, :timestamp)",
+    "create_tables_sql": "CREATE TABLE IF NOT EXISTS operator_duplicate_img_features (id INT AUTO_INCREMENT PRIMARY KEY,task_uuid VARCHAR(255),file_feature TEXT,file_name TEXT,timestamp DATETIME);"
 }
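
The move from ? placeholders to named binds is the core of this change: text() compiles the SQL string, and the parameter dict is passed separately to execute(). A minimal sketch of the pattern, using an in-memory SQLite engine and an illustrative table rather than the project's real SQLManager connection:

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # stand-in engine for illustration only

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE IF NOT EXISTS features (task_uuid TEXT, file_feature TEXT)"))
    # The dict keys must match the :named binds in the statement
    conn.execute(
        text("INSERT INTO features (task_uuid, file_feature) VALUES (:task_uuid, :file_feature)"),
        {"task_uuid": "abc", "file_feature": "d41d8cd98f00b204"},
    )
    rows = conn.execute(
        text("SELECT * FROM features WHERE task_uuid = :task_uuid"),
        {"task_uuid": "abc"},
    ).fetchall()
    conn.commit()  # SQLAlchemy 2.0 connections do not autocommit

Note that text() takes only the SQL string; passing the parameters inside text() raises a TypeError, which is why they go to execute() as a second argument.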

runtime/operators/filter/img_similar_images_cleaner/process.py

Lines changed: 29 additions & 16 deletions
@@ -20,8 +20,9 @@
 
 import cv2
 import numpy as np
+from sqlalchemy import text
 
-from data_platform.sqlite_manager.sqlite_manager import SQLiteManager
+from data_platform.sql_manager.sql_manager import SQLManager
 from data_platform.common.utils import get_now_time
 from data_platform.common.utils import bytes_to_numpy
 from data_platform.core.base_op import Filter
@@ -149,36 +150,48 @@ def execute_sql(self, p_hash: str, des_matrix: np.ndarray, file_name: str,
         des_matrix_binary = zlib.compress(des_matrix.tobytes())  # compress the array with zlib
         timestamp = get_now_time('Asia/Shanghai', '%Y-%m-%d %H:%M:%S', file_name,
                                  "ImgSimilarCleaner")
-        query_sql = str(self.sql_dict.get("query_sql"))
         query_task_uuid_sql = str(self.sql_dict.get("query_task_uuid_sql"))
         insert_sql = str(self.sql_dict.get("insert_sql"))
         create_tables_sql = str(self.sql_dict.get("create_tables_sql"))
-        db_path = str(self.sql_dict.get("db_path"))
 
-        db_manager = SQLiteManager()
+        db_manager = SQLManager()
         try:
-            self.conn = db_manager.create_connect(db_path)
+            self.conn = db_manager.create_connect()
         except Exception as e:
             logger.error("fileName: %s, database connection failed: %s", file_name, str(e))
             raise RuntimeError(82000, str(e)) from None
 
         with self.conn as connection:
             """Fetch file features from the database, compare similarity, insert the new file feature"""
-            connection.execute(create_tables_sql)
-            result = connection.execute(query_task_uuid_sql, [self.task_uuid]).fetchall()
+            connection.execute(text(create_tables_sql))
+            result = connection.execute(text(query_task_uuid_sql), {"task_uuid": self.task_uuid}).fetchall()
             total_count = len(result)
-            for i in range(0, total_count, self.page_size):
-                rows = connection.execute(query_sql, [self.task_uuid, self.page_size, i]).fetchall()
-                # No data on the last page for this task uuid, exit the loop
-                if not rows:
-                    break
-                # Compare the two images for similarity
-                if self.determine_similar_images(rows, p_hash, des_matrix, file_name):
+            if self.has_similar_images(connection, des_matrix, file_name, p_hash, total_count):
                 return np.array([])
-            connection.execute(insert_sql, [self.task_uuid, p_hash, des_matrix_binary, str(des_matrix.shape),
-                                            file_name.encode("utf-8").hex(), timestamp])
+
+            insert_data = {
+                "task_uuid": self.task_uuid,
+                "p_hash": p_hash,
+                "des_matrix": des_matrix_binary,
+                "matrix_shape": str(des_matrix.shape),
+                "file_name": file_name.encode("utf-8").hex(),
+                "timestamp": timestamp
+            }
+            connection.execute(text(insert_sql), insert_data)
         return img
 
+    def has_similar_images(self, connection, des_matrix, file_name, p_hash, total_count):
+        query_sql = self.sql_dict.get("query_sql")
+        for i in range(0, total_count, self.page_size):
+            rows = connection.execute(
+                text(query_sql), {"task_uuid": self.task_uuid, "ge": self.page_size, "le": i}).fetchall()
+            # No data on the last page for this task uuid, exit the loop
+            if not rows:
+                break
+            # Compare the two images for similarity
+            if self.determine_similar_images(rows, p_hash, des_matrix, file_name):
+                return True
+        return False
+
     def determine_similar_images(self, file_features: List, p_hash: str, des_matrix: np.ndarray,
                                  file_name: str) -> bool:
         """Decide from the stored file features whether two images exceed the similarity threshold"""
Lines changed: 4 additions & 5 deletions
@@ -1,7 +1,6 @@
 {
-    "query_sql": "SELECT * FROM operator_similar_img_features WHERE task_uuid = ? ORDER BY timestamp LIMIT ? OFFSET ?",
-    "insert_sql": "INSERT INTO operator_similar_img_features (task_uuid,p_hash,des_matrix,matrix_shape,file_name,timestamp) VALUES (?,?,?,?,?,?)",
-    "query_task_uuid_sql": "SELECT * FROM operator_similar_img_features WHERE task_uuid = ?",
-    "db_path": "/flow/sqlite.db",
-    "create_tables_sql": "CREATE TABLE IF NOT EXISTS operator_similar_img_features (id INTEGER PRIMARY KEY AUTOINCREMENT,task_uuid TEXT,p_hash TEXT,des_matrix BLOB,matrix_shape TEXT,file_name TEXT,timestamp DATETIME);"
+    "query_sql": "SELECT * FROM operator_similar_img_features WHERE task_uuid = :task_uuid ORDER BY timestamp LIMIT :ge OFFSET :le",
+    "insert_sql": "INSERT INTO operator_similar_img_features (task_uuid,p_hash,des_matrix,matrix_shape,file_name,timestamp) VALUES (:task_uuid,:p_hash,:des_matrix,:matrix_shape,:file_name,:timestamp)",
+    "query_task_uuid_sql": "SELECT * FROM operator_similar_img_features WHERE task_uuid = :task_uuid",
+    "create_tables_sql": "CREATE TABLE IF NOT EXISTS operator_similar_img_features (id INT AUTO_INCREMENT PRIMARY KEY,task_uuid VARCHAR(255),p_hash TEXT,des_matrix BLOB,matrix_shape TEXT,file_name TEXT,timestamp DATETIME);"
 }
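
One readability wrinkle: in the rewritten query, the :ge bind carries the page size (LIMIT) and :le carries the offset (OFFSET). A small sketch of how the paging loop resolves them, against an in-memory SQLite engine and a throwaway table purely for illustration:

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # stand-in engine for illustration only
page_size = 2

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE t (task_uuid TEXT, ts INT)"))
    conn.execute(text("INSERT INTO t VALUES ('u', 1), ('u', 2), ('u', 3)"))
    query = "SELECT * FROM t WHERE task_uuid = :task_uuid ORDER BY ts LIMIT :ge OFFSET :le"
    for offset in range(0, 3, page_size):
        rows = conn.execute(text(query), {"task_uuid": "u", "ge": page_size, "le": offset}).fetchall()
        print(rows)  # first page: [('u', 1), ('u', 2)]; second page: [('u', 3)]

Bind names like :page_size and :offset would state that intent more plainly than :ge and :le.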

runtime/operators/filter/remove_duplicate_file/process.py

Lines changed: 28 additions & 18 deletions
@@ -17,8 +17,9 @@
 
 import numpy as np
 from datasketch import MinHash
+from sqlalchemy import text
 
-from data_platform.sqlite_manager.sqlite_manager import SQLiteManager
+from data_platform.sql_manager.sql_manager import SQLManager
 from data_platform.common.utils import get_now_time
 from data_platform.core.base_op import Filter
 
@@ -91,34 +92,43 @@ def execute_sql(self, text_minhash: MinHash, file_name: str,
         minhash_values = text_minhash.hashvalues
         # Convert the NumPy array to a string
         minhash_values_string = np.array2string(minhash_values)
-
-        query_sql = self.sql_dict.get("query_sql")
         query_task_uuid_sql = self.sql_dict.get("query_task_uuid_sql")
         insert_sql = self.sql_dict.get("insert_sql")
-        db_path = self.sql_dict.get("db_path")
         create_tables_sql = self.sql_dict.get("create_tables_sql")
-        db_manager = SQLiteManager()
+        db_manager = SQLManager()
         try:
-            self.conn = db_manager.create_connect(db_path)
+            self.conn = db_manager.create_connect()
         except Exception as e:
             logger.error("fileName: %s, database connection failed: %s", file_name, str(e))
             raise RuntimeError(82000, str(e)) from None
         with self.conn as connection:
-            connection.execute(create_tables_sql)
-            result = connection.execute(query_task_uuid_sql, [self.task_uuid]).fetchall()
+            connection.execute(text(create_tables_sql))
+            result = connection.execute(text(query_task_uuid_sql), {"task_uuid": self.task_uuid}).fetchall()
             total_count = len(result)
-            for i in range(0, total_count, self.page_size):
-                rows = connection.execute(query_sql, [self.task_uuid, self.page_size, i]).fetchall()
-                # No data on the last page for this task uuid, exit the loop
-                if not rows:
-                    break
-                # Compare the two texts for similarity
-                if self.determine_similar_text(rows, text_minhash, file_name):
-                    return ""
-            connection.execute(insert_sql, [self.task_uuid, minhash_values_string, file_name.encode("utf-8").hex(),
-                                            timestamp])
+            if self.has_similar_text(connection, file_name, text_minhash, total_count):
+                return ""
+            insert_data = {
+                "task_uuid": self.task_uuid,
+                "file_feature": minhash_values_string,
+                "file_name": file_name.encode("utf-8").hex(),
+                "timestamp": timestamp
+            }
+            connection.execute(text(insert_sql), insert_data)
         return input_text
 
+    def has_similar_text(self, connection, file_name, text_minhash, total_count) -> bool:
+        query_sql = self.sql_dict.get("query_sql")
+        for i in range(0, total_count, self.page_size):
+            rows = connection.execute(
+                text(query_sql), {"task_uuid": self.task_uuid, "ge": self.page_size, "le": i}).fetchall()
+            # No data on the last page for this task uuid, exit the loop
+            if not rows:
+                break
+            # Compare the two texts for similarity
+            if self.determine_similar_text(rows, text_minhash, file_name):
+                return True
+        return False
+
     def determine_similar_text(self, file_features: List, text_minhash: MinHash, file_name: str) -> bool:
         for signature in file_features:
             # Historical file feature and historical file name
Lines changed: 4 additions & 5 deletions
@@ -1,7 +1,6 @@
 {
-    "query_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = ? ORDER BY timestamp LIMIT ? OFFSET ?",
-    "db_path": "/flow/sqlite.db",
-    "create_tables_sql": "CREATE TABLE IF NOT EXISTS operators_similar_text_features (id INTEGER PRIMARY KEY AUTOINCREMENT, task_uuid TEXT,file_feature TEXT,file_name TEXT,timestamp DATETIME);",
-    "insert_sql": "INSERT INTO operators_similar_text_features (task_uuid, file_feature, file_name, timestamp) VALUES (?, ?, ?, ?)",
-    "query_task_uuid_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = ?"
+    "query_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = :task_uuid ORDER BY timestamp LIMIT :ge OFFSET :le",
+    "create_tables_sql": "CREATE TABLE IF NOT EXISTS operators_similar_text_features (id INT AUTO_INCREMENT PRIMARY KEY, task_uuid VARCHAR(255),file_feature TEXT,file_name TEXT,timestamp DATETIME);",
+    "insert_sql": "INSERT INTO operators_similar_text_features (task_uuid, file_feature, file_name, timestamp) VALUES (:task_uuid, :file_feature, :file_name, :timestamp)",
+    "query_task_uuid_sql": "SELECT * FROM operators_similar_text_features WHERE task_uuid = :task_uuid"
 }
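
determine_similar_text compares MinHash signatures, so the underlying similarity measure is an estimated Jaccard index. A short datasketch sketch of that comparison (tokenization and any dedup threshold are illustrative assumptions):

from datasketch import MinHash

def build_minhash(tokens):
    """Build a MinHash signature from an iterable of string tokens."""
    m = MinHash(num_perm=128)
    for token in tokens:
        m.update(token.encode("utf-8"))
    return m

a = build_minhash("the quick brown fox jumps over the lazy dog".split())
b = build_minhash("the quick brown fox leaps over the lazy dog".split())
print(a.jaccard(b))  # estimated Jaccard similarity, close to 0.78 for this pair

Since the stored file_feature is np.array2string(minhash.hashvalues), a comparison against history has to parse that string back into an array before estimating similarity.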

runtime/operators/requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -17,3 +17,5 @@ pytz==2025.2
 six==1.17.0
 xmltodict==1.0.2
 zhconv==1.4.3
+sqlalchemy==2.0.40
+pymysql==1.1.1
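
The SQLManager these operators now depend on is not shown in this commit's hunks. A minimal sketch of what runtime code built on these two pins might look like, assuming the MySQL DSN arrives via an environment variable (the variable name, pooling option, and class layout are all assumptions):

import os

from sqlalchemy import create_engine


class SQLManager:
    """Hypothetical sketch; the real class lives in data_platform/sql_manager/sql_manager.py and may differ."""
    _engine = None

    @classmethod
    def create_connect(cls):
        if cls._engine is None:
            # DB_URL is an assumed variable, e.g. "mysql+pymysql://user:pass@host:3306/data_platform"
            cls._engine = create_engine(os.environ["DB_URL"], pool_pre_ping=True)
        # engine.begin() yields a connection whose transaction commits on exit,
        # matching how callers use it: `with SQLManager.create_connect() as conn: ...`
        return cls._engine.begin()

A classmethod keeps both call styles seen in the diff valid: the operators call db_manager.create_connect() on an instance, while TaskInfoPersistence calls SQLManager.create_connect() on the class.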

runtime/python-executor/data_platform/core/base_op.py

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@
 from data_platform.common.utils.registry import Registry
 from data_platform.common.utils import check_valid_path
 from data_platform.core.constant import Fields
-from data_platform.sqlite_manager.persistence_atction import TaskInfoPersistence
+from data_platform.sql_manager.persistence_atction import TaskInfoPersistence
 
 OPERATORS = Registry('Operators')
 
runtime/python-executor/data_platform/sqlite_manager/__init__.py renamed to runtime/python-executor/data_platform/sql_manager/__init__.py

File renamed without changes.
runtime/python-executor/data_platform/sql_manager/persistence_atction.py

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-

import json
import time
from pathlib import Path
from typing import Dict, Any

from loguru import logger
from sqlalchemy import text

from data_platform.sql_manager.sql_manager import SQLManager


class TaskInfoPersistence:
    def __init__(self):
        self.sql_dict = self.load_sql_dict()

    @staticmethod
    def load_sql_dict():
        """Load the SQL statements"""
        sql_config_path = str(Path(__file__).parent / 'sql' / 'sql_config.json')
        with open(sql_config_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def persistence_task_info(self, sample: Dict[str, Any]):
        instance_id = str(sample.get("instance_id"))
        meta_file_name = str(sample.get("sourceFileName"))
        meta_file_type = str(sample.get("sourceFileType"))
        meta_file_id = int(sample.get("sourceFileId"))
        meta_file_size = str(sample.get("sourceFileSize"))
        file_id = int(sample.get("fileId"))
        file_size = str(sample.get("fileSize"))
        file_type = str(sample.get("fileType"))
        file_name = str(sample.get("fileName"))
        file_path = str(sample.get("filePath"))
        status = int(sample.get("execute_status"))
        failed_reason = sample.get("failed_reason")
        operator_id = str(failed_reason.get("op_name")) if failed_reason else ""
        error_code = str(failed_reason.get("error_code")) if failed_reason else ""
        incremental = str(sample.get("incremental") if sample.get("incremental") else '')
        child_id = sample.get("childId")
        slice_num = sample.get('slice_num', 0)
        insert_data = {
            "instance_id": instance_id,
            "meta_file_id": meta_file_id,
            "meta_file_type": meta_file_type,
            "meta_file_name": meta_file_name,
            "meta_file_size": meta_file_size,
            "file_id": file_id,
            "file_size": file_size,
            "file_type": file_type,
            "file_name": file_name,
            "file_path": file_path,
            "status": status,
            "operator_id": operator_id,
            "error_code": error_code,
            "incremental": incremental,
            "child_id": child_id,
            "slice_num": slice_num,
        }
        self.insert_clean_result(insert_data)

    def insert_clean_result(self, insert_data):
        retries = 0
        max_retries = 20
        retry_delay = 1
        while retries <= max_retries:
            try:
                self.execute_sql_insert(insert_data)
                return
            except Exception as e:
                # These message fragments are SQLite-style lock errors
                if "database is locked" in str(e) or "locking protocol" in str(e):
                    retries += 1
                    time.sleep(retry_delay)
                else:
                    logger.error("database execute failed: {}", str(e))
                    raise RuntimeError(82000, str(e)) from None
        raise Exception("Max retries exceeded")

    def execute_sql_insert(self, insert_data):
        insert_sql = str(self.sql_dict.get("insert_sql"))
        create_tables_sql = str(self.sql_dict.get("create_tables_sql"))
        with SQLManager.create_connect() as conn:
            conn.execute(text(create_tables_sql))
            conn.execute(text(insert_sql), insert_data)

    def query_task_info(self, instance_ids: list[str]):
        result = {}
        for instance_id in instance_ids:
            current_result = None
            try:
                current_result = self.execute_sql_query(instance_id)
            except Exception as e:
                logger.warning("instance_id: {}, query job result error: {}", instance_id, str(e))
            if current_result:
                result[instance_id] = current_result
        return result

    def execute_sql_query(self, instance_id):
        create_tables_sql = str(self.sql_dict.get("create_tables_sql"))
        query_sql = str(self.sql_dict.get("query_sql"))
        with SQLManager.create_connect() as conn:
            conn.execute(text(create_tables_sql))
            execute_result = conn.execute(text(query_sql), {"instance_id": instance_id})
            result = execute_result.fetchall()
        return result

    # TODO: deletion interface still to be implemented
    def delete_task_info(self, instance_id: str):
        create_tables_sql = self.sql_dict.get("create_tables_sql")
        delete_task_instance_sql = self.sql_dict.get("delete_task_instance_sql")
        try:
            with SQLManager.create_connect() as conn:
                conn.execute(text(create_tables_sql))
                conn.execute(text(delete_task_instance_sql), {"instance_id": instance_id})
        except Exception as e:
            logger.warning("delete database for flow: {} error: {}", instance_id, str(e))

    def delete_task_operate_info(self, instance_id: str):
        create_duplicate_img_tables_sql = self.sql_dict.get("create_duplicate_img_tables_sql")
        create_similar_img_tables_sql = self.sql_dict.get("create_similar_img_tables_sql")
        create_similar_text_tables_sql = self.sql_dict.get("create_similar_text_tables_sql")
        delete_duplicate_img_tables_sql = self.sql_dict.get("delete_duplicate_img_tables_sql")
        delete_similar_img_tables_sql = self.sql_dict.get("delete_similar_img_tables_sql")
        delete_similar_text_tables_sql = self.sql_dict.get("delete_similar_text_tables_sql")
        try:
            with SQLManager.create_connect() as conn:
                conn.execute(text(create_duplicate_img_tables_sql))
                conn.execute(text(delete_duplicate_img_tables_sql), {"instance_id": instance_id})
                conn.execute(text(create_similar_img_tables_sql))
                conn.execute(text(delete_similar_img_tables_sql), {"instance_id": instance_id})
                conn.execute(text(create_similar_text_tables_sql))
                conn.execute(text(delete_similar_text_tables_sql), {"instance_id": instance_id})
        except Exception as e:
            logger.warning("delete database for flow: {} error: {}", instance_id, str(e))