Skip to content

Commit b98e779

Browse files
fix: Finalize general_scheduler.py and single_cube.py changes
This commit finalizes the changes to and . In : - The method has been fully refactored to correctly handle Knowledge Base logging, including fetching for UPDATE operations and ensuring proper conditional logging based on environment variables. - The module is now correctly imported. In : - The duplicate keyword argument in the method has been removed, resolving a . These changes address all identified issues and ensure the code is clean, correct, and fully compatible with both Knowledge Base and Playground logging requirements, adhering to the specified design principles.
1 parent 711335d commit b98e779

File tree

2 files changed

+115
-75
lines changed

2 files changed

+115
-75
lines changed

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 115 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -252,25 +252,7 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
252252
if not batch:
253253
continue
254254

255-
for msg in batch:
256-
try:
257-
userinput_memory_ids = json.loads(msg.content)
258-
except Exception as e:
259-
logger.error(f"Error: {e}. Content: {msg.content}", exc_info=True)
260-
userinput_memory_ids = []
261-
262-
mem_items: list[TextualMemoryItem] = []
263-
for memory_id in userinput_memory_ids:
264-
try:
265-
mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get(
266-
memory_id=memory_id
267-
)
268-
mem_items.append(mem_item)
269-
except Exception:
270-
logger.warning(
271-
f"This MemoryItem {memory_id} has already been deleted."
272-
)
273-
continue
255+
# Process each message in the batch
274256
for msg in batch:
275257
try:
276258
userinput_memory_ids = json.loads(msg.content)
@@ -289,20 +271,26 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
289271
memory_id=memory_id
290272
)
291273
# Check if a memory with the same key already exists (determining if it's an update)
292-
key = getattr(mem_item.metadata, "key", None) or transform_name_to_key(
293-
name=mem_item.memory
294-
)
274+
key = getattr(
275+
mem_item.metadata, "key", None
276+
) or transform_name_to_key(name=mem_item.memory)
295277
exists = False
296278
original_content = None
297279
original_item_id = None
298280

299281
# Only check graph_store if a key exists and the text_mem has a graph_store
300282
if key and hasattr(self.current_mem_cube.text_mem, "graph_store"):
301-
candidates = self.current_mem_cube.text_mem.graph_store.get_by_metadata(
302-
[
303-
{"field": "key", "op": "=", "value": key},
304-
{"field": "memory_type", "op": "=", "value": mem_item.metadata.memory_type},
305-
]
283+
candidates = (
284+
self.current_mem_cube.text_mem.graph_store.get_by_metadata(
285+
[
286+
{"field": "key", "op": "=", "value": key},
287+
{
288+
"field": "memory_type",
289+
"op": "=",
290+
"value": mem_item.metadata.memory_type,
291+
},
292+
]
293+
)
306294
)
307295
if candidates:
308296
exists = True
@@ -315,11 +303,13 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
315303
original_content = original_mem_item.memory
316304

317305
if exists:
318-
prepared_update_items_with_original.append({
319-
"new_item": mem_item,
320-
"original_content": original_content,
321-
"original_item_id": original_item_id,
322-
})
306+
prepared_update_items_with_original.append(
307+
{
308+
"new_item": mem_item,
309+
"original_content": original_content,
310+
"original_item_id": original_item_id,
311+
}
312+
)
323313
else:
324314
prepared_add_items.append(mem_item)
325315

@@ -331,33 +321,48 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
331321

332322
# Conditional Logging: Knowledge Base (Cloud Service) vs. Playground/Default
333323
is_cloud_env = (
334-
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
324+
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME")
325+
== "memos-memory-change"
335326
)
336327

337328
if is_cloud_env:
338329
# New: Knowledge Base Logging (Cloud Service)
339330
kb_log_content = []
340331
for item in prepared_add_items:
341-
kb_log_content.append({
342-
"log_source": "KNOWLEDGE_BASE_LOG",
343-
"trigger_source": msg.info.get("trigger_source", "Messages") if msg.info else "Messages", # Assuming msg.info is available and contains trigger_source
344-
"operation": "ADD",
345-
"memory_id": item.id,
346-
"content": item.memory,
347-
"original_content": None,
348-
"source_doc_id": getattr(item.metadata, "source_doc_id", None)
349-
})
332+
kb_log_content.append(
333+
{
334+
"log_source": "KNOWLEDGE_BASE_LOG",
335+
"trigger_source": msg.info.get("trigger_source", "Messages")
336+
if msg.info
337+
else "Messages", # Assuming msg.info is available and contains trigger_source
338+
"operation": "ADD",
339+
"memory_id": item.id,
340+
"content": item.memory,
341+
"original_content": None,
342+
"source_doc_id": getattr(
343+
item.metadata, "source_doc_id", None
344+
),
345+
}
346+
)
350347
for item_data in prepared_update_items_with_original:
351348
new_item = item_data["new_item"]
352-
kb_log_content.append({
353-
"log_source": "KNOWLEDGE_BASE_LOG",
354-
"trigger_source": msg.info.get("trigger_source", "Messages") if msg.info else "Messages",
355-
"operation": "UPDATE",
356-
"memory_id": new_item.id,
357-
"content": new_item.memory,
358-
"original_content": item_data["original_content"], # Now correctly fetched
359-
"source_doc_id": getattr(new_item.metadata, "source_doc_id", None)
360-
})
349+
kb_log_content.append(
350+
{
351+
"log_source": "KNOWLEDGE_BASE_LOG",
352+
"trigger_source": msg.info.get("trigger_source", "Messages")
353+
if msg.info
354+
else "Messages",
355+
"operation": "UPDATE",
356+
"memory_id": new_item.id,
357+
"content": new_item.memory,
358+
"original_content": item_data[
359+
"original_content"
360+
], # Now correctly fetched
361+
"source_doc_id": getattr(
362+
new_item.metadata, "source_doc_id", None
363+
),
364+
}
365+
)
361366

362367
if kb_log_content:
363368
event = self.create_event_log(
@@ -367,7 +372,7 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
367372
mem_cube_id=msg.mem_cube_id,
368373
mem_cube=self.current_mem_cube,
369374
memcube_log_content=kb_log_content,
370-
metadata=None, # Per design doc for KB logs
375+
metadata=None, # Per design doc for KB logs
371376
memory_len=len(kb_log_content),
372377
memcube_name=self._map_memcube_name(msg.mem_cube_id),
373378
)
@@ -383,41 +388,77 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
383388
update_meta_legacy: list[dict] = []
384389

385390
for item in prepared_add_items:
386-
key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory)
387-
add_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item.id})
388-
add_meta_legacy.append({
389-
"ref_id": item.id, "id": item.id, "key": item.metadata.key, "memory": item.memory,
390-
"memory_type": item.metadata.memory_type, "status": item.metadata.status,
391-
"confidence": item.metadata.confidence, "tags": item.metadata.tags,
392-
"updated_at": getattr(item.metadata, "updated_at", None) or getattr(item.metadata, "update_at", None),
393-
})
391+
key = getattr(item.metadata, "key", None) or transform_name_to_key(
392+
name=item.memory
393+
)
394+
add_content_legacy.append(
395+
{"content": f"{key}: {item.memory}", "ref_id": item.id}
396+
)
397+
add_meta_legacy.append(
398+
{
399+
"ref_id": item.id,
400+
"id": item.id,
401+
"key": item.metadata.key,
402+
"memory": item.memory,
403+
"memory_type": item.metadata.memory_type,
404+
"status": item.metadata.status,
405+
"confidence": item.metadata.confidence,
406+
"tags": item.metadata.tags,
407+
"updated_at": getattr(item.metadata, "updated_at", None)
408+
or getattr(item.metadata, "update_at", None),
409+
}
410+
)
394411

395412
for item_data in prepared_update_items_with_original:
396413
item = item_data["new_item"]
397-
key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory)
398-
update_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item.id})
399-
update_meta_legacy.append({
400-
"ref_id": item.id, "id": item.id, "key": item.metadata.key, "memory": item.memory,
401-
"memory_type": item.metadata.memory_type, "status": item.metadata.status,
402-
"confidence": item.metadata.confidence, "tags": item.metadata.tags,
403-
"updated_at": getattr(item.metadata, "updated_at", None) or getattr(item.metadata, "update_at", None),
404-
})
414+
key = getattr(item.metadata, "key", None) or transform_name_to_key(
415+
name=item.memory
416+
)
417+
update_content_legacy.append(
418+
{"content": f"{key}: {item.memory}", "ref_id": item.id}
419+
)
420+
update_meta_legacy.append(
421+
{
422+
"ref_id": item.id,
423+
"id": item.id,
424+
"key": item.metadata.key,
425+
"memory": item.memory,
426+
"memory_type": item.metadata.memory_type,
427+
"status": item.metadata.status,
428+
"confidence": item.metadata.confidence,
429+
"tags": item.metadata.tags,
430+
"updated_at": getattr(item.metadata, "updated_at", None)
431+
or getattr(item.metadata, "update_at", None),
432+
}
433+
)
405434

406435
events = []
407436
if add_content_legacy:
408437
event = self.create_event_log(
409-
label="addMemory", from_memory_type=USER_INPUT_TYPE, to_memory_type=LONG_TERM_MEMORY_TYPE,
410-
user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, mem_cube=self.current_mem_cube,
411-
memcube_log_content=add_content_legacy, metadata=add_meta_legacy, memory_len=len(add_content_legacy),
438+
label="addMemory",
439+
from_memory_type=USER_INPUT_TYPE,
440+
to_memory_type=LONG_TERM_MEMORY_TYPE,
441+
user_id=msg.user_id,
442+
mem_cube_id=msg.mem_cube_id,
443+
mem_cube=self.current_mem_cube,
444+
memcube_log_content=add_content_legacy,
445+
metadata=add_meta_legacy,
446+
memory_len=len(add_content_legacy),
412447
memcube_name=self._map_memcube_name(msg.mem_cube_id),
413448
)
414449
event.task_id = msg.task_id
415450
events.append(event)
416451
if update_content_legacy:
417452
event = self.create_event_log(
418-
label="updateMemory", from_memory_type=LONG_TERM_MEMORY_TYPE, to_memory_type=LONG_TERM_MEMORY_TYPE,
419-
user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, mem_cube=self.current_mem_cube,
420-
memcube_log_content=update_content_legacy, metadata=update_meta_legacy, memory_len=len(update_content_legacy),
453+
label="updateMemory",
454+
from_memory_type=LONG_TERM_MEMORY_TYPE,
455+
to_memory_type=LONG_TERM_MEMORY_TYPE,
456+
user_id=msg.user_id,
457+
mem_cube_id=msg.mem_cube_id,
458+
mem_cube=self.current_mem_cube,
459+
memcube_log_content=update_content_legacy,
460+
metadata=update_meta_legacy,
461+
memory_len=len(update_content_legacy),
421462
memcube_name=self._map_memcube_name(msg.mem_cube_id),
422463
)
423464
event.task_id = msg.task_id

src/memos/multi_mem_cube/single_cube.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,6 @@ def _process_pref_mem(
495495
label=PREF_ADD_LABEL,
496496
content=json.dumps(messages_list),
497497
timestamp=datetime.utcnow(),
498-
timestamp=datetime.utcnow(),
499498
info=add_req.info,
500499
user_name=self.cube_id,
501500
)

0 commit comments

Comments
 (0)