Skip to content

Commit 1a6ef9b

Browse files
authored
Feature/remove web log queue v2 (#741)
* feat: propagate item_id in scheduler and dispatcher logs
* remove web log queue put
* chore: add diagnostic logs for publish
* chore: log submitted web log at info
* chore: rename log_id to item_id in debug info
* test: avoid web log queue dependency

---------

Co-authored-by: [email protected] <>
1 parent 58fd663 commit 1a6ef9b

File tree

7 files changed

+61
-49
lines changed

7 files changed

+61
-49
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -853,19 +853,21 @@ def _submit_web_logs(
853853
return
854854

855855
for message in messages:
856-
try:
857-
self._web_log_message_queue.put(message)
858-
except Exception as e:
859-
logger.warning(f"Failed to put message to web log queue: {e}", stack_info=True)
860-
861856
message_info = message.debug_info()
862-
logger.debug(f"Submitted Scheduling log for web: {message_info}")
857+
logger.info(f"[DIAGNOSTIC] base_scheduler._submit_web_logs: submitted {message_info}")
863858

864859
# Always call publish; the publisher now caches when offline and flushes after reconnect
865860
logger.info(
866861
f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}"
867862
)
868863
self.rabbitmq_publish_message(message=message.to_dict())
864+
logger.info(
865+
"[DIAGNOSTIC] base_scheduler._submit_web_logs: publish dispatched "
866+
"item_id=%s task_id=%s label=%s",
867+
message.item_id,
868+
message.task_id,
869+
message.label,
870+
)
869871
logger.debug(
870872
f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
871873
)

src/memos/mem_scheduler/general_modules/scheduler_logger.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def create_autofilled_log_item(
4949
user_id: str,
5050
mem_cube_id: str,
5151
mem_cube: GeneralMemCube,
52+
item_id: str | None = None,
5253
) -> ScheduleLogForWebItem:
5354
if mem_cube is None:
5455
logger.error(
@@ -94,16 +95,19 @@ def create_autofilled_log_item(
9495
)
9596
memory_capacities["parameter_memory_capacity"] = 1
9697

97-
log_message = ScheduleLogForWebItem(
98-
user_id=user_id,
99-
mem_cube_id=mem_cube_id,
100-
label=label,
101-
from_memory_type=from_memory_type,
102-
to_memory_type=to_memory_type,
103-
log_content=log_content,
104-
current_memory_sizes=current_memory_sizes,
105-
memory_capacities=memory_capacities,
106-
)
98+
log_kwargs = {
99+
"user_id": user_id,
100+
"mem_cube_id": mem_cube_id,
101+
"label": label,
102+
"from_memory_type": from_memory_type,
103+
"to_memory_type": to_memory_type,
104+
"log_content": log_content,
105+
"current_memory_sizes": current_memory_sizes,
106+
"memory_capacities": memory_capacities,
107+
}
108+
if item_id:
109+
log_kwargs["item_id"] = item_id
110+
log_message = ScheduleLogForWebItem(**log_kwargs)
107111
return log_message
108112

109113
@log_exceptions(logger=logger)
@@ -120,6 +124,7 @@ def create_event_log(
120124
memory_len: int,
121125
memcube_name: str | None = None,
122126
log_content: str | None = None,
127+
item_id: str | None = None,
123128
) -> ScheduleLogForWebItem:
124129
item = self.create_autofilled_log_item(
125130
log_content=log_content or "",
@@ -129,6 +134,7 @@ def create_event_log(
129134
user_id=user_id,
130135
mem_cube_id=mem_cube_id,
131136
mem_cube=mem_cube,
137+
item_id=item_id,
132138
)
133139
item.memcube_log_content = memcube_log_content
134140
item.metadata = metadata

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
266266
metadata=[],
267267
memory_len=1,
268268
memcube_name=self._map_memcube_name(msg.mem_cube_id),
269+
item_id=msg.item_id,
269270
)
270271
event.task_id = msg.task_id
271272
self._submit_web_logs([event])
@@ -322,6 +323,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
322323
metadata=[],
323324
memory_len=1,
324325
memcube_name=self._map_memcube_name(msg.mem_cube_id),
326+
item_id=msg.item_id,
325327
)
326328
event.task_id = msg.task_id
327329
self._submit_web_logs([event])
@@ -492,6 +494,7 @@ def send_add_log_messages_to_local_env(
492494
metadata=add_meta_legacy,
493495
memory_len=len(add_content_legacy),
494496
memcube_name=self._map_memcube_name(msg.mem_cube_id),
497+
item_id=msg.item_id,
495498
)
496499
event.task_id = msg.task_id
497500
events.append(event)
@@ -507,6 +510,7 @@ def send_add_log_messages_to_local_env(
507510
metadata=update_meta_legacy,
508511
memory_len=len(update_content_legacy),
509512
memcube_name=self._map_memcube_name(msg.mem_cube_id),
513+
item_id=msg.item_id,
510514
)
511515
event.task_id = msg.task_id
512516
events.append(event)
@@ -573,6 +577,7 @@ def send_add_log_messages_to_cloud_env(
573577
metadata=None,
574578
memory_len=len(kb_log_content),
575579
memcube_name=self._map_memcube_name(msg.mem_cube_id),
580+
item_id=msg.item_id,
576581
)
577582
event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
578583
event.task_id = msg.task_id
@@ -719,6 +724,7 @@ def _extract_fields(mem_item):
719724
metadata=None,
720725
memory_len=len(kb_log_content),
721726
memcube_name=self._map_memcube_name(mem_cube_id),
727+
item_id=message.item_id,
722728
)
723729
event.log_content = (
724730
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
@@ -788,6 +794,7 @@ def process_message(message: ScheduleMessageItem):
788794
user_name=user_name,
789795
custom_tags=info.get("custom_tags", None),
790796
task_id=message.task_id,
797+
item_id=message.item_id,
791798
info=info,
792799
)
793800

@@ -815,6 +822,7 @@ def _process_memories_with_reader(
815822
user_name: str,
816823
custom_tags: list[str] | None = None,
817824
task_id: str | None = None,
825+
item_id: str | None = None,
818826
info: dict | None = None,
819827
) -> None:
820828
logger.info(
@@ -934,6 +942,7 @@ def _process_memories_with_reader(
934942
metadata=None,
935943
memory_len=len(kb_log_content),
936944
memcube_name=self._map_memcube_name(mem_cube_id),
945+
item_id=item_id,
937946
)
938947
event.log_content = (
939948
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
@@ -979,6 +988,7 @@ def _process_memories_with_reader(
979988
metadata=add_meta_legacy,
980989
memory_len=len(add_content_legacy),
981990
memcube_name=self._map_memcube_name(mem_cube_id),
991+
item_id=item_id,
982992
)
983993
event.task_id = task_id
984994
self._submit_web_logs([event])
@@ -1045,6 +1055,7 @@ def _process_memories_with_reader(
10451055
metadata=None,
10461056
memory_len=len(kb_log_content),
10471057
memcube_name=self._map_memcube_name(mem_cube_id),
1058+
item_id=item_id,
10481059
)
10491060
event.log_content = f"Knowledge Base Memory Update failed: {exc!s}"
10501061
event.task_id = task_id
@@ -1212,6 +1223,7 @@ def process_message(message: ScheduleMessageItem):
12121223
metadata=meta,
12131224
memory_len=len(keys),
12141225
memcube_name=self._map_memcube_name(mem_cube_id),
1226+
item_id=message.item_id,
12151227
)
12161228
self._submit_web_logs([event])
12171229

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def debug_info(self) -> dict[str, Any]:
163163
"""Return structured debug information for logging purposes."""
164164
return {
165165
"content_preview:": self.log_content[:50],
166-
"log_id": self.item_id,
166+
"item_id": self.item_id,
167167
"user_id": self.user_id,
168168
"mem_cube_id": self.mem_cube_id,
169169
"operation": f"{self.from_memory_type}{self.to_memory_type}",

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ def _maybe_emit_task_completion(
329329
# messages in one batch can belong to different business task_ids; check each
330330
task_ids = set()
331331
task_id_to_doc_id = {}
332+
task_id_to_item_id = {}
332333

333334
for msg in messages:
334335
tid = getattr(msg, "task_id", None)
@@ -340,6 +341,8 @@ def _maybe_emit_task_completion(
340341
sid = info.get("source_doc_id")
341342
if sid:
342343
task_id_to_doc_id[tid] = sid
344+
if tid not in task_id_to_item_id:
345+
task_id_to_item_id[tid] = msg.item_id
343346

344347
if not task_ids:
345348
return
@@ -356,6 +359,7 @@ def _maybe_emit_task_completion(
356359

357360
for task_id in task_ids:
358361
source_doc_id = task_id_to_doc_id.get(task_id)
362+
event_item_id = task_id_to_item_id.get(task_id)
359363
status_data = self.status_tracker.get_task_status_by_business_id(
360364
business_task_id=task_id, user_id=user_id
361365
)
@@ -369,6 +373,7 @@ def _maybe_emit_task_completion(
369373
# (Although if status is 'completed', local error shouldn't happen theoretically,
370374
# unless status update lags or is inconsistent. We trust status_tracker here.)
371375
event = ScheduleLogForWebItem(
376+
item_id=event_item_id,
372377
task_id=task_id,
373378
user_id=user_id,
374379
mem_cube_id=mem_cube_id,
@@ -393,6 +398,7 @@ def _maybe_emit_task_completion(
393398
error_msg = "Unknown error (check system logs)"
394399

395400
event = ScheduleLogForWebItem(
401+
item_id=event_item_id,
396402
task_id=task_id,
397403
user_id=user_id,
398404
mem_cube_id=mem_cube_id,

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,15 @@ def rabbitmq_publish_message(self, message: dict):
368368
logger.debug(f"Published message: {message}")
369369
return True
370370
except Exception as e:
371+
logger.error(
372+
"[DIAGNOSTIC] RabbitMQ publish error. label=%s item_id=%s exchange=%s "
373+
"routing_key=%s error=%s",
374+
label,
375+
message.get("item_id"),
376+
exchange_name,
377+
routing_key,
378+
e,
379+
)
371380
logger.error(f"Failed to publish message: {e}")
372381
# Cache message for retry on next connection
373382
self.rabbitmq_publish_cache.put(message)

tests/mem_scheduler/test_scheduler.py

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -139,44 +139,21 @@ def test_submit_web_logs(self):
139139
},
140140
)
141141

142-
# Empty the queue by consuming all elements
143-
while not self.scheduler._web_log_message_queue.empty():
144-
self.scheduler._web_log_message_queue.get()
142+
self.scheduler.rabbitmq_config = MagicMock()
143+
self.scheduler.rabbitmq_publish_message = MagicMock()
145144

146145
# Submit the log message
147146
self.scheduler._submit_web_logs(messages=log_message)
148147

149-
# Verify the message was added to the queue
150-
self.assertEqual(self.scheduler._web_log_message_queue.qsize(), 1)
151-
152-
# Get the actual message from the queue
153-
actual_message = self.scheduler._web_log_message_queue.get()
154-
155-
# Verify core fields
156-
self.assertEqual(actual_message.user_id, "test_user")
157-
self.assertEqual(actual_message.mem_cube_id, "test_cube")
158-
self.assertEqual(actual_message.label, QUERY_TASK_LABEL)
159-
self.assertEqual(actual_message.from_memory_type, "WorkingMemory")
160-
self.assertEqual(actual_message.to_memory_type, "LongTermMemory")
161-
self.assertEqual(actual_message.log_content, "Test Content")
162-
163-
# Verify memory sizes
164-
self.assertEqual(actual_message.current_memory_sizes["long_term_memory_size"], 0)
165-
self.assertEqual(actual_message.current_memory_sizes["user_memory_size"], 0)
166-
self.assertEqual(actual_message.current_memory_sizes["working_memory_size"], 0)
167-
self.assertEqual(actual_message.current_memory_sizes["transformed_act_memory_size"], 0)
168-
169-
# Verify memory capacities
170-
self.assertEqual(actual_message.memory_capacities["long_term_memory_capacity"], 1000)
171-
self.assertEqual(actual_message.memory_capacities["user_memory_capacity"], 500)
172-
self.assertEqual(actual_message.memory_capacities["working_memory_capacity"], 100)
173-
self.assertEqual(actual_message.memory_capacities["transformed_act_memory_capacity"], 0)
148+
self.scheduler.rabbitmq_publish_message.assert_called_once_with(
149+
message=log_message.to_dict()
150+
)
174151

175152
# Verify auto-generated fields exist
176-
self.assertTrue(hasattr(actual_message, "item_id"))
177-
self.assertTrue(isinstance(actual_message.item_id, str))
178-
self.assertTrue(hasattr(actual_message, "timestamp"))
179-
self.assertTrue(isinstance(actual_message.timestamp, datetime))
153+
self.assertTrue(hasattr(log_message, "item_id"))
154+
self.assertTrue(isinstance(log_message.item_id, str))
155+
self.assertTrue(hasattr(log_message, "timestamp"))
156+
self.assertTrue(isinstance(log_message.timestamp, datetime))
180157

181158
def test_activation_memory_update(self):
182159
"""Test activation memory update functionality with DynamicCache handling."""

0 commit comments

Comments (0)