Skip to content

Commit c8500ec

Browse files
authored
feat: Enhance File Parsing Pipeline with Chunk-Level Source Tracking & Unified Multi-Modal Parsing (#645)
* fix: doc fine mode bug * fix: doc fine mode bug * feat: init longbench_v2 * feat: more strict embedder trucation * feat: parallel processing fine mode in multi-modal-fine * feat: update parsers; add chunk info into source; remove origin_part * feat: modify chunk_content in file-fine-parser
1 parent 1f3606f commit c8500ec

File tree

6 files changed

+72
-36
lines changed

6 files changed

+72
-36
lines changed

src/memos/mem_reader/multi_modal_struct.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ def _process_one_item(fast_item: TextualMemoryItem) -> list[TextualMemoryItem]:
422422

423423
fine_memory_items: list[TextualMemoryItem] = []
424424

425-
with ContextThreadPoolExecutor(max_workers=8) as executor:
425+
with ContextThreadPoolExecutor(max_workers=30) as executor:
426426
futures = [executor.submit(_process_one_item, item) for item in fast_memory_items]
427427

428428
for future in concurrent.futures.as_completed(futures):

src/memos/mem_reader/read_multi_modal/file_content_parser.py

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -167,28 +167,38 @@ def create_source(
167167
self,
168168
message: File,
169169
info: dict[str, Any],
170+
chunk_index: int | None = None,
171+
chunk_total: int | None = None,
170172
chunk_content: str | None = None,
171173
) -> SourceMessage:
172174
"""Create SourceMessage from file content part."""
173175
if isinstance(message, dict):
174176
file_info = message.get("file", {})
175-
return SourceMessage(
176-
type="file",
177-
doc_path=file_info.get("filename") or file_info.get("file_id", ""),
178-
content=chunk_content if chunk_content else file_info.get("file_data", ""),
179-
original_part=message,
180-
)
181-
return SourceMessage(type="file", doc_path=str(message))
177+
source_dict = {
178+
"type": "file",
179+
"doc_path": file_info.get("filename") or file_info.get("file_id", ""),
180+
"content": chunk_content if chunk_content else file_info.get("file_data", ""),
181+
}
182+
# Add chunk ordering information if provided
183+
if chunk_index is not None:
184+
source_dict["chunk_index"] = chunk_index
185+
if chunk_total is not None:
186+
source_dict["chunk_total"] = chunk_total
187+
return SourceMessage(**source_dict)
188+
source_dict = {"type": "file", "doc_path": str(message)}
189+
if chunk_index is not None:
190+
source_dict["chunk_index"] = chunk_index
191+
if chunk_total is not None:
192+
source_dict["chunk_total"] = chunk_total
193+
if chunk_content is not None:
194+
source_dict["content"] = chunk_content
195+
return SourceMessage(**source_dict)
182196

183197
def rebuild_from_source(
184198
self,
185199
source: SourceMessage,
186200
) -> File:
187201
"""Rebuild file content part from SourceMessage."""
188-
# Use original_part if available
189-
if hasattr(source, "original_part") and source.original_part:
190-
return source.original_part
191-
192202
# Rebuild from source fields
193203
return {
194204
"type": "file",
@@ -312,9 +322,6 @@ def parse_fast(
312322
# Split content into chunks
313323
content_chunks = self._split_text(content)
314324

315-
# Create source
316-
source = self.create_source(message, info)
317-
318325
# Extract info fields
319326
info_ = info.copy()
320327
if file_id:
@@ -326,12 +333,23 @@ def parse_fast(
326333
# (since we don't have role information at this level)
327334
memory_type = "LongTermMemory"
328335
file_ids = [file_id] if file_id else []
336+
total_chunks = len(content_chunks)
337+
329338
# Create memory items for each chunk
330339
memory_items = []
331340
for chunk_idx, chunk_text in enumerate(content_chunks):
332341
if not chunk_text.strip():
333342
continue
334343

344+
# Create source for this specific chunk with its index and content
345+
source = self.create_source(
346+
message,
347+
info,
348+
chunk_index=chunk_idx,
349+
chunk_total=total_chunks,
350+
chunk_content=chunk_text,
351+
)
352+
335353
memory_item = TextualMemoryItem(
336354
memory=chunk_text,
337355
metadata=TreeNodeTextualMemoryMetadata(
@@ -342,7 +360,7 @@ def parse_fast(
342360
tags=[
343361
"mode:fast",
344362
"multimodal:file",
345-
f"chunk:{chunk_idx + 1}/{len(content_chunks)}",
363+
f"chunk:{chunk_idx + 1}/{total_chunks}",
346364
],
347365
key=_derive_key(chunk_text),
348366
embedding=self.embedder.embed([chunk_text])[0],
@@ -359,6 +377,14 @@ def parse_fast(
359377

360378
# If no chunks were created, create a placeholder
361379
if not memory_items:
380+
# Create source for placeholder (no chunk index since there are no chunks)
381+
placeholder_source = self.create_source(
382+
message,
383+
info,
384+
chunk_index=None,
385+
chunk_total=0,
386+
chunk_content=content,
387+
)
362388
memory_item = TextualMemoryItem(
363389
memory=content,
364390
metadata=TreeNodeTextualMemoryMetadata(
@@ -370,7 +396,7 @@ def parse_fast(
370396
key=_derive_key(content),
371397
embedding=self.embedder.embed([content])[0],
372398
usage=[],
373-
sources=[source],
399+
sources=[placeholder_source],
374400
background="",
375401
confidence=0.99,
376402
type="fact",
@@ -463,7 +489,9 @@ def parse_fine(
463489
parsed_text = self._handle_base64(file_data)
464490

465491
else:
466-
parsed_text = file_data
492+
# TODO: discuss the proper place for processing
493+
# string file-data
494+
return []
467495
# Priority 2: If file_id is provided but no file_data, try to use file_id as path
468496
elif file_id:
469497
logger.warning(f"[FileContentParser] File data not provided for file_id: {file_id}")
@@ -518,10 +546,26 @@ def _make_memory_item(
518546
mem_type: str = memory_type,
519547
tags: list[str] | None = None,
520548
key: str | None = None,
549+
chunk_idx: int | None = None,
521550
chunk_content: str | None = None,
522551
) -> TextualMemoryItem:
523-
"""Construct memory item with common fields."""
524-
source = self.create_source(message, info, chunk_content)
552+
"""Construct memory item with common fields.
553+
554+
Args:
555+
value: Memory content (chunk text)
556+
mem_type: Memory type
557+
tags: Tags for the memory item
558+
key: Key for the memory item
559+
chunk_idx: Index of the chunk in the document (0-based)
560+
"""
561+
# Create source for this specific chunk with its index and content
562+
chunk_source = self.create_source(
563+
message,
564+
info,
565+
chunk_index=chunk_idx,
566+
chunk_total=total_chunks,
567+
chunk_content=chunk_content,
568+
)
525569
return TextualMemoryItem(
526570
memory=value,
527571
metadata=TreeNodeTextualMemoryMetadata(
@@ -533,7 +577,7 @@ def _make_memory_item(
533577
key=key if key is not None else _derive_key(value),
534578
embedding=self.embedder.embed([value])[0],
535579
usage=[],
536-
sources=[source],
580+
sources=[chunk_source],
537581
background="",
538582
confidence=0.99,
539583
type="fact",
@@ -555,6 +599,8 @@ def _make_fallback(
555599
f"fallback:{reason}",
556600
f"chunk:{chunk_idx + 1}/{total_chunks}",
557601
],
602+
chunk_idx=chunk_idx,
603+
chunk_content=chunk_text,
558604
)
559605

560606
# Handle empty chunks case
@@ -563,6 +609,7 @@ def _make_fallback(
563609
_make_memory_item(
564610
value=parsed_text or "[File: empty content]",
565611
tags=["mode:fine", "multimodal:file"],
612+
chunk_idx=None,
566613
)
567614
]
568615

@@ -591,6 +638,7 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem:
591638
mem_type=llm_mem_type,
592639
tags=tags,
593640
key=response_json.get("key"),
641+
chunk_idx=chunk_idx,
594642
chunk_content=chunk_text,
595643
)
596644
except Exception as e:
@@ -638,6 +686,8 @@ def _process_chunk(chunk_idx: int, chunk_text: str) -> TextualMemoryItem:
638686

639687
return memory_items or [
640688
_make_memory_item(
641-
value=parsed_text or "[File: empty content]", tags=["mode:fine", "multimodal:file"]
689+
value=parsed_text or "[File: empty content]",
690+
tags=["mode:fine", "multimodal:file"],
691+
chunk_idx=None,
642692
)
643693
]

src/memos/mem_reader/read_multi_modal/image_parser.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ def create_source(
5353
return SourceMessage(
5454
type="image",
5555
content=url,
56-
original_part=message,
5756
url=url,
5857
detail=detail,
5958
)
@@ -64,10 +63,6 @@ def rebuild_from_source(
6463
source: SourceMessage,
6564
) -> ChatCompletionContentPartImageParam:
6665
"""Rebuild image_url content part from SourceMessage."""
67-
# Use original_part if available
68-
if hasattr(source, "original_part") and source.original_part:
69-
return source.original_part
70-
7166
# Rebuild from source fields
7267
url = getattr(source, "url", "") or (source.content or "").replace("[image_url]: ", "")
7368
detail = getattr(source, "detail", "auto")

src/memos/mem_reader/read_multi_modal/text_content_parser.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ def create_source(
5151
return SourceMessage(
5252
type="text",
5353
content=text,
54-
original_part=message,
5554
)
5655
return SourceMessage(type="text", content=str(message))
5756

src/memos/mem_reader/read_multi_modal/tool_parser.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ def create_source(
7979
filename=file_info.get("filename", ""),
8080
file_id=file_info.get("file_id", ""),
8181
tool_call_id=tool_call_id,
82-
original_part=part,
8382
)
8483
)
8584
elif part_type == "image_url":
@@ -93,7 +92,6 @@ def create_source(
9392
content=file_info.get("url", ""),
9493
detail=file_info.get("detail", "auto"),
9594
tool_call_id=tool_call_id,
96-
original_part=part,
9795
)
9896
)
9997
elif part_type == "input_audio":
@@ -107,7 +105,6 @@ def create_source(
107105
content=file_info.get("data", ""),
108106
format=file_info.get("format", "wav"),
109107
tool_call_id=tool_call_id,
110-
original_part=part,
111108
)
112109
)
113110
else:

src/memos/mem_reader/read_multi_modal/user_parser.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ def create_source(
6868
chat_time=chat_time,
6969
message_id=message_id,
7070
content=part.get("text", ""),
71-
# Save original part for reconstruction
72-
original_part=part,
7371
)
7472
)
7573
elif part_type == "file":
@@ -82,7 +80,6 @@ def create_source(
8280
message_id=message_id,
8381
doc_path=file_info.get("filename") or file_info.get("file_id", ""),
8482
content=file_info.get("file_data", ""),
85-
original_part=part,
8683
)
8784
)
8885
elif part_type == "image_url":
@@ -94,7 +91,6 @@ def create_source(
9491
chat_time=chat_time,
9592
message_id=message_id,
9693
image_path=image_info.get("url"),
97-
original_part=part,
9894
)
9995
)
10096
else:
@@ -106,7 +102,6 @@ def create_source(
106102
chat_time=chat_time,
107103
message_id=message_id,
108104
content=f"[{part_type}]",
109-
original_part=part,
110105
)
111106
)
112107
else:

0 commit comments

Comments
 (0)