Commit bad58a9

feat: Implement Knowledge Base logging format in GeneralScheduler
This commit implements the new logging format for the 'Knowledge Base' scenario within the `_add_message_consumer` function of the `GeneralScheduler`.

Key changes:
- Introduced a conditional logging path based on the `MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME` environment variable, distinguishing between Knowledge Base logging and the existing Playground/Default logging.
- Refactored the memory processing loop to correctly fetch the original content for update operations by querying the graph store for the existing memory item's content.
- Ensured that add and update operations for the Knowledge Base scenario produce a single, structured event whose log content adheres to the new design document, including the new log fields and the correctly populated `original_content` for updates.
- Maintained backward compatibility for the existing Playground/Default logging paths by reconstructing their expected data structures.
1 parent b6b8694 commit bad58a9
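For orientation, below is a minimal, self-contained sketch of the payload shape this commit introduces, reduced from the diff further down. `build_kb_log_entries` is a hypothetical helper written for illustration only (it is not part of the codebase); the field names and the `MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME` gate are taken from the diff.

```python
import os
from typing import Any


def build_kb_log_entries(
    prepared_add_items: list[Any],
    prepared_update_items_with_original: list[dict],
    trigger_source: str = "Messages",
) -> list[dict]:
    """Sketch of how the Knowledge Base log payload is assembled.

    Items are assumed to expose `.id`, `.memory`, and `.metadata` like
    TextualMemoryItem; update entries carry the previously fetched
    original content.
    """
    entries = []
    for item in prepared_add_items:
        entries.append({
            "log_source": "KNOWLEDGE_BASE_LOG",
            "trigger_source": trigger_source,
            "operation": "ADD",
            "memory_id": item.id,
            "content": item.memory,
            "original_content": None,
            "source_doc_id": getattr(item.metadata, "source_doc_id", None),
        })
    for item_data in prepared_update_items_with_original:
        new_item = item_data["new_item"]
        entries.append({
            "log_source": "KNOWLEDGE_BASE_LOG",
            "trigger_source": trigger_source,
            "operation": "UPDATE",
            "memory_id": new_item.id,
            "content": new_item.memory,
            "original_content": item_data["original_content"],
            "source_doc_id": getattr(new_item.metadata, "source_doc_id", None),
        })
    return entries


# The Knowledge Base path is only taken in the cloud environment:
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
```

In the scheduler itself the same entries are built inline and emitted through `create_event_log` / `_submit_web_logs`, as shown in the diff below.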

File tree

1 file changed: +147 lines, -78 lines

src/memos/mem_scheduler/general_scheduler.py

Lines changed: 147 additions & 78 deletions
@@ -271,90 +271,159 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
                             f"This MemoryItem {memory_id} has already been deleted."
                         )
                         continue
-                add_content: list[dict] = []
-                add_meta: list[dict] = []
-                update_content: list[dict] = []
-                update_meta: list[dict] = []
-                for mem_item in mem_items:
-                    if mem_item.metadata.memory_type == WORKING_MEMORY_TYPE:
-                        continue
-                    key = getattr(mem_item.metadata, "key", None) or transform_name_to_key(
-                        name=mem_item.memory
-                    )
-                    exists = False
+            for msg in batch:
+                try:
+                    userinput_memory_ids = json.loads(msg.content)
+                except Exception as e:
+                    logger.error(f"Error: {e}. Content: {msg.content}", exc_info=True)
+                    userinput_memory_ids = []
+
+                # Prepare data for both logging paths, fetching original content for updates
+                prepared_add_items = []
+                prepared_update_items_with_original = []
+
+                for memory_id in userinput_memory_ids:
                     try:
-                        text_mem = self.current_mem_cube.text_mem
-                        if key and hasattr(text_mem, "graph_store"):
-                            candidates = text_mem.graph_store.get_by_metadata(
+                        # This mem_item represents the NEW content that was just added/processed
+                        mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get(
+                            memory_id=memory_id
+                        )
+                        # Check if a memory with the same key already exists (determining if it's an update)
+                        key = getattr(mem_item.metadata, "key", None) or transform_name_to_key(
+                            name=mem_item.memory
+                        )
+                        exists = False
+                        original_content = None
+                        original_item_id = None
+
+                        # Only check graph_store if a key exists and the text_mem has a graph_store
+                        if key and hasattr(self.current_mem_cube.text_mem, "graph_store"):
+                            candidates = self.current_mem_cube.text_mem.graph_store.get_by_metadata(
                                 [
-                                    {"field": "memory", "op": "=", "value": key},
-                                    {
-                                        "field": "memory_type",
-                                        "op": "=",
-                                        "value": mem_item.metadata.memory_type,
-                                    },
+                                    {"field": "key", "op": "=", "value": key},
+                                    {"field": "memory_type", "op": "=", "value": mem_item.metadata.memory_type},
                                 ]
                             )
-                            exists = bool(candidates)
+                            if candidates:
+                                exists = True
+                                original_item_id = candidates[0]
+                                # Crucial step: Fetch the original content for updates
+                                # This `get` is for the *existing* memory that will be updated
+                                original_mem_item = self.current_mem_cube.text_mem.get(
+                                    memory_id=original_item_id
+                                )
+                                original_content = original_mem_item.memory
+
+                        if exists:
+                            prepared_update_items_with_original.append({
+                                "new_item": mem_item,
+                                "original_content": original_content,
+                                "original_item_id": original_item_id,
+                            })
+                        else:
+                            prepared_add_items.append(mem_item)
+
                     except Exception:
-                        exists = False
+                        logger.warning(
+                            f"This MemoryItem {memory_id} has already been deleted or an error occurred during preparation."
+                        )
+                        continue

-                    payload = {
-                        "content": f"{key}: {mem_item.memory}",
-                        "ref_id": mem_item.id,
-                    }
-                    meta_dict = {
-                        "ref_id": mem_item.id,
-                        "id": mem_item.id,
-                        "key": mem_item.metadata.key,
-                        "memory": mem_item.memory,
-                        "memory_type": mem_item.metadata.memory_type,
-                        "status": mem_item.metadata.status,
-                        "confidence": mem_item.metadata.confidence,
-                        "tags": mem_item.metadata.tags,
-                        "updated_at": getattr(mem_item.metadata, "updated_at", None)
-                        or getattr(mem_item.metadata, "update_at", None),
-                    }
-                    if exists:
-                        update_content.append(payload)
-                        update_meta.append(meta_dict)
-                    else:
-                        add_content.append(payload)
-                        add_meta.append(meta_dict)
-
-                events = []
-                if add_content:
-                    event = self.create_event_log(
-                        label="addMemory",
-                        from_memory_type=USER_INPUT_TYPE,
-                        to_memory_type=LONG_TERM_MEMORY_TYPE,
-                        user_id=msg.user_id,
-                        mem_cube_id=msg.mem_cube_id,
-                        mem_cube=self.current_mem_cube,
-                        memcube_log_content=add_content,
-                        metadata=add_meta,
-                        memory_len=len(add_content),
-                        memcube_name=self._map_memcube_name(msg.mem_cube_id),
-                    )
-                    event.task_id = msg.task_id
-                    events.append(event)
-                if update_content:
-                    event = self.create_event_log(
-                        label="updateMemory",
-                        from_memory_type=LONG_TERM_MEMORY_TYPE,
-                        to_memory_type=LONG_TERM_MEMORY_TYPE,
-                        user_id=msg.user_id,
-                        mem_cube_id=msg.mem_cube_id,
-                        mem_cube=self.current_mem_cube,
-                        memcube_log_content=update_content,
-                        metadata=update_meta,
-                        memory_len=len(update_content),
-                        memcube_name=self._map_memcube_name(msg.mem_cube_id),
-                    )
-                    event.task_id = msg.task_id
-                    events.append(event)
-                if events:
-                    self._submit_web_logs(events)
+                # Conditional Logging: Knowledge Base (Cloud Service) vs. Playground/Default
+                is_cloud_env = (
+                    os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
+                )
+
+                if is_cloud_env:
+                    # New: Knowledge Base Logging (Cloud Service)
+                    kb_log_content = []
+                    for item in prepared_add_items:
+                        kb_log_content.append({
+                            "log_source": "KNOWLEDGE_BASE_LOG",
+                            "trigger_source": msg.info.get("trigger_source", "Messages") if msg.info else "Messages", # Assuming msg.info is available and contains trigger_source
+                            "operation": "ADD",
+                            "memory_id": item.id,
+                            "content": item.memory,
+                            "original_content": None,
+                            "source_doc_id": getattr(item.metadata, "source_doc_id", None)
+                        })
+                    for item_data in prepared_update_items_with_original:
+                        new_item = item_data["new_item"]
+                        kb_log_content.append({
+                            "log_source": "KNOWLEDGE_BASE_LOG",
+                            "trigger_source": msg.info.get("trigger_source", "Messages") if msg.info else "Messages",
+                            "operation": "UPDATE",
+                            "memory_id": new_item.id,
+                            "content": new_item.memory,
+                            "original_content": item_data["original_content"], # Now correctly fetched
+                            "source_doc_id": getattr(new_item.metadata, "source_doc_id", None)
+                        })
+
+                    if kb_log_content:
+                        event = self.create_event_log(
+                            label="knowledgeBaseUpdate",
+                            log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.",
+                            user_id=msg.user_id,
+                            mem_cube_id=msg.mem_cube_id,
+                            mem_cube=self.current_mem_cube,
+                            memcube_log_content=kb_log_content,
+                            metadata=None, # Per design doc for KB logs
+                            memory_len=len(kb_log_content),
+                            memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                        )
+                        event.task_id = msg.task_id
+                        self._submit_web_logs([event])
+                else:
+                    # Existing: Playground/Default Logging
+                    # Reconstruct add_content/add_meta/update_content/update_meta from prepared_items
+                    # This ensures existing logging path continues to work with pre-existing data structures
+                    add_content_legacy: list[dict] = []
+                    add_meta_legacy: list[dict] = []
+                    update_content_legacy: list[dict] = []
+                    update_meta_legacy: list[dict] = []
+
+                    for item in prepared_add_items:
+                        key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory)
+                        add_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item.id})
+                        add_meta_legacy.append({
+                            "ref_id": item.id, "id": item.id, "key": item.metadata.key, "memory": item.memory,
+                            "memory_type": item.metadata.memory_type, "status": item.metadata.status,
+                            "confidence": item.metadata.confidence, "tags": item.metadata.tags,
+                            "updated_at": getattr(item.metadata, "updated_at", None) or getattr(item.metadata, "update_at", None),
+                        })
+
+                    for item_data in prepared_update_items_with_original:
+                        item = item_data["new_item"]
+                        key = getattr(item.metadata, "key", None) or transform_name_to_key(name=item.memory)
+                        update_content_legacy.append({"content": f"{key}: {item.memory}", "ref_id": item.id})
+                        update_meta_legacy.append({
+                            "ref_id": item.id, "id": item.id, "key": item.metadata.key, "memory": item.memory,
+                            "memory_type": item.metadata.memory_type, "status": item.metadata.status,
+                            "confidence": item.metadata.confidence, "tags": item.metadata.tags,
+                            "updated_at": getattr(item.metadata, "updated_at", None) or getattr(item.metadata, "update_at", None),
+                        })
+
+                    events = []
+                    if add_content_legacy:
+                        event = self.create_event_log(
+                            label="addMemory", from_memory_type=USER_INPUT_TYPE, to_memory_type=LONG_TERM_MEMORY_TYPE,
+                            user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, mem_cube=self.current_mem_cube,
+                            memcube_log_content=add_content_legacy, metadata=add_meta_legacy, memory_len=len(add_content_legacy),
+                            memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                        )
+                        event.task_id = msg.task_id
+                        events.append(event)
+                    if update_content_legacy:
+                        event = self.create_event_log(
+                            label="updateMemory", from_memory_type=LONG_TERM_MEMORY_TYPE, to_memory_type=LONG_TERM_MEMORY_TYPE,
+                            user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, mem_cube=self.current_mem_cube,
+                            memcube_log_content=update_content_legacy, metadata=update_meta_legacy, memory_len=len(update_content_legacy),
+                            memcube_name=self._map_memcube_name(msg.mem_cube_id),
+                        )
+                        event.task_id = msg.task_id
+                        events.append(event)
+                    if events:
+                        self._submit_web_logs(events)

         except Exception as e:
             logger.error(f"Error: {e}", exc_info=True)
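
For reference, a single entry in `memcube_log_content` on the Knowledge Base path has the following shape; the values below are illustrative placeholders, not output from a real run.

```python
# Illustrative placeholder values; only the keys and overall shape
# mirror the Knowledge Base entries built in the diff above.
example_kb_entry = {
    "log_source": "KNOWLEDGE_BASE_LOG",
    "trigger_source": "Messages",
    "operation": "UPDATE",
    "memory_id": "mem-123-example",
    "content": "User prefers concise answers.",
    "original_content": "User prefers detailed answers.",
    "source_doc_id": None,
}
```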
