Skip to content

Commit 272eee3

Browse files
committed
feat: Asynchronous processing of logs, notifications and memory additions,handle synchronous and asynchronous environments
1 parent 0ac8355 commit 272eee3

File tree

2 files changed

+215
-58
lines changed

2 files changed

+215
-58
lines changed

src/memos/mem_os/product.py

Lines changed: 169 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
12
import json
23
import os
34
import random
5+
import threading
46
import time
57

68
from collections.abc import Generator
@@ -522,6 +524,163 @@ def _send_message_to_scheduler(
522524
)
523525
self.mem_scheduler.submit_messages(messages=[message_item])
524526

527+
528+
async def _post_chat_processing(
529+
self,
530+
user_id: str,
531+
cube_id: str,
532+
query: str,
533+
full_response: str,
534+
system_prompt: str,
535+
time_start: float,
536+
time_end: float,
537+
speed_improvement: float,
538+
current_messages: list,
539+
) -> None:
540+
"""
541+
Asynchronous processing of logs, notifications and memory additions
542+
"""
543+
try:
544+
logger.info(f"user_id: {user_id}, cube_id: {cube_id}, current_messages: {current_messages}")
545+
logger.info(f"user_id: {user_id}, cube_id: {cube_id}, full_response: {full_response}")
546+
547+
clean_response, extracted_references = self._extract_references_from_response(full_response)
548+
logger.info(f"Extracted {len(extracted_references)} references from response")
549+
550+
# Send chat report notifications asynchronously
551+
if self.online_bot:
552+
try:
553+
from memos.memos_tools.notification_utils import send_online_bot_notification_async
554+
555+
# 准备通知数据
556+
chat_data = {
557+
"query": query,
558+
"user_id": user_id,
559+
"cube_id": cube_id,
560+
"system_prompt": system_prompt,
561+
"full_response": full_response,
562+
}
563+
564+
system_data = {
565+
"references": extracted_references,
566+
"time_start": time_start,
567+
"time_end": time_end,
568+
"speed_improvement": speed_improvement,
569+
}
570+
571+
emoji_config = {"chat": "💬", "system_info": "📊"}
572+
573+
await send_online_bot_notification_async(
574+
online_bot=self.online_bot,
575+
header_name="MemOS Chat Report",
576+
sub_title_name="chat_with_references",
577+
title_color="#00956D",
578+
other_data1=chat_data,
579+
other_data2=system_data,
580+
emoji=emoji_config,
581+
)
582+
except Exception as e:
583+
logger.warning(f"Failed to send chat notification (async): {e}")
584+
585+
self._send_message_to_scheduler(
586+
user_id=user_id, mem_cube_id=cube_id, query=clean_response, label=ANSWER_LABEL
587+
)
588+
589+
self.add(
590+
user_id=user_id,
591+
messages=[
592+
{
593+
"role": "user",
594+
"content": query,
595+
"chat_time": str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
596+
},
597+
{
598+
"role": "assistant",
599+
"content": clean_response, # Store clean text without reference markers
600+
"chat_time": str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
601+
},
602+
],
603+
mem_cube_id=cube_id,
604+
)
605+
606+
logger.info(f"Post-chat processing completed for user {user_id}")
607+
608+
except Exception as e:
609+
logger.error(f"Error in post-chat processing for user {user_id}: {e}", exc_info=True)
610+
611+
def _start_post_chat_processing(
612+
self,
613+
user_id: str,
614+
cube_id: str,
615+
query: str,
616+
full_response: str,
617+
system_prompt: str,
618+
time_start: float,
619+
time_end: float,
620+
speed_improvement: float,
621+
current_messages: list,
622+
) -> None:
623+
"""
624+
Asynchronous processing of logs, notifications and memory additions, handle synchronous and asynchronous environments
625+
"""
626+
def run_async_in_thread():
627+
"""Running asynchronous tasks in a new thread"""
628+
try:
629+
loop = asyncio.new_event_loop()
630+
asyncio.set_event_loop(loop)
631+
try:
632+
loop.run_until_complete(
633+
self._post_chat_processing(
634+
user_id=user_id,
635+
cube_id=cube_id,
636+
query=query,
637+
full_response=full_response,
638+
system_prompt=system_prompt,
639+
time_start=time_start,
640+
time_end=time_end,
641+
speed_improvement=speed_improvement,
642+
current_messages=current_messages,
643+
)
644+
)
645+
finally:
646+
loop.close()
647+
except Exception as e:
648+
logger.error(f"Error in thread-based post-chat processing for user {user_id}: {e}", exc_info=True)
649+
650+
try:
651+
# Try to get the current event loop
652+
asyncio.get_running_loop()
653+
# Create task and store reference to prevent garbage collection
654+
task = asyncio.create_task(
655+
self._post_chat_processing(
656+
user_id=user_id,
657+
cube_id=cube_id,
658+
query=query,
659+
full_response=full_response,
660+
system_prompt=system_prompt,
661+
time_start=time_start,
662+
time_end=time_end,
663+
speed_improvement=speed_improvement,
664+
current_messages=current_messages,
665+
)
666+
)
667+
# Add exception handling for the background task
668+
task.add_done_callback(
669+
lambda t: logger.error(f"Error in background post-chat processing for user {user_id}: {t.exception()}", exc_info=True)
670+
if t.exception()
671+
else None
672+
)
673+
except RuntimeError:
674+
# No event loop, run in a new thread
675+
thread = threading.Thread(
676+
target=run_async_in_thread,
677+
name=f"PostChatProcessing-{user_id}",
678+
# Set as a daemon thread to avoid blocking program exit
679+
daemon=True
680+
)
681+
thread.start()
682+
683+
525684
def _filter_memories_by_threshold(
526685
self, memories: list[TextualMemoryItem], threshold: float = 0.50, min_num: int = 3
527686
) -> list[TextualMemoryItem]:
@@ -895,64 +1054,17 @@ def chat_with_references(
8951054
yield f"data: {json.dumps({'type': 'suggestion', 'data': further_suggestion})}\n\n"
8961055
yield f"data: {json.dumps({'type': 'end'})}\n\n"
8971056

898-
logger.info(f"user_id: {user_id}, cube_id: {cube_id}, current_messages: {current_messages}")
899-
logger.info(f"user_id: {user_id}, cube_id: {cube_id}, full_response: {full_response}")
900-
901-
clean_response, extracted_references = self._extract_references_from_response(full_response)
902-
logger.info(f"Extracted {len(extracted_references)} references from response")
903-
904-
# Send chat report if online_bot is available
905-
try:
906-
from memos.memos_tools.notification_utils import send_online_bot_notification
907-
908-
# Prepare data for online_bot
909-
chat_data = {
910-
"query": query,
911-
"user_id": user_id,
912-
"cube_id": cube_id,
913-
"system_prompt": system_prompt,
914-
"full_response": full_response,
915-
}
916-
917-
system_data = {
918-
"references": extracted_references,
919-
"time_start": time_start,
920-
"time_end": time_end,
921-
"speed_improvement": speed_improvement,
922-
}
923-
924-
emoji_config = {"chat": "💬", "system_info": "📊"}
925-
926-
send_online_bot_notification(
927-
online_bot=self.online_bot,
928-
header_name="MemOS Chat Report",
929-
sub_title_name="chat_with_references",
930-
title_color="#00956D",
931-
other_data1=chat_data,
932-
other_data2=system_data,
933-
emoji=emoji_config,
934-
)
935-
except Exception as e:
936-
logger.warning(f"Failed to send chat notification: {e}")
937-
938-
self._send_message_to_scheduler(
939-
user_id=user_id, mem_cube_id=cube_id, query=clean_response, label=ANSWER_LABEL
940-
)
941-
self.add(
1057+
# Asynchronous processing of logs, notifications and memory additions
1058+
self._start_post_chat_processing(
9421059
user_id=user_id,
943-
messages=[
944-
{
945-
"role": "user",
946-
"content": query,
947-
"chat_time": str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
948-
},
949-
{
950-
"role": "assistant",
951-
"content": clean_response, # Store clean text without reference markers
952-
"chat_time": str(datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
953-
},
954-
],
955-
mem_cube_id=cube_id,
1060+
cube_id=cube_id,
1061+
query=query,
1062+
full_response=full_response,
1063+
system_prompt=system_prompt,
1064+
time_start=time_start,
1065+
time_end=time_end,
1066+
speed_improvement=speed_improvement,
1067+
current_messages=current_messages,
9561068
)
9571069

9581070
def get_all(

src/memos/memos_tools/notification_utils.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
22
Notification utilities for MemOS product.
33
"""
4-
4+
import asyncio
55
import logging
66

77
from collections.abc import Callable
@@ -51,6 +51,51 @@ def send_online_bot_notification(
5151
logger.warning(f"Failed to send online bot notification: {e}")
5252

5353

54+
async def send_online_bot_notification_async(
55+
online_bot: Callable | None,
56+
header_name: str,
57+
sub_title_name: str,
58+
title_color: str,
59+
other_data1: dict[str, Any],
60+
other_data2: dict[str, Any],
61+
emoji: dict[str, str],
62+
) -> None:
63+
"""
64+
Send notification via online_bot asynchronously if available.
65+
66+
Args:
67+
online_bot: The online_bot function or None
68+
header_name: Header name for the report
69+
sub_title_name: Subtitle for the report
70+
title_color: Title color
71+
other_data1: First data dict
72+
other_data2: Second data dict
73+
emoji: Emoji configuration dict
74+
"""
75+
if online_bot is None:
76+
return
77+
78+
try:
79+
# Run the potentially blocking notification in a thread pool
80+
loop = asyncio.get_event_loop()
81+
await loop.run_in_executor(
82+
None,
83+
lambda: online_bot(
84+
header_name=header_name,
85+
sub_title_name=sub_title_name,
86+
title_color=title_color,
87+
other_data1=other_data1,
88+
other_data2=other_data2,
89+
emoji=emoji,
90+
)
91+
)
92+
93+
logger.info(f"Online bot notification sent successfully (async): {header_name}")
94+
95+
except Exception as e:
96+
logger.warning(f"Failed to send online bot notification (async): {e}")
97+
98+
5499
def send_error_bot_notification(
55100
error_bot: Callable | None,
56101
err: str,

0 commit comments

Comments
 (0)