Skip to content

Commit a3f6636

Browse files
committed
addressed a range of bugs to make scheduler running correctly
1 parent f520cca commit a3f6636

File tree

9 files changed

+13
-41
lines changed

9 files changed

+13
-41
lines changed

src/memos/api/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def start_config_watch(cls):
174174
@classmethod
175175
def start_watch_if_enabled(cls) -> None:
176176
enable = os.getenv("NACOS_ENABLE_WATCH", "false").lower() == "true"
177-
print("enable:", enable)
177+
logger.info(f"NACOS_ENABLE_WATCH: {enable}")
178178
if not enable:
179179
return
180180
interval = int(os.getenv("NACOS_WATCH_INTERVAL", "60"))

src/memos/log.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def close(self):
187187
},
188188
"handlers": {
189189
"console": {
190-
"level": "DEBUG",
190+
"level": "WARNING",
191191
"class": "logging.StreamHandler",
192192
"stream": stdout,
193193
"formatter": "no_datetime",

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,8 +539,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt
539539
if self.disable_handlers and message.label in self.disable_handlers:
540540
logger.info(f"Skipping disabled handler: {message.label} - {message.content}")
541541
continue
542-
self.memos_message_queue.put(message)
543-
logger.info(f"Submitted message to local queue: {message.label} - {message.content}")
542+
self.memos_message_queue.put(message)
543+
logger.info(f"Submitted message to local queue: {message.label} - {message.content}")
544544

545545
with contextlib.suppress(Exception):
546546
if messages:
@@ -609,7 +609,6 @@ def _message_consumer(self) -> None:
609609

610610
if messages:
611611
try:
612-
print(f"dispatch {len(messages)} messages")
613612
self.dispatcher.dispatch(messages)
614613
except Exception as e:
615614
logger.error(f"Error dispatching messages: {e!s}")

src/memos/mem_scheduler/general_modules/dispatcher.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,9 +418,6 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):
418418
logger.info(
419419
f"Dispatch {len(msgs)} message(s) to {label} handler for user {user_id} and mem_cube {mem_cube_id}."
420420
)
421-
print(
422-
f"Dispatch {len(msgs)} message(s) to {label} handler for user {user_id} and mem_cube {mem_cube_id}."
423-
)
424421
else:
425422
wrapped_handler(msgs)
426423

src/memos/mem_scheduler/general_modules/redis_queue.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,6 @@ def get(
169169
raise ConnectionError("Not connected to Redis. Redis connection not available.")
170170

171171
try:
172-
# Ensure the consumer group and stream exist before reading
173-
self._ensure_consumer_group()
174-
175172
# Calculate timeout for Redis
176173
redis_timeout = None
177174
if block and timeout is not None:
@@ -195,7 +192,6 @@ def get(
195192
logger.warning(
196193
f"Consumer group or stream missing for '{self.stream_name}/{self.consumer_group}'. Attempting to create and retry."
197194
)
198-
self._ensure_consumer_group()
199195
messages = self._redis_conn.xreadgroup(
200196
self.consumer_group,
201197
self.consumer_name,
@@ -263,9 +259,6 @@ def qsize(self) -> int:
263259
return 0
264260

265261
try:
266-
# Ensure consumer group exists
267-
self._ensure_consumer_group()
268-
269262
# Get pending messages info for the consumer group
270263
# XPENDING returns info about pending messages that haven't been acknowledged
271264
pending_info = self._redis_conn.xpending(self.stream_name, self.consumer_group)
@@ -432,7 +425,6 @@ def connect(self) -> None:
432425
# Test the connection
433426
self._redis_conn.ping()
434427
self._is_connected = True
435-
self._ensure_consumer_group()
436428
logger.debug("Redis connection established successfully")
437429
except Exception as e:
438430
logger.error(f"Failed to connect to Redis: {e}")

src/memos/mem_scheduler/memory_manage_modules/retriever.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from memos.memories.textual.item import TextualMemoryMetadata
2222
from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory
2323

24+
# Extract JSON response
2425
from .memory_filter import MemoryFilter
2526

2627

@@ -63,9 +64,6 @@ def evaluate_memory_answer_ability(
6364
response = self.process_llm.generate([{"role": "user", "content": prompt}])
6465

6566
try:
66-
# Extract JSON response
67-
from memos.mem_scheduler.utils.misc_utils import extract_json_obj
68-
6967
result = extract_json_obj(response)
7068

7169
# Validate response structure
@@ -116,12 +114,12 @@ def _process_enhancement_batch(
116114
)
117115
logger.debug(
118116
f"[Enhance][batch={batch_index}] Prompt (first 200 chars, len={len(prompt)}): "
119-
f"{prompt[:200]}..."
117+
f"{prompt[:200]}]..."
120118
)
121119

122120
response = self.process_llm.generate([{"role": "user", "content": prompt}])
123121
logger.debug(
124-
f"[Enhance][batch={batch_index}] Response (first 200 chars): {response[:200]}..."
122+
f"[Enhance][batch={batch_index}] Response (first 200 chars): {response}..."
125123
)
126124

127125
processed_text_memories = extract_list_items_in_answer(response)

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333

3434
class ScheduleMessageItem(BaseModel, DictConversionMixin):
3535
item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4()))
36-
redis_message_id: str = Field(description="the message get from redis stream", default="")
36+
redis_message_id: str = Field(default="", description="the message get from redis stream")
3737
user_id: str = Field(..., description="user id")
3838
mem_cube_id: str = Field(..., description="memcube id")
39-
session_id: str | None = Field(None, description="Session ID for soft-filtering memories")
39+
session_id: str = Field(default="", description="Session ID for soft-filtering memories")
4040
label: str = Field(..., description="Label of the schedule message")
4141
content: str = Field(..., description="Content of the schedule message")
4242
timestamp: datetime = Field(
4343
default_factory=get_utc_now, description="submit time for schedule_messages"
4444
)
45-
user_name: str | None = Field(
46-
default=None,
45+
user_name: str = Field(
46+
default="",
4747
description="user name / display name (optional)",
4848
)
4949

src/memos/mem_scheduler/utils/misc_utils.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,10 @@ def extract_list_items(text: str, bullet_prefixes: tuple[str, ...] = ("- ",)) ->
119119
if matched:
120120
continue
121121

122-
# Removed loose fallback for "• " to strictly comply with "- " prefix format
123-
124122
if items:
125123
return items
126-
127-
# Fallback: try parsing as a JSON array (e.g., ["item1", "item2", ...])
128-
try:
129-
data = extract_json_obj(normalized)
130-
if isinstance(data, list):
131-
result: list[str] = []
132-
for x in data:
133-
result.append(x if isinstance(x, str) else str(x))
134-
return result
135-
except Exception:
136-
# Swallow and return empty list below
137-
pass
124+
else:
125+
logger.error(f"Fail to parse {text}")
138126

139127
return []
140128

src/memos/templates/mem_scheduler_prompts.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,6 @@
390390
- Focus on whether the memories can fully answer the query without additional information
391391
"""
392392

393-
394393
MEMORY_ENHANCEMENT_PROMPT = """
395394
You are a knowledgeable and precise AI assistant.
396395
@@ -430,7 +429,6 @@
430429
Answer:
431430
"""
432431

433-
434432
PROMPT_MAPPING = {
435433
"intent_recognizing": INTENT_RECOGNIZING_PROMPT,
436434
"memory_reranking": MEMORY_RERANKING_PROMPT,

0 commit comments

Comments
 (0)