7
7
import base64
8
8
from typing import List
9
9
from modelcache .manager .scalar_data .base import CacheStorage , CacheData
10
- from DBUtils .PooledDB import PooledDB
10
+ import sqlite3
11
+
12
+
13
+ # def insert_single_data(conn, sql, data_tuple):
14
+ # cur = conn.cursor()
15
+ # try:
16
+ # cur.execute(sql, data_tuple)
17
+ # conn.commit()
18
+ # id = cur.lastrowid
19
+ # # print('id: {}'.format(id))
20
+ # return id
21
+ # except Exception as e:
22
+ # print(e)
23
+ # conn.rollback()
24
+ # if cur:
25
+ # cur.close()
26
+ #
27
+ #
28
+ # def excute_sql(conn, sql):
29
+ # cur = conn.cursor()
30
+ # try:
31
+ # cur.execute(sql)
32
+ # conn.commit()
33
+ # except Exception as e:
34
+ # print(e)
35
+ # conn.rollback()
36
+ # if cur:
37
+ # cur.close()
38
+ #
39
+ #
40
+ # def excute_delete_sql(conn, sql):
41
+ # cur = conn.cursor()
42
+ # try:
43
+ # cur.execute(sql)
44
+ # row_count = cur.rowcount
45
+ # conn.commit()
46
+ # except Exception as e:
47
+ # print(e)
48
+ # conn.rollback()
49
+ # if cur:
50
+ # cur.close()
51
+ # return row_count
52
+ #
53
+ #
54
+ # def query_fetch_one_data(conn, sql):
55
+ # cursor = conn.cursor()
56
+ # try:
57
+ # cursor.execute(sql)
58
+ # except Exception as e:
59
+ # print(e)
60
+ # conn.rollback()
61
+ # rows = cursor.fetchone()
62
+ # if cursor:
63
+ # cursor.close()
64
+ # return rows
65
+ #
66
+ #
67
+ # def close(conn):
68
+ # if conn:
69
+ # conn.close()
11
70
12
71
13
72
class SQLStorage (CacheStorage ):
14
73
def __init__ (
15
74
self ,
16
75
db_type : str = "mysql" ,
17
76
config = None ,
18
- url = "sqlite:/// ./sqlite.db"
77
+ url = "./sqlite.db"
19
78
):
20
- if db_type in ["mysql" , "oceanbase" ]:
21
- self .host = config .get ('mysql' , 'host' )
22
- self .port = int (config .get ('mysql' , 'port' ))
23
- self .username = config .get ('mysql' , 'username' )
24
- self .password = config .get ('mysql' , 'password' )
25
- self .database = config .get ('mysql' , 'database' )
26
-
27
- self .pool = PooledDB (
28
- creator = pymysql ,
29
- host = self .host ,
30
- user = self .username ,
31
- password = self .password ,
32
- port = self .port ,
33
- database = self .database
34
- )
35
- elif db_type == 'sqlite' :
36
- self ._url = url
79
+ self ._url = url
80
+ # self._engine = sqlite3.connect(url)
81
+ self .create ()
37
82
38
83
def create (self ):
39
- pass
84
+ # answer_table_sql = """CREATE TABLE IF NOT EXISTS `modelcache_llm_answer` (
85
+ # `id` bigint(20) NOT NULL AUTO_INCREMENT comment '主键',
86
+ # `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '创建时间',
87
+ # `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间',
88
+ # `question` text NOT NULL comment 'question',
89
+ # `answer` text NOT NULL comment 'answer',
90
+ # `answer_type` int(11) NOT NULL comment 'answer_type',
91
+ # `hit_count` int(11) NOT NULL DEFAULT '0' comment 'hit_count',
92
+ # `model` varchar(1000) NOT NULL comment 'model',
93
+ # `embedding_data` blob NOT NULL comment 'embedding_data',
94
+ # PRIMARY KEY(`id`)
95
+ # ) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = 'modelcache_llm_answer';
96
+ # """
97
+ answer_table_sql = """CREATE TABLE IF NOT EXISTS modelcache_llm_answer (
98
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
99
+ gmt_create TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
100
+ gmt_modified TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
101
+ question TEXT NOT NULL,
102
+ answer TEXT NOT NULL,
103
+ answer_type INTEGER NOT NULL,
104
+ hit_count INTEGER NOT NULL DEFAULT 0,
105
+ model VARCHAR(1000) NOT NULL,
106
+ embedding_data BLOB NOT NULL
107
+ );
108
+ """
109
+
110
+ # log_table_sql = """CREATE TABLE IF NOT EXISTS `modelcache_query_log` (
111
+ # `id` bigint(20) NOT NULL AUTO_INCREMENT comment '主键',
112
+ # `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '创建时间',
113
+ # `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间',
114
+ # `error_code` int(11) NOT NULL comment 'errorCode',
115
+ # `error_desc` varchar(1000) NOT NULL comment 'errorDesc',
116
+ # `cache_hit` varchar(100) NOT NULL comment 'cacheHit',
117
+ # `delta_time` float NOT NULL comment 'delta_time',
118
+ # `model` varchar(1000) NOT NULL comment 'model',
119
+ # `query` text NOT NULL comment 'query',
120
+ # `hit_query` text NOT NULL comment 'hitQuery',
121
+ # `answer` text NOT NULL comment 'answer',
122
+ # PRIMARY KEY(`id`)
123
+ # ) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = 'modelcache_query_log';
124
+ # """
125
+ log_table_sql = """CREATE TABLE IF NOT EXISTS modelcache_query_log (
126
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
127
+ gmt_create TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
128
+ gmt_modified TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
129
+ error_code INTEGER NOT NULL,
130
+ error_desc VARCHAR(1000) NOT NULL,
131
+ cache_hit VARCHAR(100) NOT NULL,
132
+ delta_time REAL NOT NULL,
133
+ model VARCHAR(1000) NOT NULL,
134
+ query TEXT NOT NULL,
135
+ hit_query TEXT NOT NULL,
136
+ answer TEXT NOT NULL
137
+ );
138
+ """
139
+
140
+ conn = sqlite3 .connect (self ._url )
141
+ try :
142
+ cursor = conn .cursor ()
143
+ cursor .execute (answer_table_sql )
144
+ cursor .execute (log_table_sql )
145
+ conn .commit ()
146
+ cursor .close ()
147
+ conn .close ()
148
+ finally :
149
+ conn .close ()
40
150
41
151
def _insert (self , data : List ):
42
152
answer = data [0 ]
@@ -46,19 +156,19 @@ def _insert(self, data: List):
46
156
answer_type = 0
47
157
embedding_data = embedding_data .tobytes ()
48
158
49
- table_name = "cache_codegpt_answer "
50
- insert_sql = "INSERT INTO {} (question, answer, answer_type, model, embedding_data) VALUES (%s, %s, %s, %s, _binary%s )" .format (table_name )
159
+ table_name = "modelcache_llm_answer "
160
+ insert_sql = "INSERT INTO {} (question, answer, answer_type, model, embedding_data) VALUES (?, ?, ?, ?, ? )" .format (table_name )
51
161
52
- conn = self . pool . connection ( )
162
+ conn = sqlite3 . connect ( self . _url )
53
163
try :
54
- with conn .cursor () as cursor :
55
- # 执行插入数据操作
56
- values = (question , answer , answer_type , model , embedding_data )
57
- cursor .execute (insert_sql , values )
58
- conn .commit ()
59
- id = cursor .lastrowid
164
+ cursor = conn .cursor ()
165
+ values = (question , answer , answer_type , model , embedding_data )
166
+ cursor .execute (insert_sql , values )
167
+ conn .commit ()
168
+ id = cursor .lastrowid
169
+ cursor .close ()
170
+ conn .close ()
60
171
finally :
61
- # 关闭连接,将连接返回给连接池
62
172
conn .close ()
63
173
return id
64
174
@@ -81,34 +191,32 @@ def insert_query_resp(self, query_resp, **kwargs):
81
191
if isinstance (hit_query , list ):
82
192
hit_query = json .dumps (hit_query , ensure_ascii = False )
83
193
84
- table_name = "cache_query_log_info "
194
+ table_name = "modelcache_query_log "
85
195
insert_sql = "INSERT INTO {} (error_code, error_desc, cache_hit, model, query, delta_time, hit_query, answer) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)" .format (table_name )
86
- conn = self .pool .connection ()
196
+
197
+ conn = sqlite3 .connect (self ._url )
87
198
try :
88
- with conn .cursor () as cursor :
89
- # 执行插入数据操作
90
- values = (error_code , error_desc , cache_hit , model , query , delta_time , hit_query , answer )
91
- cursor .execute (insert_sql , values )
92
- conn .commit ()
199
+ cursor = conn .cursor ()
200
+ values = (error_code , error_desc , cache_hit , model , query , delta_time , hit_query , answer )
201
+ cursor .execute (insert_sql , values )
202
+ conn .commit ()
203
+ cursor .close ()
204
+ conn .close ()
93
205
finally :
94
- # 关闭连接,将连接返回给连接池
95
206
conn .close ()
96
- return id
97
207
98
208
def get_data_by_id (self , key : int ):
99
- table_name = "cache_codegpt_answer "
209
+ table_name = "modelcache_llm_answer "
100
210
query_sql = "select question, answer, embedding_data, model from {} where id={}" .format (table_name , key )
101
- conn_start = time .time ()
102
- conn = self .pool .connection ()
103
-
104
- search_start = time .time ()
211
+ conn = sqlite3 .connect (self ._url )
105
212
try :
106
- with conn .cursor () as cursor :
107
- # 执行数据库操作
108
- cursor .execute (query_sql )
109
- resp = cursor .fetchone ()
213
+ cursor = conn .cursor ()
214
+ cursor .execute (query_sql )
215
+ resp = cursor .fetchone ()
216
+ conn .commit ()
217
+ cursor .close ()
218
+ conn .close ()
110
219
finally :
111
- # 关闭连接,将连接返回给连接池
112
220
conn .close ()
113
221
114
222
if resp is not None and len (resp ) == 4 :
@@ -117,16 +225,16 @@ def get_data_by_id(self, key: int):
117
225
return None
118
226
119
227
def update_hit_count_by_id (self , primary_id : int ):
120
- table_name = "cache_codegpt_answer "
228
+ table_name = "modelcache_llm_answer "
121
229
update_sql = "UPDATE {} SET hit_count = hit_count+1 WHERE id={}" .format (table_name , primary_id )
122
- conn = self .pool .connection ()
123
230
124
- # 使用连接执行更新数据操作
231
+ conn = sqlite3 . connect ( self . _url )
125
232
try :
126
- with conn .cursor () as cursor :
127
- # 执行更新数据操作
128
- cursor .execute (update_sql )
129
- conn .commit ()
233
+ cursor = conn .cursor ()
234
+ cursor .execute (update_sql )
235
+ conn .commit ()
236
+ cursor .close ()
237
+ conn .close ()
130
238
finally :
131
239
# 关闭连接,将连接返回给连接池
132
240
conn .close ()
@@ -135,34 +243,31 @@ def get_ids(self, deleted=True):
135
243
pass
136
244
137
245
def mark_deleted (self , keys ):
138
- table_name = "cache_codegpt_answer "
246
+ table_name = "modelcache_llm_answer "
139
247
delete_sql = "Delete from {} WHERE id in ({})" .format (table_name , "," .join ([str (i ) for i in keys ]))
140
-
141
- # 从连接池中获取连接
142
- conn = self .pool .connection ()
248
+ conn = sqlite3 .connect (self ._url )
143
249
try :
144
- with conn .cursor () as cursor :
145
- # 执行删除数据操作
146
- cursor .execute (delete_sql )
147
- delete_count = cursor .rowcount
148
- conn .commit ()
250
+ cursor = conn .cursor ()
251
+ cursor .execute (delete_sql )
252
+ delete_count = cursor .rowcount
253
+ conn .commit ()
254
+ cursor .close ()
255
+ conn .close ()
149
256
finally :
150
- # 关闭连接,将连接返回给连接池
151
257
conn .close ()
152
258
return delete_count
153
259
154
260
def model_deleted (self , model_name ):
155
- table_name = "cache_codegpt_answer "
261
+ table_name = "modelcache_llm_answer "
156
262
delete_sql = "Delete from {} WHERE model='{}'" .format (table_name , model_name )
157
- conn = self .pool .connection ()
158
- # 使用连接执行删除数据操作
263
+ conn = sqlite3 .connect (self ._url )
159
264
try :
160
- with conn .cursor () as cursor :
161
- # 执行删除数据操作
162
- resp = cursor .execute (delete_sql )
163
- conn .commit ()
265
+ cursor = conn .cursor ()
266
+ resp = cursor .execute (delete_sql )
267
+ conn .commit ()
268
+ cursor .close ()
269
+ conn .close ()
164
270
finally :
165
- # 关闭连接,将连接返回给连接池
166
271
conn .close ()
167
272
return resp
168
273
0 commit comments