@@ -142,7 +142,7 @@ def make_chunk(content=None, **kwargs):
142142
143143 async def save_messages_from_langgraph_state (
144144 agent_instance ,
145- conversation ,
145+ thread_id ,
146146 conv_mgr ,
147147 config_dict ,
148148 ):
@@ -162,11 +162,11 @@ async def save_messages_from_langgraph_state(
162162 logger .debug (f"Retrieved { len (messages )} messages from LangGraph state" )
163163
164164 # 获取已保存的消息数量,避免重复保存
165- existing_messages = conv_mgr .get_messages ( conversation . id )
165+ existing_messages = conv_mgr .get_messages_by_thread_id ( thread_id )
166166 existing_count = len (existing_messages )
167167
168168 # 只保存新增的消息
169- new_messages = messages [ existing_count :]
169+ new_messages = messages
170170
171171 for msg in new_messages :
172172 msg_dict = msg .model_dump () if hasattr (msg , "model_dump" ) else {}
@@ -190,8 +190,8 @@ async def save_messages_from_langgraph_state(
190190 msg_dict ["response_metadata" ]["model_name" ] = model_name [: len (model_name ) // repeat_count ]
191191
192192 # 保存 AI 消息
193- ai_msg = conv_mgr .add_message (
194- conversation_id = conversation . id ,
193+ ai_msg = conv_mgr .add_message_by_thread_id (
194+ thread_id = thread_id ,
195195 role = "assistant" ,
196196 content = content ,
197197 message_type = "text" ,
@@ -236,6 +236,10 @@ async def save_messages_from_langgraph_state(
236236 else :
237237 logger .warning (f"Tool call { tool_call_id } not found for update" )
238238
239+ else :
240+ logger .warning (f"Unknown message type: { msg_type } , skipping" )
241+ continue
242+
239243 logger .debug (f"Processed message type={ msg_type } " )
240244
241245 logger .info (f"Saved { len (new_messages )} new messages from LangGraph state" )
@@ -249,7 +253,7 @@ async def stream_messages():
249253 yield make_chunk (status = "init" , meta = meta , msg = HumanMessage (content = query ).model_dump ())
250254
251255 # Input guard
252- if conf .enable_content_guard and content_guard .check (query ):
256+ if conf .enable_content_guard and await content_guard .check (query ):
253257 yield make_chunk (status = "error" , message = "输入内容包含敏感词" , meta = meta )
254258 return
255259
@@ -265,91 +269,74 @@ async def stream_messages():
265269 # 构造运行时配置,如果没有thread_id则生成一个
266270 user_id = str (current_user .id )
267271 thread_id = config .get ("thread_id" )
268-
269272 input_context = {"user_id" : user_id , "thread_id" : thread_id }
270273
274+ if not thread_id :
275+ thread_id = str (uuid .uuid4 ())
276+ logger .warning (f"No thread_id provided, generated new thread_id: { thread_id } " )
277+
278+
271279 # Initialize conversation manager
272280 conv_manager = ConversationManager (db )
273281
274- # Get or create conversation
275- conversation = None
276- if thread_id :
277- conversation = conv_manager .get_conversation_by_thread_id (thread_id )
278- if not conversation :
279- try :
280- # Auto-create conversation for existing thread
281- conversation = conv_manager .create_conversation (
282- user_id = user_id ,
283- agent_id = agent_id ,
284- title = (query [:50 ] + "..." if len (query ) > 50 else query ) if query else "新的对话" ,
285- thread_id = thread_id ,
286- )
287- logger .info (f"Auto-created conversation for thread_id { thread_id } " )
288- except Exception as e :
289- logger .error (f"Failed to auto-create conversation: { e } " )
290- conversation = None
291-
292282 # Save user message
293- if conversation :
294- try :
295- conv_manager .add_message (
296- conversation_id = conversation .id ,
297- role = "user" ,
298- content = query ,
299- message_type = "text" ,
300- extra_metadata = {"raw_message" : HumanMessage (content = query ).model_dump ()},
301- )
302- except Exception as e :
303- logger .error (f"Error saving user message: { e } " )
283+ try :
284+ conv_manager .add_message_by_thread_id (
285+ thread_id = thread_id ,
286+ role = "user" ,
287+ content = query ,
288+ message_type = "text" ,
289+ extra_metadata = {"raw_message" : HumanMessage (content = query ).model_dump ()},
290+ )
291+ except Exception as e :
292+ logger .error (f"Error saving user message: { e } " )
304293
305294 try :
306- # Stream messages (only for display, don't save yet)
295+ full_ai_content = ""
307296 async for msg , metadata in agent .stream_messages (messages , input_context = input_context ):
308297 if isinstance (msg , AIMessageChunk ):
309- # Content guard
310- if conf .enable_content_guard and content_guard .check (msg .content ):
298+
299+ full_ai_content += msg .content
300+ if conf .enable_content_guard and await content_guard .check_with_keywords (full_ai_content [- 20 :]):
311301 logger .warning ("Sensitive content detected in stream" )
312302 yield make_chunk (message = "检测到敏感内容,已中断输出" , status = "error" )
313303 return
314304
315305 yield make_chunk (content = msg .content , msg = msg .model_dump (), metadata = metadata , status = "loading" )
316306
317- elif isinstance (msg , ToolMessage ):
318- yield make_chunk (msg = msg .model_dump (), metadata = metadata , status = "loading" )
319307 else :
320308 yield make_chunk (msg = msg .model_dump (), metadata = metadata , status = "loading" )
321309
310+ if conf .enable_content_guard and await content_guard .check (full_ai_content ):
311+ logger .warning ("Sensitive content detected in final message" )
312+ yield make_chunk (message = "检测到敏感内容,已中断输出" , status = "error" )
313+ return
314+
322315 yield make_chunk (status = "finished" , meta = meta )
323316
324317 # After streaming finished, save all messages from LangGraph state
325- if conversation :
326- langgraph_config = { "configurable" : { "thread_id" : thread_id , "user_id" : user_id }}
327- await save_messages_from_langgraph_state (
328- agent_instance = agent ,
329- conversation = conversation ,
330- conv_mgr = conv_manager ,
331- config_dict = langgraph_config ,
332- )
318+ langgraph_config = { "configurable" : input_context }
319+ await save_messages_from_langgraph_state (
320+ agent_instance = agent ,
321+ thread_id = thread_id ,
322+ conv_mgr = conv_manager ,
323+ config_dict = langgraph_config ,
324+ )
325+
333326 except (asyncio .CancelledError , ConnectionError ) as e :
334327 # 客户端主动中断连接,尝试保存已生成的部分内容
335- logger .info (f"Client disconnected for thread { thread_id } : { e } " )
336- try :
337- if conversation :
338- langgraph_config = {"configurable" : {"thread_id" : thread_id , "user_id" : user_id }}
339- await save_messages_from_langgraph_state (
340- agent_instance = agent ,
341- conversation = conversation ,
342- conv_mgr = conv_manager ,
343- config_dict = langgraph_config ,
344- )
345- except Exception as save_error :
346- logger .error (f"Error saving partial messages after disconnect: { save_error } " )
328+ logger .warning (f"Client disconnected for thread { thread_id } : { e } " )
329+ langgraph_config = {"configurable" : input_context }
330+ await save_messages_from_langgraph_state (
331+ agent_instance = agent ,
332+ thread_id = thread_id ,
333+ conv_mgr = conv_manager ,
334+ config_dict = langgraph_config ,
335+ )
336+
347337 # 通知前端中断(可能发送不到,但用于一致性)
348- try :
349- yield make_chunk (status = "interrupted" , message = "对话已中断" , meta = meta )
350- except Exception :
351- pass
352- return
338+ yield make_chunk (status = "interrupted" , message = "对话已中断" , meta = meta )
339+
353340 except Exception as e :
354341 logger .error (f"Error streaming messages: { e } , { traceback .format_exc ()} " )
355342 yield make_chunk (message = f"Error streaming messages: { e } " , status = "error" )
0 commit comments