22
33import concurrent .futures
44import os
5+ import re
56import tempfile
67
78from typing import Any
1314from memos .llms .base import BaseLLM
1415from memos .log import get_logger
1516from memos .mem_reader .read_multi_modal .base import BaseMessageParser , _derive_key
17+ from memos .mem_reader .read_multi_modal .image_parser import ImageParser
1618from memos .mem_reader .read_multi_modal .utils import (
1719 detect_lang ,
1820 get_parser ,
@@ -129,6 +131,137 @@ def _handle_local(self, data: str) -> str:
129131 logger .info ("[FileContentParser] Local file paths are not supported in fine mode." )
130132 return ""
131133
def _process_single_image(
    self, image_url: str, original_ref: str, info: dict[str, Any], **kwargs
) -> tuple[str, str]:
    """
    Run one image through the ImageParser and build its replacement text.

    Args:
        image_url: URL of the image to analyse.
        original_ref: The markdown image reference the result will replace.
        info: Passed through unchanged to ``ImageParser.parse_fine``.
        **kwargs: Extra keyword arguments forwarded to ``ImageParser.parse_fine``.

    Returns:
        A ``(original_ref, replacement_text)`` pair. The replacement is the
        extracted text when any was produced, a "no content" note when the
        parser yielded nothing usable, and ``original_ref`` itself when
        processing raised.
    """
    try:
        logger.debug(f"[FileContentParser] Processing image: {image_url} ")

        # ImageParser expects an OpenAI-style "image_url" content part.
        parsed_items = self.image_parser.parse_fine(
            {
                "type": "image_url",
                "image_url": {"url": image_url, "detail": "auto"},
            },
            info,
            **kwargs,
        )

        # Keep only the non-empty ``memory`` payloads, stringified.
        texts = [
            str(item.memory)
            for item in parsed_items
            if hasattr(item, "memory") and item.memory
        ]

        if not texts:
            # Nothing usable came back: leave a visible marker in the text.
            logger.warning(f"[FileContentParser] No content extracted from image: {image_url} ")
            return (
                original_ref,
                f"\n [Image: {image_url} - No content extracted]\n ",
            )

        combined = "\n ".join(texts)
        return (
            original_ref,
            f"\n [Image Content from {image_url} ]:\n {combined} \n ",
        )
    except Exception as e:
        # Best effort: a failing image must not break the whole document,
        # so the reference is mapped back onto itself.
        logger.error(f"[FileContentParser] Error processing image {image_url} : {e} ")
        return (original_ref, original_ref)
189+
def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) -> str:
    """
    Replace markdown image references in ``text`` with content extracted by ImageParser.

    Every distinct image reference is processed once, in parallel, and all of
    its occurrences in the text are replaced in a single substitution pass.

    Args:
        text: Markdown text that may contain image references.
        info: Passed through unchanged to ``_process_single_image``.
        **kwargs: Extra keyword arguments forwarded to ``_process_single_image``.

    Returns:
        The text with image references replaced. ``text`` is returned
        unchanged when it is empty, when no image parser is configured, or
        when it contains no image references. References whose processing
        failed are left as they were.
    """
    if not text or not self.image_parser:
        return text

    # Pattern to match markdown images: ![alt](url) or ![](url)
    image_pattern = r"!\[([^\]]*)\]\(([^)]+)\)"

    image_matches = list(re.finditer(image_pattern, text))
    if not image_matches:
        return text

    logger.info(f"[FileContentParser] Found {len(image_matches)} images to process in parallel")

    # Deduplicate by the full reference text (dicts keep first-seen order) so
    # a reference repeated in the document is only sent to the parser once.
    unique_refs = {match.group(0): match.group(2) for match in image_matches}
    tasks = [(image_url, original_ref) for original_ref, image_url in unique_refs.items()]

    replacements: dict[str, str] = {}
    max_workers = min(len(tasks), 10)  # Limit concurrent image processing

    with ContextThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                self._process_single_image, image_url, original_ref, info, **kwargs
            ): (image_url, original_ref)
            for image_url, original_ref in tasks
        }

        # Collect results with progress tracking
        for future in tqdm(
            concurrent.futures.as_completed(futures),
            total=len(futures),
            desc="[FileContentParser] Processing images",
        ):
            try:
                original_ref, replacement = future.result()
                replacements[original_ref] = replacement
            except Exception as e:
                image_url, original_ref = futures[future]
                logger.error(f"[FileContentParser] Future failed for image {image_url} : {e} ")
                # On error, keep original image reference
                replacements[original_ref] = original_ref

    # Substitute in one pass over the ORIGINAL text. Compared with chained
    # ``str.replace(..., 1)`` calls this (a) replaces every occurrence of a
    # repeated reference, (b) never rescans text inserted by an earlier
    # replacement, and (c) does not depend on future completion order.
    # A callable is used so backslashes in extracted content are not
    # interpreted as regex replacement escapes.
    processed_text = re.sub(
        image_pattern,
        lambda match: replacements.get(match.group(0), match.group(0)),
        text,
    )

    # Count successfully extracted images
    success_count = sum(
        1 for replacement in replacements.values() if "Image Content from" in replacement
    )
    logger.info(
        f"[FileContentParser] Processed {len(image_matches)} images in parallel, "
        f"extracted content for {success_count} images"
    )
    return processed_text
264+
132265 def __init__ (
133266 self ,
134267 embedder : BaseEmbedder ,
@@ -149,6 +282,8 @@ def __init__(
149282 """
150283 super ().__init__ (embedder , llm )
151284 self .parser = parser
285+ # Initialize ImageParser for processing images in markdown
286+ self .image_parser = ImageParser (embedder , llm ) if llm else None
152287
153288 # Get inner markdown hostnames from config or environment
154289 if direct_markdown_hostnames is not None :
@@ -521,6 +656,10 @@ def parse_fine(
521656 f"[FileContentParser] Failed to delete temp file { temp_file_path } : { e } "
522657 )
523658
659+ # Extract and process images from parsed_text
660+ if is_markdown and parsed_text and self .image_parser :
661+ parsed_text = self ._extract_and_process_images (parsed_text , info , ** kwargs )
662+
524663 # Extract info fields
525664 if not info :
526665 info = {}
0 commit comments