Skip to content

Commit 5e4f695

Browse files
CarltonXiangharvey_xiangCaralHsi
authored
Fix/no response (#464)
* fix: response error * fix: response error * fix: response error * feat: replace context thread --------- Co-authored-by: harvey_xiang <[email protected]> Co-authored-by: CaralHsi <[email protected]>
1 parent 4e500a9 commit 5e4f695

File tree

7 files changed

+17
-15
lines changed

7 files changed

+17
-15
lines changed

src/memos/api/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from memos.configs.mem_cube import GeneralMemCubeConfig
1717
from memos.configs.mem_os import MOSConfig
18+
from memos.context.context import ContextThread
1819
from memos.mem_cube.general import GeneralMemCube
1920

2021

@@ -178,7 +179,6 @@ def start_watch_if_enabled(cls) -> None:
178179
if not enable:
179180
return
180181
interval = int(os.getenv("NACOS_WATCH_INTERVAL", "60"))
181-
import threading
182182

183183
def _loop() -> None:
184184
while True:
@@ -188,7 +188,7 @@ def _loop() -> None:
188188
logger.error(f"❌ Nacos watch loop error: {e}")
189189
time.sleep(interval)
190190

191-
threading.Thread(target=_loop, daemon=True).start()
191+
ContextThread(target=_loop, daemon=True).start()
192192
logger.info(f"Nacos watch thread started (interval={interval}s).")
193193

194194
@classmethod

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from sqlalchemy.engine import Engine
1313

1414
from memos.configs.mem_scheduler import AuthConfig, BaseSchedulerConfig
15+
from memos.context.context import ContextThread
1516
from memos.llms.base import BaseLLM
1617
from memos.log import get_logger
1718
from memos.mem_cube.general import GeneralMemCube
@@ -689,7 +690,7 @@ def start(self) -> None:
689690
logger.info("Message consumer process started")
690691
else:
691692
# Default to thread mode
692-
self._consumer_thread = threading.Thread(
693+
self._consumer_thread = ContextThread(
693694
target=self._message_consumer,
694695
daemon=True,
695696
name="MessageConsumerThread",

src/memos/mem_scheduler/general_modules/task_threads.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from concurrent.futures import as_completed
66
from typing import Any, TypeVar
77

8+
from memos.context.context import ContextThread
89
from memos.log import get_logger
910
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
1011

@@ -138,7 +139,7 @@ def worker(task_name: str, func: Callable, args: tuple):
138139

139140
# Start all threads
140141
for task_name, (func, args) in tasks.items():
141-
thread = threading.Thread(
142+
thread = ContextThread(
142143
target=worker, args=(task_name, func, args), name=f"task-{task_name}"
143144
)
144145
threads[task_name] = thread
@@ -283,7 +284,7 @@ def run_race(
283284

284285
# Create and start threads for each task
285286
for task_name, task_func in tasks.items():
286-
thread = threading.Thread(
287+
thread = ContextThread(
287288
target=self.worker, args=(task_func, task_name), name=f"race-{task_name}"
288289
)
289290
self.threads[task_name] = thread

src/memos/mem_scheduler/monitors/dispatcher_monitor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from time import perf_counter
55

66
from memos.configs.mem_scheduler import BaseSchedulerConfig
7-
from memos.context.context import ContextThreadPoolExecutor
7+
from memos.context.context import ContextThread, ContextThreadPoolExecutor
88
from memos.log import get_logger
99
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
1010
from memos.mem_scheduler.general_modules.dispatcher import SchedulerDispatcher
@@ -340,7 +340,7 @@ def start(self) -> bool:
340340
return False
341341

342342
self._running = True
343-
self._monitor_thread = threading.Thread(
343+
self._monitor_thread = ContextThread(
344344
target=self._monitor_loop, name="threadpool_monitor", daemon=True
345345
)
346346
self._monitor_thread.start()

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from pathlib import Path
77

88
from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
9+
from memos.context.context import ContextThread
910
from memos.dependency import require_python_package
1011
from memos.log import get_logger
1112
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
@@ -96,7 +97,7 @@ def initialize_rabbitmq(
9697
)
9798

9899
# Start IOLoop in dedicated thread
99-
self._io_loop_thread = threading.Thread(
100+
self._io_loop_thread = ContextThread(
100101
target=self.rabbitmq_connection.ioloop.start, daemon=True
101102
)
102103
self._io_loop_thread.start()

src/memos/mem_scheduler/webservice_modules/redis_service.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import asyncio
22
import os
33
import subprocess
4-
import threading
54
import time
65

76
from collections.abc import Callable
87
from typing import Any
98

9+
from memos.context.context import ContextThread
1010
from memos.dependency import require_python_package
1111
from memos.log import get_logger
1212
from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
@@ -41,7 +41,7 @@ def __init__(self):
4141
self.query_list_capacity = 1000
4242

4343
self._redis_listener_running = False
44-
self._redis_listener_thread: threading.Thread | None = None
44+
self._redis_listener_thread: ContextThread | None = None
4545
self._redis_listener_loop: asyncio.AbstractEventLoop | None = None
4646

4747
@property
@@ -336,7 +336,7 @@ def redis_start_listening(self, handler: Callable | None = None):
336336
if handler is None:
337337
handler = self.redis_consume_message_stream
338338

339-
self._redis_listener_thread = threading.Thread(
339+
self._redis_listener_thread = ContextThread(
340340
target=self._redis_run_listener_async,
341341
args=(handler,),
342342
daemon=True,

src/memos/memories/textual/tree_text_memory/organize/reorganizer.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import threading
32
import time
43
import traceback
54

@@ -10,7 +9,7 @@
109

1110
import numpy as np
1211

13-
from memos.context.context import ContextThreadPoolExecutor
12+
from memos.context.context import ContextThread, ContextThreadPoolExecutor
1413
from memos.dependency import require_python_package
1514
from memos.embedders.factory import OllamaEmbedder
1615
from memos.graph_dbs.item import GraphDBEdge, GraphDBNode
@@ -94,12 +93,12 @@ def __init__(
9493
self._reorganize_needed = True
9594
if self.is_reorganize:
9695
# ____ 1. For queue message driven thread ___________
97-
self.thread = threading.Thread(target=self._run_message_consumer_loop)
96+
self.thread = ContextThread(target=self._run_message_consumer_loop)
9897
self.thread.start()
9998
# ____ 2. For periodic structure optimization _______
10099
self._stop_scheduler = False
101100
self._is_optimizing = {"LongTermMemory": False, "UserMemory": False}
102-
self.structure_optimizer_thread = threading.Thread(
101+
self.structure_optimizer_thread = ContextThread(
103102
target=self._run_structure_organizer_loop
104103
)
105104
self.structure_optimizer_thread.start()

0 commit comments

Comments
 (0)