Skip to content

Commit bf7584a

Browse files
committed
add flask multi cache demo
1 parent 84cace3 commit bf7584a

File tree

2 files changed

+395
-0
lines changed

2 files changed

+395
-0
lines changed

flask4multicache.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# -*- coding: utf-8 -*-
2+
import time
3+
from flask import Flask, request
4+
import logging
5+
import configparser
6+
import json
7+
from modelcache import cache
8+
from modelcache.adapter import adapter
9+
from modelcache.manager import CacheBase, VectorBase, get_data_manager
10+
from modelcache.similarity_evaluation.distance import SearchDistanceEvaluation
11+
from modelcache.processor.pre import query_multi_splicing
12+
from modelcache.processor.pre import insert_multi_splicing
13+
from concurrent.futures import ThreadPoolExecutor
14+
from modelcache.utils.model_filter import model_blacklist_filter
15+
from modelcache.embedding import Data2VecAudio
16+
17+
# 创建一个Flask实例
18+
app = Flask(__name__)
19+
20+
21+
def response_text(cache_resp):
22+
return cache_resp['data']
23+
24+
25+
def save_query_info(result, model, query, delta_time_log):
26+
cache.data_manager.save_query_resp(result, model=model, query=json.dumps(query, ensure_ascii=False),
27+
delta_time=delta_time_log)
28+
29+
30+
def response_hitquery(cache_resp):
31+
return cache_resp['hitQuery']
32+
33+
34+
data2vec = Data2VecAudio()
35+
mysql_config = configparser.ConfigParser()
36+
mysql_config.read('modelcache/config/mysql_config.ini')
37+
38+
milvus_config = configparser.ConfigParser()
39+
milvus_config.read('modelcache/config/milvus_config.ini')
40+
41+
# redis_config = configparser.ConfigParser()
42+
# redis_config.read('modelcache/config/redis_config.ini')
43+
44+
45+
data_manager = get_data_manager(CacheBase("mysql", config=mysql_config),
46+
VectorBase("milvus", dimension=data2vec.dimension, milvus_config=milvus_config))
47+
48+
# data_manager = get_data_manager(CacheBase("mysql", config=mysql_config),
49+
# VectorBase("redis", dimension=data2vec.dimension, redis_config=redis_config))
50+
51+
52+
cache.init(
53+
embedding_func=data2vec.to_embeddings,
54+
data_manager=data_manager,
55+
similarity_evaluation=SearchDistanceEvaluation(),
56+
query_pre_embedding_func=query_multi_splicing,
57+
insert_pre_embedding_func=insert_multi_splicing,
58+
)
59+
60+
global executor
61+
executor = ThreadPoolExecutor(max_workers=6)
62+
63+
64+
@app.route('/welcome')
65+
def first_flask(): # 视图函数
66+
return 'hello, modelcache!'
67+
68+
69+
@app.route('/modelcache', methods=['GET', 'POST'])
70+
def user_backend():
71+
try:
72+
if request.method == 'POST':
73+
request_data = request.json
74+
elif request.method == 'GET':
75+
request_data = request.args
76+
param_dict = json.loads(request_data)
77+
except Exception as e:
78+
result = {"errorCode": 101, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
79+
"answer": ''}
80+
cache.data_manager.save_query_resp(result, model='', query='', delta_time=0)
81+
return json.dumps(result)
82+
83+
# param parsing
84+
try:
85+
request_type = param_dict.get("type")
86+
87+
scope = param_dict.get("scope")
88+
if scope is not None:
89+
model = scope.get('model')
90+
model = model.replace('-', '_')
91+
model = model.replace('.', '_')
92+
query = param_dict.get("query")
93+
chat_info = param_dict.get("chat_info")
94+
if request_type is None or request_type not in ['query', 'insert', 'remove', 'register']:
95+
result = {"errorCode": 102,
96+
"errorDesc": "type exception, should one of ['query', 'insert', 'remove', 'register']",
97+
"cacheHit": False, "delta_time": 0, "hit_query": '', "answer": ''}
98+
cache.data_manager.save_query_resp(result, model=model, query='', delta_time=0)
99+
return json.dumps(result)
100+
except Exception as e:
101+
result = {"errorCode": 103, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
102+
"answer": ''}
103+
return json.dumps(result)
104+
105+
# model filter
106+
filter_resp = model_blacklist_filter(model, request_type)
107+
if isinstance(filter_resp, dict):
108+
return json.dumps(filter_resp)
109+
110+
if request_type == 'query':
111+
try:
112+
start_time = time.time()
113+
response = adapter.ChatCompletion.create_query(
114+
scope={"model": model},
115+
query=query
116+
)
117+
delta_time = '{}s'.format(round(time.time() - start_time, 2))
118+
if response is None:
119+
result = {"errorCode": 0, "errorDesc": '', "cacheHit": False, "delta_time": delta_time, "hit_query": '',
120+
"answer": ''}
121+
# elif response in ['adapt_query_exception']:
122+
elif isinstance(response, str):
123+
result = {"errorCode": 201, "errorDesc": response, "cacheHit": False, "delta_time": delta_time,
124+
"hit_query": '', "answer": ''}
125+
else:
126+
answer = response_text(response)
127+
hit_query = response_hitquery(response)
128+
result = {"errorCode": 0, "errorDesc": '', "cacheHit": True, "delta_time": delta_time,
129+
"hit_query": hit_query, "answer": answer}
130+
delta_time_log = round(time.time() - start_time, 2)
131+
future = executor.submit(save_query_info, result, model, query, delta_time_log)
132+
except Exception as e:
133+
result = {"errorCode": 202, "errorDesc": str(e), "cacheHit": False, "delta_time": 0,
134+
"hit_query": '', "answer": ''}
135+
logging.info('result: {}'.format(result))
136+
137+
return json.dumps(result, ensure_ascii=False)
138+
139+
if request_type == 'insert':
140+
try:
141+
try:
142+
response = adapter.ChatCompletion.create_insert(
143+
model=model,
144+
chat_info=chat_info
145+
)
146+
except Exception as e:
147+
result = {"errorCode": 302, "errorDesc": str(e), "writeStatus": "exception"}
148+
return json.dumps(result, ensure_ascii=False)
149+
150+
if response == 'success':
151+
result = {"errorCode": 0, "errorDesc": "", "writeStatus": "success"}
152+
else:
153+
result = {"errorCode": 301, "errorDesc": response, "writeStatus": "exception"}
154+
return json.dumps(result, ensure_ascii=False)
155+
except Exception as e:
156+
result = {"errorCode": 303, "errorDesc": str(e), "writeStatus": "exception"}
157+
return json.dumps(result, ensure_ascii=False)
158+
159+
if request_type == 'remove':
160+
remove_type = param_dict.get("remove_type")
161+
id_list = param_dict.get("id_list", [])
162+
163+
response = adapter.ChatCompletion.create_remove(
164+
model=model,
165+
remove_type=remove_type,
166+
id_list=id_list
167+
)
168+
if not isinstance(response, dict):
169+
result = {"errorCode": 401, "errorDesc": "", "response": response, "removeStatus": "exception"}
170+
return json.dumps(result)
171+
172+
state = response.get('status')
173+
if state == 'success':
174+
result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
175+
else:
176+
result = {"errorCode": 402, "errorDesc": "", "response": response, "writeStatus": "exception"}
177+
return json.dumps(result)
178+
179+
if request_type == 'register':
180+
response = adapter.ChatCompletion.create_register(
181+
model=model
182+
)
183+
if response in ['create_success', 'already_exists']:
184+
result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
185+
else:
186+
result = {"errorCode": 502, "errorDesc": "", "response": response, "writeStatus": "exception"}
187+
return json.dumps(result)
188+
189+
190+
if __name__ == '__main__':
191+
app.run(host='0.0.0.0', port=5000, debug=True)

flask4multicache_demo.py

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
# -*- coding: utf-8 -*-
2+
import time
3+
from flask import Flask, request
4+
import logging
5+
import json
6+
from modelcache import cache
7+
from modelcache.adapter import adapter
8+
from modelcache.manager import CacheBase, VectorBase, get_data_manager
9+
from modelcache.similarity_evaluation.distance import SearchDistanceEvaluation
10+
from modelcache.processor.pre import query_multi_splicing
11+
from modelcache.processor.pre import insert_multi_splicing
12+
from concurrent.futures import ThreadPoolExecutor
13+
from modelcache.utils.model_filter import model_blacklist_filter
14+
from modelcache.embedding import Data2VecAudio
15+
from modelcache_mm.processor.pre import mm_insert_dict
16+
from modelcache_mm.processor.pre import mm_query_dict
17+
from modelcache_mm.embedding import Clip2Vec
18+
19+
# 创建一个Flask实例
20+
app = Flask(__name__)
21+
22+
23+
def response_text(cache_resp):
24+
return cache_resp['data']
25+
26+
27+
def save_query_info(result, model, query, delta_time_log):
28+
cache.data_manager.save_query_resp(result, model=model, query=json.dumps(query, ensure_ascii=False),
29+
delta_time=delta_time_log)
30+
31+
32+
def response_hitquery(cache_resp):
33+
return cache_resp['hitQuery']
34+
35+
36+
# data2vec = Data2VecAudio()
37+
# data_manager = get_data_manager(CacheBase("sqlite"), VectorBase("faiss", dimension=data2vec.dimension))
38+
39+
image_dimension = 512
40+
text_dimension = 512
41+
clip2vec = Clip2Vec()
42+
data_manager = get_data_manager(CacheBase("sqlite"), VectorBase("faiss",
43+
mm_dimension=image_dimension+text_dimension,
44+
i_dimension=image_dimension,
45+
t_dimension=text_dimension))
46+
47+
48+
cache.init(
49+
embedding_func=clip2vec.to_embeddings,
50+
data_manager=data_manager,
51+
similarity_evaluation=SearchDistanceEvaluation(),
52+
insert_pre_embedding_func=mm_insert_dict,
53+
query_pre_embedding_func=mm_query_dict,
54+
)
55+
56+
# cache.set_openai_key()
57+
global executor
58+
executor = ThreadPoolExecutor(max_workers=6)
59+
60+
61+
@app.route('/welcome')
62+
def first_flask(): # 视图函数
63+
return 'hello, multicache!'
64+
65+
66+
@app.route('/multicache', methods=['GET', 'POST'])
67+
def user_backend():
68+
try:
69+
if request.method == 'POST':
70+
request_data = request.json
71+
elif request.method == 'GET':
72+
request_data = request.args
73+
param_dict = json.loads(request_data)
74+
except Exception as e:
75+
result = {"errorCode": 301, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
76+
"answer": ''}
77+
cache.data_manager.save_query_resp(result, model='', query='', delta_time=0)
78+
return json.dumps(result)
79+
80+
# param parsing
81+
try:
82+
request_type = param_dict.get("request_type")
83+
scope = param_dict.get("scope")
84+
if scope is not None:
85+
model = scope.get('model')
86+
model = model.replace('-', '_')
87+
model = model.replace('.', '_')
88+
89+
if request_type in ['query', 'insert']:
90+
if request_type == 'query':
91+
query = param_dict.get("query")
92+
elif request_type == 'insert':
93+
chat_info = param_dict.get("chat_info")
94+
query = chat_info[-1]['query']
95+
96+
if request_type is None or request_type not in ['query', 'remove', 'insert', 'register']:
97+
result = {"errorCode": 102,
98+
"errorDesc": "type exception, should one of ['query', 'insert', 'remove', 'register']",
99+
"cacheHit": False, "delta_time": 0, "hit_query": '', "answer": ''}
100+
cache.data_manager.save_query_resp(result, model=model, query='', delta_time=0)
101+
return json.dumps(result)
102+
except Exception as e:
103+
result = {"errorCode": 103, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
104+
"answer": ''}
105+
return json.dumps(result)
106+
107+
# model filter
108+
# filter_resp = model_blacklist_filter(model, request_type)
109+
# if isinstance(filter_resp, dict):
110+
# return json.dumps(filter_resp)
111+
112+
if request_type == 'query':
113+
# if UUID:
114+
# try:
115+
# uuid_list = UUID.split('==>')
116+
# except Exception as e:
117+
try:
118+
start_time = time.time()
119+
response = adapter.ChatCompletion.create_query(
120+
scope={"model": model},
121+
query=query,
122+
)
123+
delta_time = '{}s'.format(round(time.time() - start_time, 2))
124+
if response is None:
125+
result = {"errorCode": 0, "errorDesc": '', "cacheHit": False, "delta_time": delta_time,
126+
"hit_query": '', "answer": ''}
127+
elif isinstance(response, dict):
128+
answer = response_text(response)
129+
hit_query = response_hitquery(response)
130+
result = {"errorCode": 0, "errorDesc": '', "cacheHit": True, "delta_time": delta_time,
131+
"hit_query": hit_query, "answer": answer}
132+
else:
133+
result = {"errorCode": 201, "errorDesc": response, "cacheHit": False, "delta_time": delta_time,
134+
"hit_query": '', "answer": ''}
135+
delta_time_log = round(time.time() - start_time, 3)
136+
137+
future = executor.submit(save_query_info, result, model, query, delta_time_log)
138+
# query_time = round(time.time() - start_time, 2)
139+
except Exception as e:
140+
raise e
141+
return json.dumps(result, ensure_ascii=False)
142+
143+
if request_type == 'insert':
144+
# if UUID:
145+
# try:
146+
# uuid_list = UUID.split('==>')
147+
# except Exception as e:
148+
try:
149+
start_time = time.time()
150+
try:
151+
response = adapter.ChatCompletion.create_insert(
152+
model=model,
153+
chat_info=chat_info,
154+
)
155+
except Exception as e:
156+
raise e
157+
158+
if response == 'success':
159+
result = {"errorCode": 0, "errorDesc": "", "writeStatus": "success"}
160+
else:
161+
result = {"errorCode": 301, "errorDesc": response, "writeStatus": "exception"}
162+
insert_time = round(time.time() - start_time, 2)
163+
return json.dumps(result, ensure_ascii=False)
164+
except Exception as e:
165+
raise e
166+
167+
if request_type == 'remove':
168+
remove_type = param_dict.get("remove_type")
169+
id_list = param_dict.get("id_list", [])
170+
171+
response = adapter.ChatCompletion.create_remove(
172+
model=model,
173+
remove_type=remove_type,
174+
id_list=id_list
175+
)
176+
177+
if not isinstance(response, dict):
178+
result = {"errorCode": 401, "errorDesc": "", "response": response, "removeStatus": "exception"}
179+
return json.dumps(result)
180+
181+
state = response.get('status')
182+
# if response == 'success':
183+
if state == 'success':
184+
result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
185+
else:
186+
result = {"errorCode": 402, "errorDesc": "", "response": response, "writeStatus": "exception"}
187+
return json.dumps(result)
188+
189+
if request_type == 'register':
190+
type = param_dict.get("type")
191+
response = adapter.ChatCompletion.create_register(
192+
model=model,
193+
type=type
194+
)
195+
if response in ['create_success', 'already_exists']:
196+
result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
197+
else:
198+
result = {"errorCode": 502, "errorDesc": "", "response": response, "writeStatus": "exception"}
199+
return json.dumps(result)
200+
201+
202+
if __name__ == '__main__':
203+
# app.run(host='0.0.0.0', port=5000, debug=True)
204+
app.run(host='0.0.0.0', port=5000)

0 commit comments

Comments
 (0)