Skip to content

Commit 35d1547

Browse files
authored
Merge branch 'dev' into feat/knowledge_db_api_param_entry
2 parents 27ee05f + c63555f commit 35d1547

File tree

5 files changed

+69
-33
lines changed

5 files changed

+69
-33
lines changed

src/memos/configs/mem_scheduler.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,13 @@ class RabbitMQConfig(
178178
ge=1, # Port must be >= 1
179179
le=65535, # Port must be <= 65535
180180
)
181+
exchange_name: str = Field(
182+
default="memos-fanout",
183+
description="Exchange name for RabbitMQ (e.g., memos-fanout, memos-memory-change)",
184+
)
185+
exchange_type: str = Field(
186+
default="fanout", description="Exchange type for RabbitMQ (fanout or direct)"
187+
)
181188

182189

183190
class GraphDBAuthConfig(BaseConfig, DictConversionMixin, EnvConfigMixin):

src/memos/log.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def close(self):
188188
},
189189
"handlers": {
190190
"console": {
191-
"level": selected_log_level,
191+
"level": "DEBUG",
192192
"class": "logging.StreamHandler",
193193
"stream": stdout,
194194
"formatter": "no_datetime",

src/memos/mem_scheduler/general_modules/scheduler_logger.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -181,19 +181,21 @@ def log_working_memory_replacement(
181181
or getattr(itm.metadata, "update_at", None),
182182
}
183183
)
184-
ev = self.create_event_log(
185-
label="scheduleMemory",
186-
from_memory_type=TEXT_MEMORY_TYPE,
187-
to_memory_type=WORKING_MEMORY_TYPE,
188-
user_id=user_id,
189-
mem_cube_id=mem_cube_id,
190-
mem_cube=mem_cube,
191-
memcube_log_content=memcube_content,
192-
metadata=meta,
193-
memory_len=len(memcube_content),
194-
memcube_name=self._map_memcube_name(mem_cube_id),
195-
)
196-
log_func_callback([ev])
184+
# Only create log if there are actual memory changes
185+
if memcube_content:
186+
ev = self.create_event_log(
187+
label="scheduleMemory",
188+
from_memory_type=TEXT_MEMORY_TYPE,
189+
to_memory_type=WORKING_MEMORY_TYPE,
190+
user_id=user_id,
191+
mem_cube_id=mem_cube_id,
192+
mem_cube=mem_cube,
193+
memcube_log_content=memcube_content,
194+
metadata=meta,
195+
memory_len=len(memcube_content),
196+
memcube_name=self._map_memcube_name(mem_cube_id),
197+
)
198+
log_func_callback([ev])
197199

198200
@log_exceptions(logger=logger)
199201
def log_activation_memory_update(
@@ -235,19 +237,21 @@ def log_activation_memory_update(
235237
"updated_at": None,
236238
}
237239
)
238-
ev = self.create_event_log(
239-
label="scheduleMemory",
240-
from_memory_type=ACTIVATION_MEMORY_TYPE,
241-
to_memory_type=PARAMETER_MEMORY_TYPE,
242-
user_id=user_id,
243-
mem_cube_id=mem_cube_id,
244-
mem_cube=mem_cube,
245-
memcube_log_content=memcube_content,
246-
metadata=meta,
247-
memory_len=len(added_memories),
248-
memcube_name=self._map_memcube_name(mem_cube_id),
249-
)
250-
log_func_callback([ev])
240+
# Only create log if there are actual memory changes
241+
if memcube_content:
242+
ev = self.create_event_log(
243+
label="scheduleMemory",
244+
from_memory_type=ACTIVATION_MEMORY_TYPE,
245+
to_memory_type=PARAMETER_MEMORY_TYPE,
246+
user_id=user_id,
247+
mem_cube_id=mem_cube_id,
248+
mem_cube=mem_cube,
249+
memcube_log_content=memcube_content,
250+
metadata=meta,
251+
memory_len=len(added_memories),
252+
memcube_name=self._map_memcube_name(mem_cube_id),
253+
)
254+
log_func_callback([ev])
251255

252256
@log_exceptions(logger=logger)
253257
def log_adding_memory(

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
is_all_english,
3131
transform_name_to_key,
3232
)
33+
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
3334
from memos.memories.textual.item import TextualMemoryItem
3435
from memos.memories.textual.preference import PreferenceTextMemory
3536
from memos.memories.textual.tree import TreeTextMemory
@@ -157,7 +158,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
157158
"""
158159
logger.info(f"Messages {messages} assigned to {QUERY_LABEL} handler.")
159160

160-
grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages)
161+
grouped_messages = group_messages_by_user_and_mem_cube(messages=messages)
161162

162163
self.validate_schedule_messages(messages=messages, label=QUERY_LABEL)
163164

@@ -201,7 +202,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
201202
messages: List of answer messages to process
202203
"""
203204
logger.info(f"Messages {messages} assigned to {ANSWER_LABEL} handler.")
204-
grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages)
205+
grouped_messages = group_messages_by_user_and_mem_cube(messages=messages)
205206

206207
self.validate_schedule_messages(messages=messages, label=ANSWER_LABEL)
207208

@@ -237,7 +238,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
237238
def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
238239
logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")
239240
# Process the query in a session turn
240-
grouped_messages = self.dispatcher._group_messages_by_user_and_mem_cube(messages=messages)
241+
grouped_messages = group_messages_by_user_and_mem_cube(messages=messages)
241242

242243
self.validate_schedule_messages(messages=messages, label=ADD_LABEL)
243244
try:
@@ -758,8 +759,17 @@ def process_message(message: ScheduleMessageItem):
758759

759760
# Get the preference memory from the mem_cube
760761
pref_mem = mem_cube.pref_mem
762+
if pref_mem is None:
763+
logger.warning(
764+
f"Preference memory not initialized for mem_cube_id={mem_cube_id}, "
765+
f"skipping pref_add processing"
766+
)
767+
return
761768
if not isinstance(pref_mem, PreferenceTextMemory):
762-
logger.error(f"Expected PreferenceTextMemory but got {type(pref_mem).__name__}")
769+
logger.error(
770+
f"Expected PreferenceTextMemory but got {type(pref_mem).__name__} "
771+
f"for mem_cube_id={mem_cube_id}"
772+
)
763773
return
764774

765775
# Use pref_mem.get_memory to process the memories

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ def __init__(self):
3232
# RabbitMQ settings
3333
self.rabbitmq_config: RabbitMQConfig | None = None
3434
self.rabbit_queue_name = "memos-scheduler"
35-
self.rabbitmq_exchange_name = "memos-fanout"
36-
self.rabbitmq_exchange_type = FANOUT_EXCHANGE_TYPE
35+
self.rabbitmq_exchange_name = "memos-fanout" # Default, will be overridden by config
36+
self.rabbitmq_exchange_type = FANOUT_EXCHANGE_TYPE # Default, will be overridden by config
3737
self.rabbitmq_connection = None
3838
self.rabbitmq_channel = None
3939

@@ -87,6 +87,21 @@ def initialize_rabbitmq(
8787
else:
8888
logger.error("Not implemented")
8989

90+
# Load exchange configuration from config
91+
if self.rabbitmq_config:
92+
if (
93+
hasattr(self.rabbitmq_config, "exchange_name")
94+
and self.rabbitmq_config.exchange_name
95+
):
96+
self.rabbitmq_exchange_name = self.rabbitmq_config.exchange_name
97+
logger.info(f"Using configured exchange name: {self.rabbitmq_exchange_name}")
98+
if (
99+
hasattr(self.rabbitmq_config, "exchange_type")
100+
and self.rabbitmq_config.exchange_type
101+
):
102+
self.rabbitmq_exchange_type = self.rabbitmq_config.exchange_type
103+
logger.info(f"Using configured exchange type: {self.rabbitmq_exchange_type}")
104+
90105
# Start connection process
91106
parameters = self.get_rabbitmq_connection_param()
92107
self.rabbitmq_connection = SelectConnection(

0 commit comments

Comments
 (0)