Skip to content

Commit 8b9fb1f

Browse files
committed
perf: Memory optimization
1 parent 2194b8b commit 8b9fb1f

File tree

5 files changed

+169
-37
lines changed

5 files changed

+169
-37
lines changed

apps/application/flow/tools.py

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
"""
99
import asyncio
1010
import json
11-
import traceback
11+
import queue
12+
import threading
1213
from typing import Iterator
1314

1415
from django.http import StreamingHttpResponse
@@ -227,6 +228,30 @@ def generate_tool_message_template(name, context):
227228
return tool_message_template % (name, tool_message_json_template % (context))
228229

229230

231+
# 全局单例事件循环
232+
_global_loop = None
233+
_loop_thread = None
234+
_loop_lock = threading.Lock()
235+
236+
237+
def get_global_loop():
238+
"""获取全局共享的事件循环"""
239+
global _global_loop, _loop_thread
240+
241+
with _loop_lock:
242+
if _global_loop is None:
243+
_global_loop = asyncio.new_event_loop()
244+
245+
def run_forever():
246+
asyncio.set_event_loop(_global_loop)
247+
_global_loop.run_forever()
248+
249+
_loop_thread = threading.Thread(target=run_forever, daemon=True, name="GlobalAsyncLoop")
250+
_loop_thread.start()
251+
252+
return _global_loop
253+
254+
230255
async def _yield_mcp_response(chat_model, message_list, mcp_servers, mcp_output_enable=True):
231256
client = MultiServerMCPClient(json.loads(mcp_servers))
232257
tools = await client.get_tools()
@@ -242,19 +267,31 @@ async def _yield_mcp_response(chat_model, message_list, mcp_servers, mcp_output_
242267

243268

244269
def mcp_response_generator(chat_model, message_list, mcp_servers, mcp_output_enable=True):
245-
loop = asyncio.new_event_loop()
246-
try:
247-
async_gen = _yield_mcp_response(chat_model, message_list, mcp_servers, mcp_output_enable)
248-
while True:
249-
try:
250-
chunk = loop.run_until_complete(anext_async(async_gen))
251-
yield chunk
252-
except StopAsyncIteration:
253-
break
254-
except Exception as e:
255-
maxkb_logger.error(f'Exception: {e}', exc_info=True)
256-
finally:
257-
loop.close()
270+
"""使用全局事件循环,不创建新实例"""
271+
result_queue = queue.Queue()
272+
loop = get_global_loop() # 使用共享循环
273+
274+
async def _run():
275+
try:
276+
async_gen = _yield_mcp_response(chat_model, message_list, mcp_servers, mcp_output_enable)
277+
async for chunk in async_gen:
278+
result_queue.put(('data', chunk))
279+
except Exception as e:
280+
maxkb_logger.error(f'Exception: {e}', exc_info=True)
281+
result_queue.put(('error', e))
282+
finally:
283+
result_queue.put(('done', None))
284+
285+
# 在全局循环中调度任务
286+
asyncio.run_coroutine_threadsafe(_run(), loop)
287+
288+
while True:
289+
msg_type, data = result_queue.get()
290+
if msg_type == 'done':
291+
break
292+
if msg_type == 'error':
293+
raise data
294+
yield data
258295

259296

260297
async def anext_async(agen):

apps/application/flow/workflow_manage.py

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -234,25 +234,62 @@ def run_block(self, language='zh'):
234234
非流式响应
235235
@return: 结果
236236
"""
237-
self.run_chain_async(None, None, language)
238-
while self.is_run():
239-
pass
240-
details = self.get_runtime_details()
241-
message_tokens = sum([row.get('message_tokens') for row in details.values() if
242-
'message_tokens' in row and row.get('message_tokens') is not None])
243-
answer_tokens = sum([row.get('answer_tokens') for row in details.values() if
244-
'answer_tokens' in row and row.get('answer_tokens') is not None])
245-
answer_text_list = self.get_answer_text_list()
246-
answer_text = '\n\n'.join(
247-
'\n\n'.join([a.get('content') for a in answer]) for answer in
248-
answer_text_list)
249-
answer_list = reduce(lambda pre, _n: [*pre, *_n], answer_text_list, [])
250-
self.work_flow_post_handler.handler(self)
251-
return self.base_to_response.to_block_response(self.params['chat_id'],
252-
self.params['chat_record_id'], answer_text, True
253-
, message_tokens, answer_tokens,
254-
_status=status.HTTP_200_OK if self.status == 200 else status.HTTP_500_INTERNAL_SERVER_ERROR,
255-
other_params={'answer_list': answer_list})
237+
try:
238+
self.run_chain_async(None, None, language)
239+
while self.is_run():
240+
pass
241+
details = self.get_runtime_details()
242+
message_tokens = sum([row.get('message_tokens') for row in details.values() if
243+
'message_tokens' in row and row.get('message_tokens') is not None])
244+
answer_tokens = sum([row.get('answer_tokens') for row in details.values() if
245+
'answer_tokens' in row and row.get('answer_tokens') is not None])
246+
answer_text_list = self.get_answer_text_list()
247+
answer_text = '\n\n'.join(
248+
'\n\n'.join([a.get('content') for a in answer]) for answer in
249+
answer_text_list)
250+
answer_list = reduce(lambda pre, _n: [*pre, *_n], answer_text_list, [])
251+
self.work_flow_post_handler.handler(self)
252+
253+
res = self.base_to_response.to_block_response(self.params['chat_id'],
254+
self.params['chat_record_id'], answer_text, True
255+
, message_tokens, answer_tokens,
256+
_status=status.HTTP_200_OK if self.status == 200 else status.HTTP_500_INTERNAL_SERVER_ERROR,
257+
other_params={'answer_list': answer_list})
258+
finally:
259+
self._cleanup()
260+
return res
261+
262+
def _cleanup(self):
263+
"""清理所有对象引用"""
264+
# 清理列表
265+
self.future_list.clear()
266+
self.field_list.clear()
267+
self.global_field_list.clear()
268+
self.chat_field_list.clear()
269+
self.image_list.clear()
270+
self.video_list.clear()
271+
self.document_list.clear()
272+
self.audio_list.clear()
273+
self.other_list.clear()
274+
if hasattr(self, 'node_context'):
275+
self.node_context.clear()
276+
277+
# 清理字典
278+
self.context.clear()
279+
self.chat_context.clear()
280+
self.form_data.clear()
281+
282+
# 清理对象引用
283+
self.node_chunk_manage = None
284+
self.work_flow_post_handler = None
285+
self.flow = None
286+
self.start_node = None
287+
self.current_node = None
288+
self.current_result = None
289+
self.chat_record = None
290+
self.base_to_response = None
291+
self.params = None
292+
self.lock = None
256293

257294
def run_stream(self, current_node, node_result_future, language='zh'):
258295
"""
@@ -307,6 +344,7 @@ def await_result(self):
307344
'',
308345
[],
309346
'', True, message_tokens, answer_tokens, {})
347+
self._cleanup()
310348

311349
def run_chain_async(self, current_node, node_result_future, language='zh'):
312350
future = executor.submit(self.run_chain_manage, current_node, node_result_future, language)

apps/application/serializers/common.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
@date:2025/6/9 13:42
77
@desc:
88
"""
9-
from datetime import datetime
109
from typing import List
1110

1211
from django.core.cache import cache
@@ -226,10 +225,66 @@ def append_chat_record(self, chat_record: ChatRecord):
226225
chat_record.save()
227226
ChatCountSerializer(data={'chat_id': self.chat_id}).update_chat()
228227

228+
def to_dict(self):
229+
230+
return {
231+
'chat_id': self.chat_id,
232+
'chat_user_id': self.chat_user_id,
233+
'chat_user_type': self.chat_user_type,
234+
'knowledge_id_list': self.knowledge_id_list,
235+
'exclude_document_id_list': self.exclude_document_id_list,
236+
'application_id': self.application_id,
237+
'chat_record_list': [self.chat_record_to_map(c) for c in self.chat_record_list],
238+
'debug': self.debug
239+
}
240+
241+
def chat_record_to_map(self, chat_record):
242+
return {'id': chat_record.id,
243+
'chat_id': chat_record.chat_id,
244+
'vote_status': chat_record.vote_status,
245+
'problem_text': chat_record.problem_text,
246+
'answer_text': chat_record.answer_text,
247+
'answer_text_list': chat_record.answer_text_list,
248+
'message_tokens': chat_record.message_tokens,
249+
'answer_tokens': chat_record.answer_tokens,
250+
'const': chat_record.const,
251+
'details': chat_record.details,
252+
'improve_paragraph_id_list': chat_record.improve_paragraph_id_list,
253+
'run_time': chat_record.run_time,
254+
'index': chat_record.index}
255+
256+
@staticmethod
257+
def map_to_chat_record(chat_record_dict):
258+
ChatRecord(id=chat_record_dict.get('id'),
259+
chat_id=chat_record_dict.get('chat_id'),
260+
vote_status=chat_record_dict.get('vote_status'),
261+
problem_text=chat_record_dict.get('problem_text'),
262+
answer_text=chat_record_dict.get('answer_text'),
263+
answer_text_list=chat_record_dict.get('answer_text_list'),
264+
message_tokens=chat_record_dict.get('message_tokens'),
265+
answer_tokens=chat_record_dict.get('answer_tokens'),
266+
const=chat_record_dict.get('const'),
267+
details=chat_record_dict.get('details'),
268+
improve_paragraph_id_list=chat_record_dict.get('improve_paragraph_id_list'),
269+
run_time=chat_record_dict.get('run_time'),
270+
index=chat_record_dict.get('index'), )
271+
229272
def set_cache(self):
230-
cache.set(Cache_Version.CHAT.get_key(key=self.chat_id), self, version=Cache_Version.CHAT.get_version(),
273+
cache.set(Cache_Version.CHAT.get_key(key=self.chat_id), self.to_dict(),
274+
version=Cache_Version.CHAT_INFO.get_version(),
231275
timeout=60 * 30)
232276

277+
@staticmethod
278+
def map_to_chat_info(chat_info_dict):
279+
return ChatInfo(chat_info_dict.get('chat_id'), chat_info_dict.get('chat_user_id'),
280+
chat_info_dict.get('chat_user_type'), chat_info_dict.get('knowledge_id_list'),
281+
chat_info_dict.get('exclude_document_id_list'),
282+
chat_info_dict.get('application_id'),
283+
[ChatInfo.map_to_chat_record(c_r) for c_r in chat_info_dict.get('chat_record_list')])
284+
233285
@staticmethod
234286
def get_cache(chat_id):
235-
return cache.get(Cache_Version.CHAT.get_key(key=chat_id), version=Cache_Version.CHAT.get_version())
287+
chat_info_dict = cache.get(Cache_Version.CHAT.get_key(key=chat_id), version=Cache_Version.CHAT_INFO.get_version())
288+
if chat_info_dict:
289+
return ChatInfo.map_to_chat_info(chat_info_dict)
290+
return None

apps/common/auth/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def to_string(self):
4545
value = json.dumps(self.to_dict())
4646
authentication = encrypt(value)
4747
cache_key = hashlib.sha256(authentication.encode()).hexdigest()
48-
authentication_cache.set(cache_key, value, version=Cache_Version.CHAT.value, timeout=60 * 60 * 2)
48+
authentication_cache.set(cache_key, value, version=Cache_Version.CHAT.get_version(), timeout=60 * 60 * 2)
4949
return authentication
5050

5151
@staticmethod

apps/common/constants/cache_version.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class Cache_Version(Enum):
3030
# 对话
3131
CHAT = "CHAT", lambda key: key
3232

33+
CHAT_INFO = "CHAT_INFO", lambda key: key
34+
3335
CHAT_VARIABLE = "CHAT_VARIABLE", lambda key: key
3436

3537
# 应用API KEY

0 commit comments

Comments
 (0)