Skip to content

Commit 7e4bb59

Browse files
authored
Merge branch 'dev' into feature/remove-web-log-queue-v2
2 parents 3d7b3cf + 1a6ef9b commit 7e4bb59

File tree

3 files changed

+34
-10
lines changed

3 files changed

+34
-10
lines changed

src/memos/mem_scheduler/general_modules/scheduler_logger.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def create_autofilled_log_item(
4949
user_id: str,
5050
mem_cube_id: str,
5151
mem_cube: GeneralMemCube,
52+
item_id: str | None = None,
5253
) -> ScheduleLogForWebItem:
5354
if mem_cube is None:
5455
logger.error(
@@ -94,16 +95,19 @@ def create_autofilled_log_item(
9495
)
9596
memory_capacities["parameter_memory_capacity"] = 1
9697

97-
log_message = ScheduleLogForWebItem(
98-
user_id=user_id,
99-
mem_cube_id=mem_cube_id,
100-
label=label,
101-
from_memory_type=from_memory_type,
102-
to_memory_type=to_memory_type,
103-
log_content=log_content,
104-
current_memory_sizes=current_memory_sizes,
105-
memory_capacities=memory_capacities,
106-
)
98+
log_kwargs = {
99+
"user_id": user_id,
100+
"mem_cube_id": mem_cube_id,
101+
"label": label,
102+
"from_memory_type": from_memory_type,
103+
"to_memory_type": to_memory_type,
104+
"log_content": log_content,
105+
"current_memory_sizes": current_memory_sizes,
106+
"memory_capacities": memory_capacities,
107+
}
108+
if item_id:
109+
log_kwargs["item_id"] = item_id
110+
log_message = ScheduleLogForWebItem(**log_kwargs)
107111
return log_message
108112

109113
@log_exceptions(logger=logger)
@@ -120,6 +124,7 @@ def create_event_log(
120124
memory_len: int,
121125
memcube_name: str | None = None,
122126
log_content: str | None = None,
127+
item_id: str | None = None,
123128
) -> ScheduleLogForWebItem:
124129
item = self.create_autofilled_log_item(
125130
log_content=log_content or "",
@@ -129,6 +134,7 @@ def create_event_log(
129134
user_id=user_id,
130135
mem_cube_id=mem_cube_id,
131136
mem_cube=mem_cube,
137+
item_id=item_id,
132138
)
133139
item.memcube_log_content = memcube_log_content
134140
item.metadata = metadata

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
266266
metadata=[],
267267
memory_len=1,
268268
memcube_name=self._map_memcube_name(msg.mem_cube_id),
269+
item_id=msg.item_id,
269270
)
270271
event.task_id = msg.task_id
271272
self._submit_web_logs([event])
@@ -322,6 +323,7 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
322323
metadata=[],
323324
memory_len=1,
324325
memcube_name=self._map_memcube_name(msg.mem_cube_id),
326+
item_id=msg.item_id,
325327
)
326328
event.task_id = msg.task_id
327329
self._submit_web_logs([event])
@@ -492,6 +494,7 @@ def send_add_log_messages_to_local_env(
492494
metadata=add_meta_legacy,
493495
memory_len=len(add_content_legacy),
494496
memcube_name=self._map_memcube_name(msg.mem_cube_id),
497+
item_id=msg.item_id,
495498
)
496499
event.task_id = msg.task_id
497500
events.append(event)
@@ -507,6 +510,7 @@ def send_add_log_messages_to_local_env(
507510
metadata=update_meta_legacy,
508511
memory_len=len(update_content_legacy),
509512
memcube_name=self._map_memcube_name(msg.mem_cube_id),
513+
item_id=msg.item_id,
510514
)
511515
event.task_id = msg.task_id
512516
events.append(event)
@@ -573,6 +577,7 @@ def send_add_log_messages_to_cloud_env(
573577
metadata=None,
574578
memory_len=len(kb_log_content),
575579
memcube_name=self._map_memcube_name(msg.mem_cube_id),
580+
item_id=msg.item_id,
576581
)
577582
event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
578583
event.task_id = msg.task_id
@@ -719,6 +724,7 @@ def _extract_fields(mem_item):
719724
metadata=None,
720725
memory_len=len(kb_log_content),
721726
memcube_name=self._map_memcube_name(mem_cube_id),
727+
item_id=message.item_id,
722728
)
723729
event.log_content = (
724730
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
@@ -788,6 +794,7 @@ def process_message(message: ScheduleMessageItem):
788794
user_name=user_name,
789795
custom_tags=info.get("custom_tags", None),
790796
task_id=message.task_id,
797+
item_id=message.item_id,
791798
info=info,
792799
)
793800

@@ -815,6 +822,7 @@ def _process_memories_with_reader(
815822
user_name: str,
816823
custom_tags: list[str] | None = None,
817824
task_id: str | None = None,
825+
item_id: str | None = None,
818826
info: dict | None = None,
819827
) -> None:
820828
logger.info(
@@ -934,6 +942,7 @@ def _process_memories_with_reader(
934942
metadata=None,
935943
memory_len=len(kb_log_content),
936944
memcube_name=self._map_memcube_name(mem_cube_id),
945+
item_id=item_id,
937946
)
938947
event.log_content = (
939948
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
@@ -979,6 +988,7 @@ def _process_memories_with_reader(
979988
metadata=add_meta_legacy,
980989
memory_len=len(add_content_legacy),
981990
memcube_name=self._map_memcube_name(mem_cube_id),
991+
item_id=item_id,
982992
)
983993
event.task_id = task_id
984994
self._submit_web_logs([event])
@@ -1045,6 +1055,7 @@ def _process_memories_with_reader(
10451055
metadata=None,
10461056
memory_len=len(kb_log_content),
10471057
memcube_name=self._map_memcube_name(mem_cube_id),
1058+
item_id=item_id,
10481059
)
10491060
event.log_content = f"Knowledge Base Memory Update failed: {exc!s}"
10501061
event.task_id = task_id
@@ -1212,6 +1223,7 @@ def process_message(message: ScheduleMessageItem):
12121223
metadata=meta,
12131224
memory_len=len(keys),
12141225
memcube_name=self._map_memcube_name(mem_cube_id),
1226+
item_id=message.item_id,
12151227
)
12161228
self._submit_web_logs([event])
12171229

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ def _maybe_emit_task_completion(
329329
# messages in one batch can belong to different business task_ids; check each
330330
task_ids = set()
331331
task_id_to_doc_id = {}
332+
task_id_to_item_id = {}
332333

333334
for msg in messages:
334335
tid = getattr(msg, "task_id", None)
@@ -340,6 +341,8 @@ def _maybe_emit_task_completion(
340341
sid = info.get("source_doc_id")
341342
if sid:
342343
task_id_to_doc_id[tid] = sid
344+
if tid not in task_id_to_item_id:
345+
task_id_to_item_id[tid] = msg.item_id
343346

344347
if not task_ids:
345348
return
@@ -356,6 +359,7 @@ def _maybe_emit_task_completion(
356359

357360
for task_id in task_ids:
358361
source_doc_id = task_id_to_doc_id.get(task_id)
362+
event_item_id = task_id_to_item_id.get(task_id)
359363
status_data = self.status_tracker.get_task_status_by_business_id(
360364
business_task_id=task_id, user_id=user_id
361365
)
@@ -369,6 +373,7 @@ def _maybe_emit_task_completion(
369373
# (Although if status is 'completed', local error shouldn't happen theoretically,
370374
# unless status update lags or is inconsistent. We trust status_tracker here.)
371375
event = ScheduleLogForWebItem(
376+
item_id=event_item_id,
372377
task_id=task_id,
373378
user_id=user_id,
374379
mem_cube_id=mem_cube_id,
@@ -393,6 +398,7 @@ def _maybe_emit_task_completion(
393398
error_msg = "Unknown error (check system logs)"
394399

395400
event = ScheduleLogForWebItem(
401+
item_id=event_item_id,
396402
task_id=task_id,
397403
user_id=user_id,
398404
mem_cube_id=mem_cube_id,

0 commit comments

Comments
 (0)