Skip to content

Commit 16d60f3

Browse files
committed
Merge remote-tracking branch 'upstream/dev' into dev
2 parents e85109b + 119bbe2 commit 16d60f3

File tree

15 files changed

+65
-51
lines changed

15 files changed

+65
-51
lines changed

src/memos/api/config.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from memos.configs.mem_cube import GeneralMemCubeConfig
1717
from memos.configs.mem_os import MOSConfig
18+
from memos.context.context import ContextThread
1819
from memos.mem_cube.general import GeneralMemCube
1920

2021

@@ -178,7 +179,6 @@ def start_watch_if_enabled(cls) -> None:
178179
if not enable:
179180
return
180181
interval = int(os.getenv("NACOS_WATCH_INTERVAL", "60"))
181-
import threading
182182

183183
def _loop() -> None:
184184
while True:
@@ -188,7 +188,7 @@ def _loop() -> None:
188188
logger.error(f"❌ Nacos watch loop error: {e}")
189189
time.sleep(interval)
190190

191-
threading.Thread(target=_loop, daemon=True).start()
191+
ContextThread(target=_loop, daemon=True).start()
192192
logger.info(f"Nacos watch thread started (interval={interval}s).")
193193

194194
@classmethod
@@ -861,9 +861,9 @@ def create_user_config(user_name: str, user_id: str) -> tuple[MOSConfig, General
861861
"reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower()
862862
== "true",
863863
"memory_size": {
864-
"WorkingMemory": os.getenv("NEBULAR_WORKING_MEMORY", 20),
865-
"LongTermMemory": os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6),
866-
"UserMemory": os.getenv("NEBULAR_USER_MEMORY", 1e6),
864+
"WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)),
865+
"LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)),
866+
"UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)),
867867
},
868868
"search_strategy": {
869869
"fast_graph": bool(os.getenv("FAST_GRAPH", "false") == "true"),
@@ -933,9 +933,9 @@ def get_default_cube_config() -> GeneralMemCubeConfig | None:
933933
== "true",
934934
"internet_retriever": internet_config,
935935
"memory_size": {
936-
"WorkingMemory": os.getenv("NEBULAR_WORKING_MEMORY", 20),
937-
"LongTermMemory": os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6),
938-
"UserMemory": os.getenv("NEBULAR_USER_MEMORY", 1e6),
936+
"WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)),
937+
"LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)),
938+
"UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)),
939939
},
940940
"search_strategy": {
941941
"fast_graph": bool(os.getenv("FAST_GRAPH", "false") == "true"),

src/memos/api/middleware/request_context.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,18 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response:
7272
f"headers: {request.headers}"
7373
)
7474

75+
response = await call_next(request)
76+
end_time = time.time()
77+
7578
# Process the request
7679
try:
77-
response = await call_next(request)
78-
end_time = time.time()
80+
if not response:
81+
logger.error(
82+
f"Request Failed No Response, path: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms"
83+
)
84+
85+
return response
86+
7987
if response.status_code == 200:
8088
logger.info(
8189
f"Request completed: source: {self.source}, path: {request.url.path}, status: {response.status_code}, cost: {(end_time - start_time) * 1000:.2f}ms"
@@ -89,6 +97,5 @@ async def dispatch(self, request: Request, call_next: Callable) -> Response:
8997
logger.error(
9098
f"Request Exception Error: source: {self.source}, path: {request.url.path}, error: {e}, cost: {(end_time - start_time) * 1000:.2f}ms"
9199
)
92-
raise e
93100

94101
return response

src/memos/api/routers/server_router.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def search_memories(search_req: APISearchRequest):
349349
mem_cube_id=search_req.mem_cube_id,
350350
session_id=search_req.session_id or "default_session",
351351
)
352-
logger.info(f"Search user_id is: {user_context.mem_cube_id}")
352+
logger.info(f"Search Req is: {search_req}")
353353
memories_result: MOSSearchResult = {
354354
"text_mem": [],
355355
"act_mem": [],
@@ -502,6 +502,9 @@ def add_memories(add_req: APIADDRequest):
502502
mem_cube_id=add_req.mem_cube_id,
503503
session_id=add_req.session_id or "default_session",
504504
)
505+
506+
logger.info(f"Add Req is: {add_req}")
507+
505508
target_session_id = add_req.session_id
506509
if not target_session_id:
507510
target_session_id = "default_session"

src/memos/graph_dbs/polardb.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1740,9 +1740,11 @@ def get_grouped_counts(
17401740
for field in group_fields:
17411741
alias = field.replace(".", "_")
17421742
return_fields.append(
1743-
f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype) AS {alias}"
1743+
f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype)::text AS {alias}"
1744+
)
1745+
group_by_fields.append(
1746+
f"ag_catalog.agtype_access_operator(properties, '\"{field}\"'::agtype)::text"
17441747
)
1745-
group_by_fields.append(alias)
17461748

17471749
# Full SQL query construction
17481750
query = f"""
@@ -1751,7 +1753,6 @@ def get_grouped_counts(
17511753
{where_clause}
17521754
GROUP BY {", ".join(group_by_fields)}
17531755
"""
1754-
17551756
conn = self._get_connection()
17561757
try:
17571758
with conn.cursor() as cursor:
@@ -1772,7 +1773,7 @@ def get_grouped_counts(
17721773
else:
17731774
group_values[field] = str(value)
17741775
count_value = row[-1] # Last column is count
1775-
output.append({**group_values, "count": count_value})
1776+
output.append({**group_values, "count": int(count_value)})
17761777

17771778
return output
17781779

src/memos/mem_reader/simple_struct.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,18 @@ def detect_lang(text):
6767
try:
6868
if not text or not isinstance(text, str):
6969
return "en"
70+
cleaned_text = text
71+
# remove role and timestamp
72+
cleaned_text = re.sub(
73+
r"\b(user|assistant|query|answer)\s*:", "", cleaned_text, flags=re.IGNORECASE
74+
)
75+
cleaned_text = re.sub(r"\[[\d\-:\s]+\]", "", cleaned_text)
76+
77+
# extract chinese characters
7078
chinese_pattern = r"[\u4e00-\u9fff\u3400-\u4dbf\U00020000-\U0002a6df\U0002a700-\U0002b73f\U0002b740-\U0002b81f\U0002b820-\U0002ceaf\uf900-\ufaff]"
71-
chinese_chars = re.findall(chinese_pattern, text)
72-
if len(chinese_chars) / len(re.sub(r"[\s\d\W]", "", text)) > 0.3:
79+
chinese_chars = re.findall(chinese_pattern, cleaned_text)
80+
text_without_special = re.sub(r"[\s\d\W]", "", cleaned_text)
81+
if text_without_special and len(chinese_chars) / len(text_without_special) > 0.3:
7382
return "zh"
7483
return "en"
7584
except Exception:
@@ -466,15 +475,11 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]:
466475
if type == "chat":
467476
for items in scene_data:
468477
result = []
469-
for item in items:
470-
# Convert dictionary to string
471-
if "chat_time" in item:
472-
result.append(item)
473-
else:
474-
result.append(item)
478+
for i, item in enumerate(items):
479+
result.append(item)
475480
if len(result) >= 10:
476481
results.append(result)
477-
context = copy.deepcopy(result[-2:])
482+
context = copy.deepcopy(result[-2:]) if i + 1 < len(items) else []
478483
result = context
479484
if result:
480485
results.append(result)

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from sqlalchemy.engine import Engine
1313

1414
from memos.configs.mem_scheduler import AuthConfig, BaseSchedulerConfig
15+
from memos.context.context import ContextThread
1516
from memos.llms.base import BaseLLM
1617
from memos.log import get_logger
1718
from memos.mem_cube.general import GeneralMemCube
@@ -689,7 +690,7 @@ def start(self) -> None:
689690
logger.info("Message consumer process started")
690691
else:
691692
# Default to thread mode
692-
self._consumer_thread = threading.Thread(
693+
self._consumer_thread = ContextThread(
693694
target=self._message_consumer,
694695
daemon=True,
695696
name="MessageConsumerThread",

src/memos/mem_scheduler/general_modules/task_threads.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from concurrent.futures import as_completed
66
from typing import Any, TypeVar
77

8+
from memos.context.context import ContextThread
89
from memos.log import get_logger
910
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
1011

@@ -138,7 +139,7 @@ def worker(task_name: str, func: Callable, args: tuple):
138139

139140
# Start all threads
140141
for task_name, (func, args) in tasks.items():
141-
thread = threading.Thread(
142+
thread = ContextThread(
142143
target=worker, args=(task_name, func, args), name=f"task-{task_name}"
143144
)
144145
threads[task_name] = thread
@@ -283,7 +284,7 @@ def run_race(
283284

284285
# Create and start threads for each task
285286
for task_name, task_func in tasks.items():
286-
thread = threading.Thread(
287+
thread = ContextThread(
287288
target=self.worker, args=(task_func, task_name), name=f"race-{task_name}"
288289
)
289290
self.threads[task_name] = thread

src/memos/mem_scheduler/monitors/dispatcher_monitor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from time import perf_counter
55

66
from memos.configs.mem_scheduler import BaseSchedulerConfig
7-
from memos.context.context import ContextThreadPoolExecutor
7+
from memos.context.context import ContextThread, ContextThreadPoolExecutor
88
from memos.log import get_logger
99
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
1010
from memos.mem_scheduler.general_modules.dispatcher import SchedulerDispatcher
@@ -340,7 +340,7 @@ def start(self) -> bool:
340340
return False
341341

342342
self._running = True
343-
self._monitor_thread = threading.Thread(
343+
self._monitor_thread = ContextThread(
344344
target=self._monitor_loop, name="threadpool_monitor", daemon=True
345345
)
346346
self._monitor_thread.start()

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77

88
from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
9+
from memos.context.context import ContextThread
910
from memos.dependency import require_python_package
1011
from memos.log import get_logger
1112
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
@@ -96,7 +97,7 @@ def initialize_rabbitmq(
9697
)
9798

9899
# Start IOLoop in dedicated thread
99-
self._io_loop_thread = threading.Thread(
100+
self._io_loop_thread = ContextThread(
100101
target=self.rabbitmq_connection.ioloop.start, daemon=True
101102
)
102103
self._io_loop_thread.start()

src/memos/mem_scheduler/webservice_modules/redis_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import asyncio
22
import os
33
import subprocess
4-
import threading
54
import time
65

76
from collections.abc import Callable
87
from typing import Any
98

9+
from memos.context.context import ContextThread
1010
from memos.dependency import require_python_package
1111
from memos.log import get_logger
1212
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
@@ -41,7 +41,7 @@ def __init__(self):
4141
self.query_list_capacity = 1000
4242

4343
self._redis_listener_running = False
44-
self._redis_listener_thread: threading.Thread | None = None
44+
self._redis_listener_thread: ContextThread | None = None
4545
self._redis_listener_loop: asyncio.AbstractEventLoop | None = None
4646

4747
@property
@@ -336,7 +336,7 @@ def redis_start_listening(self, handler: Callable | None = None):
336336
if handler is None:
337337
handler = self.redis_consume_message_stream
338338

339-
self._redis_listener_thread = threading.Thread(
339+
self._redis_listener_thread = ContextThread(
340340
target=self._redis_run_listener_async,
341341
args=(handler,),
342342
daemon=True,

0 commit comments

Comments
 (0)