11"""Parser for file content parts (RawMessageList)."""
22
3+ import concurrent .futures
34import os
45import tempfile
56
67from typing import Any
78
9+ from tqdm import tqdm
10+
11+ from memos .context .context import ContextThreadPoolExecutor
812from memos .embedders .base import BaseEmbedder
913from memos .llms .base import BaseLLM
1014from memos .log import get_logger
15+ from memos .mem_reader .read_multi_modal .base import BaseMessageParser , _derive_key
16+ from memos .mem_reader .read_multi_modal .utils import (
17+ detect_lang ,
18+ get_parser ,
19+ parse_json_result ,
20+ )
1121from memos .memories .textual .item import (
1222 SourceMessage ,
1323 TextualMemoryItem ,
1424 TreeNodeTextualMemoryMetadata ,
1525)
26+ from memos .templates .mem_reader_prompts import (
27+ CUSTOM_TAGS_INSTRUCTION ,
28+ CUSTOM_TAGS_INSTRUCTION_ZH ,
29+ SIMPLE_STRUCT_DOC_READER_PROMPT ,
30+ SIMPLE_STRUCT_DOC_READER_PROMPT_ZH ,
31+ )
1632from memos .types .openai_chat_completion_types import File
1733
18- from .base import BaseMessageParser , _derive_key
19- from .utils import get_parser
20-
2134
2235logger = get_logger (__name__ )
2336
37+ # Prompt dictionary for doc processing (shared by simple_struct and file_content_parser)
38+ DOC_PROMPT_DICT = {
39+ "doc" : {"en" : SIMPLE_STRUCT_DOC_READER_PROMPT , "zh" : SIMPLE_STRUCT_DOC_READER_PROMPT_ZH },
40+ "custom_tags" : {"en" : CUSTOM_TAGS_INSTRUCTION , "zh" : CUSTOM_TAGS_INSTRUCTION_ZH },
41+ }
42+
2443
2544class FileContentParser (BaseMessageParser ):
2645 """Parser for file content parts."""
2746
-    def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None]:
+    def _get_doc_llm_response(self, chunk_text: str, custom_tags: list[str] | None = None) -> dict:
+        """
+        Call the LLM to extract memories from a document chunk.
+        Uses the doc prompts from DOC_PROMPT_DICT.
+
+        Args:
+            chunk_text: Text chunk to extract memories from
+            custom_tags: Optional list of custom tags for LLM extraction
+
+        Returns:
+            Parsed JSON response from the LLM, or an empty dict on failure.
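+            On success, the dict is expected to contain "value", "tags",
+            "memory_type", and optionally "key".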
58+ """
59+ if not self .llm :
60+ logger .warning ("[FileContentParser] LLM not available for fine mode" )
61+ return {}
62+
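+        # Choose the prompt by detected language; placeholders are filled via str.replace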
+        lang = detect_lang(chunk_text)
+        template = DOC_PROMPT_DICT["doc"][lang]
+        prompt = template.replace("{chunk_text}", chunk_text)
+
+        custom_tags_prompt = (
+            DOC_PROMPT_DICT["custom_tags"][lang].replace("{custom_tags}", str(custom_tags))
+            if custom_tags
+            else ""
+        )
+        prompt = prompt.replace("{custom_tags_prompt}", custom_tags_prompt)
+
+        messages = [{"role": "user", "content": prompt}]
+        try:
+            response_text = self.llm.generate(messages)
+            response_json = parse_json_result(response_text)
+        except Exception as e:
+            logger.error(f"[FileContentParser] LLM generation error: {e}")
+            response_json = {}
+        return response_json
+
+    def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None, bool]:
         """Download and parse file from URL."""
         try:
             from urllib.parse import urlparse
@@ -42,14 +97,14 @@ def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None]:
                 filename = os.path.basename(parsed_url.path) or "downloaded_file"
 
             if hostname in self.direct_markdown_hostnames:
-                return response.text, None
+                return response.text, None, True
 
             file_ext = os.path.splitext(filename)[1].lower()
             if file_ext in [".md", ".markdown", ".txt"]:
-                return response.text, None
+                return response.text, None, True
             with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=file_ext) as temp_file:
                 temp_file.write(response.content)
-            return "", temp_file.name
+            return "", temp_file.name, False
         except Exception as e:
             logger.error(f"[FileContentParser] URL processing error: {e}")
-            return f"[File URL download failed: {url_str}]", None
+            return f"[File URL download failed: {url_str}]", None, False
@@ -261,6 +316,8 @@ def parse_fast(
 
         # Extract info fields
         info_ = info.copy()
+        if file_id:
+            info_.update({"file_id": file_id})
         user_id = info_.pop("user_id", "")
         session_id = info_.pop("session_id", "")
@@ -331,10 +388,19 @@ def parse_fine(
331388 """
332389 Parse file content part in fine mode.
333390 Fine mode downloads and parses file content, especially for URLs.
391+ Then uses LLM to extract structured memories from each chunk.
392+
334393 Handles various file parameter scenarios:
335394 - file_data: URL (http://, https://, or @http://), base64 encoded data, or plain text content
336395 - file_id: ID of an uploaded file
337396 - filename: name of the file
397+
398+ Args:
399+ message: File content part to parse
400+ info: Dictionary containing user_id and session_id
401+ **kwargs: Additional parameters including:
402+ - custom_tags: Optional list of custom tags for LLM extraction
403+ - context_items: Optional list of TextualMemoryItem for context
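+
+        Example (illustrative; assumes an initialized FileContentParser and the
+        OpenAI-style file content part shape):
+            >>> part = {"type": "file", "file": {"file_data": "https://example.com/report.pdf"}}
+            >>> items = file_parser.parse_fine(part, info={"user_id": "u1", "session_id": "s1"})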
338404 """
339405 if not isinstance (message , dict ):
340406 logger .warning (f"[FileContentParser] Expected dict, got { type (message )} " )
@@ -351,6 +417,9 @@ def parse_fine(
         file_id = file_info.get("file_id", "")
         filename = file_info.get("filename", "")
 
+        # Extract custom_tags from kwargs (for LLM extraction)
+        custom_tags = kwargs.get("custom_tags")
+
         # Use parser from utils
         parser = self.parser or get_parser()
         if not parser:
@@ -359,6 +428,7 @@ def parse_fine(
 
         parsed_text = ""
         temp_file_path = None
+        is_markdown = False
 
         try:
             # Priority 1: If file_data is provided, process it
@@ -367,7 +437,9 @@ def parse_fine(
                 url_str = file_data[1:] if file_data.startswith("@") else file_data
 
                 if url_str.startswith(("http://", "https://")):
-                    parsed_text, temp_file_path = self._handle_url(url_str, filename)
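+                    # is_markdown=True means the returned text is already usable without file parsing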
+                    parsed_text, temp_file_path, is_markdown = self._handle_url(
+                        url_str, filename
+                    )
                     if temp_file_path:
                         try:
                             # Use parser from utils
@@ -432,26 +504,30 @@ def parse_fine(
         # Split parsed text into chunks
         content_chunks = self._split_text(parsed_text)
 
-        # Create memory items for each chunk
-        memory_items = []
-        for chunk_idx, chunk_text in enumerate(content_chunks):
-            if not chunk_text.strip():
-                continue
-
-            memory_item = TextualMemoryItem(
-                memory=chunk_text,
+        # Filter out empty chunks and create indexed list
+        valid_chunks = [
+            (idx, chunk_text) for idx, chunk_text in enumerate(content_chunks) if chunk_text.strip()
+        ]
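+        # Keep the pre-filter count so chunk position tags stay aligned with the original split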
+        total_chunks = len(content_chunks)
+
+        # Helper function to create memory items (similar to SimpleStructMemReader._make_memory_item)
+        def _make_memory_item(
+            value: str,
+            mem_type: str = memory_type,
+            tags: list[str] | None = None,
+            key: str | None = None,
+        ) -> TextualMemoryItem:
+            """Construct a memory item with common fields."""
+            return TextualMemoryItem(
+                memory=value,
                 metadata=TreeNodeTextualMemoryMetadata(
                     user_id=user_id,
                     session_id=session_id,
-                    memory_type=memory_type,
+                    memory_type=mem_type,
                     status="activated",
-                    tags=[
-                        "mode:fine",
-                        "multimodal:file",
-                        f"chunk:{chunk_idx + 1}/{len(content_chunks)}",
-                    ],
-                    key=_derive_key(chunk_text),
-                    embedding=self.embedder.embed([chunk_text])[0],
+                    tags=tags or [],
+                    key=key if key is not None else _derive_key(value),
+                    embedding=self.embedder.embed([value])[0],
                     usage=[],
                     sources=[source],
                     background="",
@@ -460,28 +536,102 @@ def parse_fine(
                     info=info_,
                 ),
             )
-            memory_items.append(memory_item)
 
-        # If no chunks were created, create a placeholder
-        if not memory_items:
-            memory_item = TextualMemoryItem(
-                memory=parsed_text,
-                metadata=TreeNodeTextualMemoryMetadata(
-                    user_id=user_id,
-                    session_id=session_id,
-                    memory_type=memory_type,
-                    status="activated",
-                    tags=["mode:fine", "multimodal:file"],
-                    key=_derive_key(parsed_text),
-                    embedding=self.embedder.embed([parsed_text])[0],
-                    usage=[],
-                    sources=[source],
-                    background="",
-                    confidence=0.99,
-                    type="fact",
-                    info=info_,
-                ),
+        # Helper function to create a fallback item for a chunk
+        def _make_fallback(
+            chunk_idx: int, chunk_text: str, reason: str = "raw"
+        ) -> TextualMemoryItem:
+            """Create a fallback memory item carrying the raw chunk text."""
+            return _make_memory_item(
+                value=chunk_text,
+                tags=[
+                    "mode:fine",
+                    "multimodal:file",
+                    f"fallback:{reason}",
+                    f"chunk:{chunk_idx + 1}/{total_chunks}",
+                ],
             )
-            memory_items.append(memory_item)
 
-        return memory_items
+        # Handle the empty-chunks case
+        if not valid_chunks:
+            return [
+                _make_memory_item(
+                    value=parsed_text or "[File: empty content]",
+                    tags=["mode:fine", "multimodal:file"],
+                )
+            ]
+
+        # If no LLM is available, create memory items directly from the raw chunks
+        if not self.llm:
+            return [_make_fallback(idx, text, "no_llm") for idx, text in valid_chunks]
+
+        # Process a single chunk with LLM extraction (worker function)
+        def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem:
+            """Process a chunk with the LLM, falling back to the raw text on failure."""
+            try:
+                response_json = self._get_doc_llm_response(chunk_text, custom_tags)
+                if response_json:
+                    value = response_json.get("value", "").strip()
+                    if value:
+                        tags = response_json.get("tags", [])
+                        tags = tags if isinstance(tags, list) else []
+                        tags.extend(["mode:fine", "multimodal:file"])
+
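+                        # Only accept known memory types from the LLM; otherwise keep the default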
+                        llm_mem_type = response_json.get("memory_type", memory_type)
+                        if llm_mem_type not in ["LongTermMemory", "UserMemory"]:
+                            llm_mem_type = memory_type
+
+                        return _make_memory_item(
+                            value=value,
+                            mem_type=llm_mem_type,
+                            tags=tags,
+                            key=response_json.get("key"),
+                        )
+            except Exception as e:
+                logger.error(f"[FileContentParser] LLM error for chunk {chunk_idx}: {e}")
+
+            # Fall back to the raw chunk
+            logger.warning(f"[FileContentParser] Fallback to raw for chunk {chunk_idx}")
+            return _make_fallback(chunk_idx, chunk_text)
+
+        # Process chunks concurrently with a progress bar
+        memory_items = []
+        chunk_map = dict(valid_chunks)
+
+        logger.info(f"[FileContentParser] Processing {len(valid_chunks)} chunks with LLM...")
+
+        with ContextThreadPoolExecutor(max_workers=20) as executor:
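+            # Map each future to its chunk index so a failed future can fall back to its raw chunk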
+            futures = {
+                executor.submit(_process_chunk, idx, text): idx for idx, text in valid_chunks
+            }
+
+            # Use tqdm for a progress bar (similar to simple_struct.py _process_doc_data)
+            for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=len(futures),
+                desc="[FileContentParser] Processing chunks",
+            ):
+                chunk_idx = futures[future]
+                try:
+                    node = future.result()
+                    if node:
+                        memory_items.append(node)
+                except Exception as e:
+                    tqdm.write(f"[ERROR] Chunk {chunk_idx} failed: {e}")
+                    logger.error(f"[FileContentParser] Future failed for chunk {chunk_idx}: {e}")
+                    # Create a fallback item for the failed future
+                    if chunk_idx in chunk_map:
+                        memory_items.append(
+                            _make_fallback(chunk_idx, chunk_map[chunk_idx], "error")
+                        )
+
+        logger.info(
+            f"[FileContentParser] Completed processing {len(memory_items)}/{len(valid_chunks)} chunks"
+        )
+
+        return memory_items or [
+            _make_memory_item(
+                value=parsed_text or "[File: empty content]", tags=["mode:fine", "multimodal:file"]
+            )
+        ]