|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | + Alipay.com Inc. |
| 4 | + Copyright (c) 2004-2021 All Rights Reserved. |
| 5 | + ------------------------------------------------------ |
| 6 | + File Name : gptcache_serving.py |
| 7 | + Author : fuhui.phe |
| 8 | + |
| 9 | + Create Time : 2023/5/28 11:03 |
    Description : JSON-in/JSON-out serving entry for the GPTCache image+text (iat) cache:
                  dispatches iat_query / iat_insert / iat_remove / iat_register requests.
| 11 | + Change Activity: |
| 12 | + version0 : 2023/5/28 11:03 by fuhui.phe init |
| 13 | +""" |
| 14 | +from datetime import datetime |
| 15 | +from typing import Dict |
| 16 | +import time |
| 17 | +import json |
| 18 | +import uuid |
| 19 | +from gptcache import cache |
| 20 | +from gptcache.adapter import codegpt |
| 21 | +from gptcache.manager import CacheBase, VectorBase, get_data_manager |
| 22 | +from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation |
| 23 | +from gptcache.processor.pre import insert_iat_dict |
| 24 | +from gptcache.processor.pre import query_iat_dict |
| 25 | +from gptcache.utils.env_config import get_table_suffix |
| 26 | +from concurrent.futures import ThreadPoolExecutor |
| 27 | +from gptcache.maya_embedding_service.maya_multi_embedding_service import get_embedding_multi |
| 28 | +from gptcache.maya_embedding_service.maya_multi_embedding_service import get_embedding_multi_concurrent_sin |
| 29 | +# from gptcache.utils.modle_version_manager import model_version_load |
| 30 | +# from gptcache.utils.modle_version_manager import get_all_collections |
| 31 | +# from gptcache.utils.collection_util import get_collection_iat_name |
| 32 | +# from gptcache.utils.collection_util import get_collection_iat_type |
| 33 | + |
| 34 | + |
def save_query_info(result, model, query, delta_time_log):
    """Persist one query/response record through the global cache's data manager."""
    print('执行 save_query_info!')
    cache.data_manager.save_query_resp(
        result,
        model=model,
        query=query,
        delta_time=delta_time_log,
    )
| 39 | + |
| 40 | + |
def response_text(cache_resp):
    """Return the answer payload of a cache-hit response.

    Raises KeyError if the response dict has no 'data' entry.
    """
    answer = cache_resp['data']
    return answer
| 44 | + |
| 45 | + |
def response_hitquery(cache_resp):
    """Return the original cached query that produced a cache hit.

    Raises KeyError if the response dict has no 'hitQuery' entry.
    """
    hit = cache_resp['hitQuery']
    return hit
| 49 | + |
| 50 | + |
| 51 | +# timm2vec = Timm() |
| 52 | +# text2vec = Data2VecAudio() |
| 53 | + |
| 54 | + |
| 55 | +# python类示例 |
class UserBackend:
    """JSON-in / JSON-out serving backend for the image+text (iat) GPTCache.

    ``__call__`` receives one JSON request string and dispatches on its
    ``request_type`` field:

      * ``iat_query``    - look up a cached answer for a multimodal query
      * ``iat_insert``   - write a query/answer pair into the cache
      * ``iat_remove``   - delete cached entries (by id list or by model)
      * ``iat_register`` - create the backing collection for a model

    Every branch returns a JSON string; errors are reported via numeric
    ``errorCode`` fields rather than raised exceptions.
    """

    def __init__(self):
        self.table_suffix = get_table_suffix()
        # Embedding sizes. The joint image+text vector is the concatenation
        # of the two single-modality vectors (iat_dimension = image + text).
        image_dimension = 768
        text_dimension = 768
        data_manager = get_data_manager(
            CacheBase("oceanbase", table_suffix=self.table_suffix),
            VectorBase("redis",
                       iat_dimension=image_dimension + text_dimension,
                       i_dimension=image_dimension,
                       t_dimension=text_dimension,
                       table_suffix=self.table_suffix))
        cache.init(
            embedding_func=get_embedding_multi,
            embedding_concur_func=get_embedding_multi_concurrent_sin,
            data_manager=data_manager,
            similarity_evaluation=SearchDistanceEvaluation(),
            iat_insert_pre_embedding_func=insert_iat_dict,
            iat_query_pre_embedding_func=query_iat_dict,
        )
        # Build timestamp, used as a lightweight deployment version marker.
        self.gptcache_version = datetime.now().strftime("%Y-%m-%d %H:%M")
        # Pool for fire-and-forget persistence of query logs.
        self.executor = ThreadPoolExecutor(max_workers=6)

    @staticmethod
    def _log_transport_cost(UUID):
        """Best-effort logging of request transport latency.

        The caller may encode its send time as ``"<uuid>==><epoch_seconds>"``;
        a missing or malformed value is logged and otherwise ignored.
        """
        if UUID:
            try:
                uuid_list = UUID.split('==>')
                user_start = float(uuid_list[1])
                ray_http_cost = time.time() - user_start
                print('ray_http_cost: {}'.format(ray_http_cost))
            except Exception as e:
                print('uuid_e: {}'.format(e))

    def __call__(self, param):
        """Handle one JSON request string; return a JSON response string.

        Error code ranges: 101 bad JSON, 102 unknown request_type,
        103 parameter-parsing failure, 2xx query errors, 3xx insert errors,
        4xx remove errors, 5xx register errors.
        """
        print('gptcache_version: {}'.format(self.gptcache_version))
        print('call_time: {}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
        try:
            param_dict = json.loads(param)
        except Exception as e:
            result = {"errorCode": 101, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
                      "answer": ''}
            cache.data_manager.save_query_resp(result, model='', query='', delta_time=0)
            return json.dumps(result)

        request_type = param_dict.get("request_type")
        UUID = param_dict.get("UUID", None)
        print('request_type: {}'.format(request_type))
        # BUG FIX: default the model name so the 102 branch below cannot hit
        # an unbound name (previously a missing 'scope' made that branch raise
        # NameError, which was then misreported as errorCode 103).
        model = ''
        # param parsing
        try:
            scope = param_dict.get("scope")
            print('scope: {}'.format(scope))
            if scope is not None:
                model = scope.get('model')
                # Normalize the model name for use in table/collection names.
                model = model.replace('-', '_')
                model = model.replace('.', '_')
                print('model: {}'.format(model))

            if request_type in ['iat_query', 'iat_insert']:
                if request_type == 'iat_query':
                    query = param_dict.get("query")
                elif request_type == 'iat_insert':
                    chat_info = param_dict.get("chat_info")
                    # The embedding/query text comes from the latest turn.
                    query = chat_info[-1]['query']

            if request_type is None or request_type not in ['iat_query', 'iat_remove', 'iat_insert', 'iat_register']:
                result = {"errorCode": 102,
                          "errorDesc": "type exception, should one of ['iat_query', 'iat_insert', 'iat_remove', 'iat_register']",
                          "cacheHit": False, "delta_time": 0, "hit_query": '', "answer": ''}
                cache.data_manager.save_query_resp(result, model=model, query='', delta_time=0)
                return json.dumps(result)
        except Exception as e:
            result = {"errorCode": 103, "errorDesc": str(e), "cacheHit": False, "delta_time": 0, "hit_query": '',
                      "answer": ''}
            return json.dumps(result)

        if request_type == 'iat_query':
            self._log_transport_cost(UUID)
            try:
                start_time = time.time()
                response = codegpt.ChatCompletion.create_iat_query(
                    scope={"model": model},
                    query=query,
                )
                delta_time = '{}s'.format(round(time.time() - start_time, 2))
                if response is None:
                    # Cache miss.
                    result = {"errorCode": 0, "errorDesc": '', "cacheHit": False, "delta_time": delta_time,
                              "hit_query": '', "answer": ''}
                elif isinstance(response, dict):
                    # Cache hit.
                    answer = response_text(response)
                    hit_query = response_hitquery(response)
                    result = {"errorCode": 0, "errorDesc": '', "cacheHit": True, "delta_time": delta_time,
                              "hit_query": hit_query, "answer": answer}
                else:
                    result = {"errorCode": 201, "errorDesc": response, "cacheHit": False, "delta_time": delta_time,
                              "hit_query": '', "answer": ''}
                delta_time_log = round(time.time() - start_time, 3)
                print('delta_time_log: {}'.format(delta_time_log))
                # Persist the query log asynchronously so the response is not
                # blocked on storage latency.
                self.executor.submit(save_query_info, result, model, query, delta_time_log)
                query_time = round(time.time() - start_time, 2)
                print('query_time: {}'.format(query_time))
            except Exception as e:
                result = {"errorCode": 202, "errorDesc": str(e), "cacheHit": False, "delta_time": 0,
                          "hit_query": '', "answer": ''}
            print('result: {}'.format(result))
            return json.dumps(result, ensure_ascii=False)

        if request_type == 'iat_insert':
            self._log_transport_cost(UUID)
            try:
                start_time = time.time()
                try:
                    response = codegpt.ChatCompletion.create_iat_insert(
                        model=model,
                        chat_info=chat_info,
                    )
                except Exception as e:
                    # BUG FIX: errorDesc must be a string — the raw exception
                    # object previously made json.dumps raise TypeError.
                    result = {"errorCode": 303, "errorDesc": str(e), "writeStatus": "exception"}
                    return json.dumps(result, ensure_ascii=False)

                if response == 'success':
                    result = {"errorCode": 0, "errorDesc": "", "writeStatus": "success"}
                else:
                    result = {"errorCode": 301, "errorDesc": response, "writeStatus": "exception"}
                insert_time = round(time.time() - start_time, 2)
                print('insert_time: {}'.format(insert_time))
                return json.dumps(result, ensure_ascii=False)
            except Exception as e:
                # BUG FIX: same serialization fix as errorCode 303.
                result = {"errorCode": 304, "errorDesc": str(e), "writeStatus": "exception"}
                print('result: {}'.format(result))
                return json.dumps(result, ensure_ascii=False)

        if request_type == 'iat_remove':
            remove_type = param_dict.get("remove_type")
            id_list = param_dict.get("id_list", [])
            print('remove_type: {}'.format(remove_type))

            response = codegpt.ChatCompletion.create_iat_remove(
                model=model,
                remove_type=remove_type,
                id_list=id_list
            )

            if not isinstance(response, dict):
                result = {"errorCode": 401, "errorDesc": "", "response": response, "removeStatus": "exception"}
                return json.dumps(result)

            state = response.get('status')
            if state == 'success':
                result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
            else:
                result = {"errorCode": 402, "errorDesc": "", "response": response, "writeStatus": "exception"}
            return json.dumps(result)

        if request_type == 'iat_register':
            iat_type = param_dict.get("iat_type")
            response = codegpt.ChatCompletion.create_iat_register(
                model=model,
                iat_type=iat_type,
                table_suffix=self.table_suffix
            )
            # 'already_exists' counts as success: registration is idempotent.
            if response in ['create_success', 'already_exists']:
                result = {"errorCode": 0, "errorDesc": "", "response": response, "writeStatus": "success"}
            else:
                result = {"errorCode": 502, "errorDesc": "", "response": response, "writeStatus": "exception"}
            return json.dumps(result)

    def __update_config__(self, config: Dict[str, object]):
        """Optional serving-framework hook; config updates are ignored."""
        pass

    def __health_check__(self):
        """Optional serving-framework health probe; always reports healthy."""
        return True
| 269 | + |
| 270 | + |
if __name__ == '__main__':
    # Smoke test: send one image+text cache query to a freshly built backend.
    # The other request types (iat_insert / iat_remove / iat_register) use the
    # same JSON envelope with their own request_type-specific fields.
    trace_id = str(uuid.uuid1()) + "==>" + str(time.time())
    image_url = 'http://resarch.oss-cn-hangzhou-zmf.aliyuncs.com/transFile%2Ftmp%2FLMM_test_image_coco%2FCOCO_train2014_000000332345.jpg'
    payload = json.dumps({
        'request_type': 'iat_query',
        'scope': {"model": "test_0313"},
        'query': {
            'text': ['父母带着孩子来这个地方可能会有什么顾虑'],
            'imageRaw': '',
            'imageUrl': image_url,
            'multiType': 'IMG_TEXT',
        },
        'UUID': trace_id,
    })

    backend = UserBackend()
    resp = backend(payload)
    print('resp: {}'.format(resp))
0 commit comments