
Commit 47b03e0

Merge branch 'dev' into feat/task-message
2 parents: edd8389 + c94792d

4 files changed: +144 −9 lines changed


src/memos/graph_dbs/polardb.py

Lines changed: 4 additions & 1 deletion
@@ -1,6 +1,7 @@
 import json
 import random
 import textwrap
+import time

 from contextlib import suppress
 from datetime import datetime
@@ -152,7 +153,7 @@ def __init__(self, config: PolarDBGraphDBConfig):
         # Create connection pool
         self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
             minconn=5,
-            maxconn=2000,
+            maxconn=100,
             host=host,
             port=port,
             user=user,
@@ -277,6 +278,8 @@ def _get_connection(self):

                 if attempt >= max_retries - 1:
                     raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
+                else:
+                    time.sleep(0.1)
                 continue

     def _return_connection(self, connection):
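
For context, a minimal sketch of the bounded-pool-plus-retry pattern these lines belong to (the surrounding loop, `max_retries`, the validation query, and the connection parameters are illustrative assumptions, not the actual MemOS code):

import time

import psycopg2.pool

# Illustrative pool; assumes a reachable PostgreSQL/PolarDB instance.
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=5,
    maxconn=100,  # bounded pool size, matching the change above
    host="localhost",
    port=5432,
    user="postgres",
    password="postgres",
    dbname="postgres",
)

def get_connection(max_retries: int = 3):
    """Fetch a connection, validating it and backing off briefly between retries."""
    for attempt in range(max_retries):
        try:
            conn = pool.getconn()
            with conn.cursor() as cur:
                cur.execute("SELECT 1")  # validate the connection before handing it out
            return conn
        except Exception as e:
            if attempt >= max_retries - 1:
                raise RuntimeError(f"Failed to get a valid connection from pool: {e}") from e
            else:
                time.sleep(0.1)  # short pause before the next attempt, as added in the diff

Lowering `maxconn` from 2000 to 100 keeps the pool from opening far more server connections than the database can comfortably serve, while the 0.1 s sleep prevents a tight retry loop when the pool is momentarily exhausted.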

src/memos/mem_reader/read_multi_modal/file_content_parser.py

Lines changed: 139 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 import concurrent.futures
 import os
+import re
 import tempfile
 
 from typing import Any
@@ -13,6 +14,7 @@
 from memos.llms.base import BaseLLM
 from memos.log import get_logger
 from memos.mem_reader.read_multi_modal.base import BaseMessageParser, _derive_key
+from memos.mem_reader.read_multi_modal.image_parser import ImageParser
 from memos.mem_reader.read_multi_modal.utils import (
     detect_lang,
     get_parser,
@@ -129,6 +131,137 @@ def _handle_local(self, data: str) -> str:
         logger.info("[FileContentParser] Local file paths are not supported in fine mode.")
         return ""
 
+    def _process_single_image(
+        self, image_url: str, original_ref: str, info: dict[str, Any], **kwargs
+    ) -> tuple[str, str]:
+        """
+        Process a single image and return (original_ref, replacement_text).
+
+        Args:
+            image_url: URL of the image to process
+            original_ref: Original markdown image reference to replace
+            info: Dictionary containing user_id and session_id
+            **kwargs: Additional parameters for ImageParser
+
+        Returns:
+            Tuple of (original_ref, replacement_text)
+        """
+        try:
+            # Construct image message format for ImageParser
+            image_message = {
+                "type": "image_url",
+                "image_url": {
+                    "url": image_url,
+                    "detail": "auto",
+                },
+            }
+
+            # Process image using ImageParser
+            logger.debug(f"[FileContentParser] Processing image: {image_url}")
+            memory_items = self.image_parser.parse_fine(image_message, info, **kwargs)
+
+            # Extract text content from memory items (only strings as requested)
+            extracted_texts = []
+            for item in memory_items:
+                if hasattr(item, "memory") and item.memory:
+                    extracted_texts.append(str(item.memory))
+
+            if extracted_texts:
+                # Combine all extracted texts
+                extracted_content = "\n".join(extracted_texts)
+                # Replace image with extracted content
+                return (
+                    original_ref,
+                    f"\n[Image Content from {image_url}]:\n{extracted_content}\n",
+                )
+            else:
+                # If no content extracted, keep original with a note
+                logger.warning(f"[FileContentParser] No content extracted from image: {image_url}")
+                return (
+                    original_ref,
+                    f"\n[Image: {image_url} - No content extracted]\n",
+                )
+
+        except Exception as e:
+            logger.error(f"[FileContentParser] Error processing image {image_url}: {e}")
+            # On error, keep original image reference
+            return (original_ref, original_ref)
+
+    def _extract_and_process_images(self, text: str, info: dict[str, Any], **kwargs) -> str:
+        """
+        Extract all images from markdown text and process them using ImageParser in parallel.
+        Replaces image references with extracted text content.
+
+        Args:
+            text: Markdown text containing image references
+            info: Dictionary containing user_id and session_id
+            **kwargs: Additional parameters for ImageParser
+
+        Returns:
+            Text with image references replaced by extracted content
+        """
+        if not text or not self.image_parser:
+            return text
+
+        # Pattern to match markdown images: ![](url) or ![alt](url)
+        image_pattern = r"!\[([^\]]*)\]\(([^)]+)\)"
+
+        # Find all image matches first
+        image_matches = list(re.finditer(image_pattern, text))
+        if not image_matches:
+            return text
+
+        logger.info(f"[FileContentParser] Found {len(image_matches)} images to process in parallel")
+
+        # Prepare tasks for parallel processing
+        tasks = []
+        for match in image_matches:
+            image_url = match.group(2)
+            original_ref = match.group(0)
+            tasks.append((image_url, original_ref))
+
+        # Process images in parallel
+        replacements = {}
+        max_workers = min(len(tasks), 10)  # Limit concurrent image processing
+
+        with ContextThreadPoolExecutor(max_workers=max_workers) as executor:
+            futures = {
+                executor.submit(
+                    self._process_single_image, image_url, original_ref, info, **kwargs
+                ): (image_url, original_ref)
+                for image_url, original_ref in tasks
+            }
+
+            # Collect results with progress tracking
+            for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=len(futures),
+                desc="[FileContentParser] Processing images",
+            ):
+                try:
+                    original_ref, replacement = future.result()
+                    replacements[original_ref] = replacement
+                except Exception as e:
+                    image_url, original_ref = futures[future]
+                    logger.error(f"[FileContentParser] Future failed for image {image_url}: {e}")
+                    # On error, keep original image reference
+                    replacements[original_ref] = original_ref
+
+        # Replace all images in the text
+        processed_text = text
+        for original, replacement in replacements.items():
+            processed_text = processed_text.replace(original, replacement, 1)
+
+        # Count successfully extracted images
+        success_count = sum(
+            1 for replacement in replacements.values() if "Image Content from" in replacement
+        )
+        logger.info(
+            f"[FileContentParser] Processed {len(image_matches)} images in parallel, "
+            f"extracted content for {success_count} images"
+        )
+        return processed_text
+
     def __init__(
         self,
         embedder: BaseEmbedder,
@@ -149,6 +282,8 @@ def __init__(
         """
         super().__init__(embedder, llm)
         self.parser = parser
+        # Initialize ImageParser for processing images in markdown
+        self.image_parser = ImageParser(embedder, llm) if llm else None
 
         # Get inner markdown hostnames from config or environment
        if direct_markdown_hostnames is not None:
@@ -521,6 +656,10 @@ def parse_fine(
                     f"[FileContentParser] Failed to delete temp file {temp_file_path}: {e}"
                 )
 
+        # Extract and process images from parsed_text
+        if is_markdown and parsed_text and self.image_parser:
+            parsed_text = self._extract_and_process_images(parsed_text, info, **kwargs)
+
         # Extract info fields
         if not info:
             info = {}
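
As a standalone illustration of the replace-images-with-extracted-text flow added above (a sketch only: `describe_image` is a hypothetical stand-in for `ImageParser.parse_fine`, and the standard `ThreadPoolExecutor` stands in for MemOS's `ContextThreadPoolExecutor`):

import re
from concurrent.futures import ThreadPoolExecutor, as_completed

# Same markdown image pattern as in the diff: ![alt](url)
IMAGE_PATTERN = r"!\[([^\]]*)\]\(([^)]+)\)"

def describe_image(url: str) -> str:
    # Hypothetical stand-in for ImageParser.parse_fine.
    return f"caption for {url}"

def replace_images(markdown: str, max_workers: int = 10) -> str:
    matches = list(re.finditer(IMAGE_PATTERN, markdown))
    if not matches:
        return markdown
    tasks = [(m.group(0), m.group(2)) for m in matches]  # (original_ref, url)
    replacements: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=min(len(tasks), max_workers)) as executor:
        futures = {executor.submit(describe_image, url): ref for ref, url in tasks}
        for future in as_completed(futures):
            ref = futures[future]
            try:
                replacements[ref] = f"\n[Image Content]:\n{future.result()}\n"
            except Exception:
                replacements[ref] = ref  # on failure, keep the original reference
    for ref, replacement in replacements.items():
        markdown = markdown.replace(ref, replacement, 1)
    return markdown

print(replace_images("Intro ![diagram](https://example.com/a.png) outro"))

Work is fanned out over a bounded worker pool, and any failure falls back to leaving the original image reference in place, which is the same degradation strategy the diff uses.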

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 0 additions & 7 deletions
@@ -180,13 +180,6 @@ def __init__(self, config: BaseSchedulerConfig):
         self.current_user_id: UserID | str | None = None
         self.current_mem_cube_id: MemCubeID | str | None = None
         self.current_mem_cube: BaseMemCube | None = None
-        try:
-            self.components = init_components()
-            self.current_mem_cube: BaseMemCube = self.components["naive_mem_cube"]
-        except Exception:
-            logger.info(
-                "No environment available to initialize mem cube. Using fallback naive_mem_cube."
-            )
 
         self._mem_cubes: dict[str, BaseMemCube] = {}
         self.auth_config_path: str | Path | None = self.config.get("auth_config_path", None)

src/memos/memories/textual/tree_text_memory/organize/manager.py

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ def add(
         """
         added_ids: list[str] = []
 
-        with ContextThreadPoolExecutor(max_workers=50) as executor:
+        with ContextThreadPoolExecutor(max_workers=10) as executor:
             futures = {executor.submit(self._process_memory, m, user_name): m for m in memories}
             for future in as_completed(futures, timeout=500):
                 try:
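
For reference, the bounded fan-out pattern being tuned here looks roughly like this in isolation (a sketch: `process_memory` is a toy stand-in for the manager's `_process_memory`, and `ThreadPoolExecutor` stands in for `ContextThreadPoolExecutor`):

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_memory(memory: str) -> str:
    # Hypothetical stand-in: return an identifier for the stored memory.
    return memory.upper()

memories = ["alpha", "beta", "gamma"]
added_ids: list[str] = []

with ThreadPoolExecutor(max_workers=10) as executor:  # was max_workers=50 before this commit
    futures = {executor.submit(process_memory, m): m for m in memories}
    for future in as_completed(futures, timeout=500):
        try:
            added_ids.append(future.result())
        except Exception as e:
            print(f"Failed to process {futures[future]}: {e}")

print(added_ids)

Capping the pool at 10 workers trades some parallelism for less pressure on the downstream graph database, in line with the `maxconn` reduction in polardb.py.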
