11"""Parser for file content parts (RawMessageList)."""
22
3+ import concurrent .futures
34import os
45import tempfile
56
67from typing import Any
78
9+ from tqdm import tqdm
10+
11+ from memos .context .context import ContextThreadPoolExecutor
812from memos .embedders .base import BaseEmbedder
913from memos .llms .base import BaseLLM
1014from memos .log import get_logger
1317 TextualMemoryItem ,
1418 TreeNodeTextualMemoryMetadata ,
1519)
20+ from memos .templates .mem_reader_prompts import (
21+ CUSTOM_TAGS_INSTRUCTION ,
22+ CUSTOM_TAGS_INSTRUCTION_ZH ,
23+ SIMPLE_STRUCT_DOC_READER_PROMPT ,
24+ SIMPLE_STRUCT_DOC_READER_PROMPT_ZH ,
25+ )
1626from memos .types .openai_chat_completion_types import File
1727
1828from .base import BaseMessageParser , _derive_key
19- from .utils import get_parser , get_text_splitter
29+ from .utils import (
30+ detect_lang ,
31+ get_parser ,
32+ get_text_splitter ,
33+ parse_json_result ,
34+ )
2035
2136
2237logger = get_logger (__name__ )
2338
# Prompt templates for document processing, keyed first by purpose and then
# by language. Shared between simple_struct and file_content_parser.
DOC_PROMPT_DICT = {
    "doc": {
        "en": SIMPLE_STRUCT_DOC_READER_PROMPT,
        "zh": SIMPLE_STRUCT_DOC_READER_PROMPT_ZH,
    },
    "custom_tags": {
        "en": CUSTOM_TAGS_INSTRUCTION,
        "zh": CUSTOM_TAGS_INSTRUCTION_ZH,
    },
}
44+
2445
2546class FileContentParser (BaseMessageParser ):
2647 """Parser for file content parts."""
2748
28- def _handle_url (self , url_str : str , filename : str ) -> tuple [str , str | None ]:
49+ def _get_doc_llm_response (self , chunk_text : str , custom_tags : list [str ] | None = None ) -> dict :
50+ """
51+ Call LLM to extract memory from document chunk.
52+ Uses doc prompts from DOC_PROMPT_DICT.
53+
54+ Args:
55+ chunk_text: Text chunk to extract memory from
56+ custom_tags: Optional list of custom tags for LLM extraction
57+
58+ Returns:
59+ Parsed JSON response from LLM or empty dict if failed
60+ """
61+ if not self .llm :
62+ logger .warning ("[FileContentParser] LLM not available for fine mode" )
63+ return {}
64+
65+ lang = detect_lang (chunk_text )
66+ template = DOC_PROMPT_DICT ["doc" ][lang ]
67+ prompt = template .replace ("{chunk_text}" , chunk_text )
68+
69+ custom_tags_prompt = (
70+ DOC_PROMPT_DICT ["custom_tags" ][lang ].replace ("{custom_tags}" , str (custom_tags ))
71+ if custom_tags
72+ else ""
73+ )
74+ prompt = prompt .replace ("{custom_tags_prompt}" , custom_tags_prompt )
75+
76+ messages = [{"role" : "user" , "content" : prompt }]
77+ try :
78+ response_text = self .llm .generate (messages )
79+ response_json = parse_json_result (response_text )
80+ except Exception as e :
81+ logger .error (f"[FileContentParser] LLM generation error: { e } " )
82+ response_json = {}
83+ return response_json
84+
85+ def _handle_url (self , url_str : str , filename : str ) -> tuple [str , str | None , bool ]:
2986 """Download and parse file from URL."""
3087 try :
3188 from urllib .parse import urlparse
@@ -42,14 +99,14 @@ def _handle_url(self, url_str: str, filename: str) -> tuple[str, str | None]:
4299 filename = os .path .basename (parsed_url .path ) or "downloaded_file"
43100
44101 if hostname in self .direct_markdown_hostnames :
45- return response .text , None
102+ return response .text , None , True
46103
47104 file_ext = os .path .splitext (filename )[1 ].lower ()
48105 if file_ext in [".md" , ".markdown" , ".txt" ]:
49- return response .text , None
106+ return response .text , None , True
50107 with tempfile .NamedTemporaryFile (mode = "wb" , delete = False , suffix = file_ext ) as temp_file :
51108 temp_file .write (response .content )
52- return "" , temp_file .name
109+ return "" , temp_file .name , False
53110 except Exception as e :
54111 logger .error (f"[FileContentParser] URL processing error: { e } " )
55112 return f"[File URL download failed: { url_str } ]" , None
@@ -108,7 +165,7 @@ def __init__(
108165 else :
109166 self .direct_markdown_hostnames = []
110167
111- def _split_text (self , text : str ) -> list [str ]:
168+ def _split_text (self , text : str , is_markdown : bool = False ) -> list [str ]:
112169 """
113170 Split text into chunks using text splitter from utils.
114171
@@ -288,6 +345,8 @@ def parse_fast(
288345
289346 # Extract info fields
290347 info_ = info .copy ()
348+ if file_id :
349+ info_ .update ({"file_id" : file_id })
291350 user_id = info_ .pop ("user_id" , "" )
292351 session_id = info_ .pop ("session_id" , "" )
293352
@@ -358,10 +417,19 @@ def parse_fine(
358417 """
359418 Parse file content part in fine mode.
360419 Fine mode downloads and parses file content, especially for URLs.
420+ Then uses LLM to extract structured memories from each chunk.
421+
361422 Handles various file parameter scenarios:
362423 - file_data: URL (http://, https://, or @http://), base64 encoded data, or plain text content
363424 - file_id: ID of an uploaded file
364425 - filename: name of the file
426+
427+ Args:
428+ message: File content part to parse
429+ info: Dictionary containing user_id and session_id
430+ **kwargs: Additional parameters including:
431+ - custom_tags: Optional list of custom tags for LLM extraction
432+ - context_items: Optional list of TextualMemoryItem for context
365433 """
366434 if not isinstance (message , dict ):
367435 logger .warning (f"[FileContentParser] Expected dict, got { type (message )} " )
@@ -378,6 +446,9 @@ def parse_fine(
378446 file_id = file_info .get ("file_id" , "" )
379447 filename = file_info .get ("filename" , "" )
380448
449+ # Extract custom_tags from kwargs (for LLM extraction)
450+ custom_tags = kwargs .get ("custom_tags" )
451+
381452 # Use parser from utils
382453 parser = self .parser or get_parser ()
383454 if not parser :
@@ -386,6 +457,7 @@ def parse_fine(
386457
387458 parsed_text = ""
388459 temp_file_path = None
460+ is_markdown = False
389461
390462 try :
391463 # Priority 1: If file_data is provided, process it
@@ -394,7 +466,9 @@ def parse_fine(
394466 url_str = file_data [1 :] if file_data .startswith ("@" ) else file_data
395467
396468 if url_str .startswith (("http://" , "https://" )):
397- parsed_text , temp_file_path = self ._handle_url (url_str , filename )
469+ parsed_text , temp_file_path , is_markdown = self ._handle_url (
470+ url_str , filename
471+ )
398472 if temp_file_path :
399473 try :
400474 # Use parser from utils
@@ -457,28 +531,32 @@ def parse_fine(
457531 memory_type = "LongTermMemory"
458532
459533 # Split parsed text into chunks
460- content_chunks = self ._split_text (parsed_text )
461-
462- # Create memory items for each chunk
463- memory_items = []
464- for chunk_idx , chunk_text in enumerate (content_chunks ):
465- if not chunk_text .strip ():
466- continue
467-
468- memory_item = TextualMemoryItem (
469- memory = chunk_text ,
534+ content_chunks = self ._split_text (parsed_text , is_markdown )
535+
536+ # Filter out empty chunks and create indexed list
537+ valid_chunks = [
538+ (idx , chunk_text ) for idx , chunk_text in enumerate (content_chunks ) if chunk_text .strip ()
539+ ]
540+ total_chunks = len (content_chunks )
541+
542+ # Helper function to create memory item (similar to SimpleStructMemReader._make_memory_item)
543+ def _make_memory_item (
544+ value : str ,
545+ mem_type : str = memory_type ,
546+ tags : list [str ] | None = None ,
547+ key : str | None = None ,
548+ ) -> TextualMemoryItem :
549+ """Construct memory item with common fields."""
550+ return TextualMemoryItem (
551+ memory = value ,
470552 metadata = TreeNodeTextualMemoryMetadata (
471553 user_id = user_id ,
472554 session_id = session_id ,
473- memory_type = memory_type ,
555+ memory_type = mem_type ,
474556 status = "activated" ,
475- tags = [
476- "mode:fine" ,
477- "multimodal:file" ,
478- f"chunk:{ chunk_idx + 1 } /{ len (content_chunks )} " ,
479- ],
480- key = _derive_key (chunk_text ),
481- embedding = self .embedder .embed ([chunk_text ])[0 ],
557+ tags = tags or [],
558+ key = key if key is not None else _derive_key (value ),
559+ embedding = self .embedder .embed ([value ])[0 ],
482560 usage = [],
483561 sources = [source ],
484562 background = "" ,
@@ -487,28 +565,102 @@ def parse_fine(
487565 info = info_ ,
488566 ),
489567 )
490- memory_items .append (memory_item )
491568
492- # If no chunks were created, create a placeholder
493- if not memory_items :
494- memory_item = TextualMemoryItem (
495- memory = parsed_text ,
496- metadata = TreeNodeTextualMemoryMetadata (
497- user_id = user_id ,
498- session_id = session_id ,
499- memory_type = memory_type ,
500- status = "activated" ,
501- tags = ["mode:fine" , "multimodal:file" ],
502- key = _derive_key (parsed_text ),
503- embedding = self .embedder .embed ([parsed_text ])[0 ],
504- usage = [],
505- sources = [source ],
506- background = "" ,
507- confidence = 0.99 ,
508- type = "fact" ,
509- info = info_ ,
510- ),
569+ # Helper function to create fallback item for a chunk
570+ def _make_fallback (
571+ chunk_idx : int , chunk_text : str , reason : str = "raw"
572+ ) -> TextualMemoryItem :
573+ """Create fallback memory item with raw chunk text."""
574+ return _make_memory_item (
575+ value = chunk_text ,
576+ tags = [
577+ "mode:fine" ,
578+ "multimodal:file" ,
579+ f"fallback:{ reason } " ,
580+ f"chunk:{ chunk_idx + 1 } /{ total_chunks } " ,
581+ ],
511582 )
512- memory_items .append (memory_item )
513583
514- return memory_items
584+ # Handle empty chunks case
585+ if not valid_chunks :
586+ return [
587+ _make_memory_item (
588+ value = parsed_text or "[File: empty content]" ,
589+ tags = ["mode:fine" , "multimodal:file" ],
590+ )
591+ ]
592+
593+ # If no LLM available, create memory items directly from chunks
594+ if not self .llm :
595+ return [_make_fallback (idx , text , "no_llm" ) for idx , text in valid_chunks ]
596+
597+ # Process single chunk with LLM extraction (worker function)
598+ def _process_chunk (chunk_idx : int , chunk_text : str ) -> TextualMemoryItem :
599+ """Process chunk with LLM, fallback to raw on failure."""
600+ try :
601+ response_json = self ._get_doc_llm_response (chunk_text , custom_tags )
602+ if response_json :
603+ value = response_json .get ("value" , "" ).strip ()
604+ if value :
605+ tags = response_json .get ("tags" , [])
606+ tags = tags if isinstance (tags , list ) else []
607+ tags .extend (["mode:fine" , "multimodal:file" ])
608+
609+ llm_mem_type = response_json .get ("memory_type" , memory_type )
610+ if llm_mem_type not in ["LongTermMemory" , "UserMemory" ]:
611+ llm_mem_type = memory_type
612+
613+ return _make_memory_item (
614+ value = value ,
615+ mem_type = llm_mem_type ,
616+ tags = tags ,
617+ key = response_json .get ("key" ),
618+ )
619+ except Exception as e :
620+ logger .error (f"[FileContentParser] LLM error for chunk { chunk_idx } : { e } " )
621+
622+ # Fallback to raw chunk
623+ logger .warning (f"[FileContentParser] Fallback to raw for chunk { chunk_idx } " )
624+ return _make_fallback (chunk_idx , chunk_text )
625+
626+ # Process chunks concurrently with progress bar
627+ memory_items = []
628+ chunk_map = dict (valid_chunks )
629+ total_chunks = len (valid_chunks )
630+
631+ logger .info (f"[FileContentParser] Processing { total_chunks } chunks with LLM..." )
632+
633+ with ContextThreadPoolExecutor (max_workers = 20 ) as executor :
634+ futures = {
635+ executor .submit (_process_chunk , idx , text ): idx for idx , text in valid_chunks
636+ }
637+
638+ # Use tqdm for progress bar (similar to simple_struct.py _process_doc_data)
639+ for future in tqdm (
640+ concurrent .futures .as_completed (futures ),
641+ total = total_chunks ,
642+ desc = "[FileContentParser] Processing chunks" ,
643+ ):
644+ chunk_idx = futures [future ]
645+ try :
646+ node = future .result ()
647+ if node :
648+ memory_items .append (node )
649+ except Exception as e :
650+ tqdm .write (f"[ERROR] Chunk { chunk_idx } failed: { e } " )
651+ logger .error (f"[FileContentParser] Future failed for chunk { chunk_idx } : { e } " )
652+ # Create fallback for failed future
653+ if chunk_idx in chunk_map :
654+ memory_items .append (
655+ _make_fallback (chunk_idx , chunk_map [chunk_idx ], "error" )
656+ )
657+
658+ logger .info (
659+ f"[FileContentParser] Completed processing { len (memory_items )} /{ total_chunks } chunks"
660+ )
661+
662+ return memory_items or [
663+ _make_memory_item (
664+ value = parsed_text or "[File: empty content]" , tags = ["mode:fine" , "multimodal:file" ]
665+ )
666+ ]
0 commit comments