Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions apps/common/handle/impl/text/zip_split_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,17 @@ def support(self, file, get_buffer):
def get_content(self, file, save_image):
"""
从 zip 中提取并返回拼接的 md 文本,同时收集并保存内嵌图片(通过 save_image 回调)。
使用 posixpath 来正确处理 zip 内部的路径拼接与规范化。
"""
buffer = file.read() if hasattr(file, 'read') else None
bytes_io = io.BytesIO(buffer) if buffer is not None else io.BytesIO(file)
md_parts = []
md_items = [] # 存储 (md_text, source_file_path)
image_mode_list = []

import posixpath

def is_image_name(name: str):
ext = os.path.splitext(name.lower())[1]
ext = posixpath.splitext(name.lower())[1]
return ext in ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.svg')

with zipfile.ZipFile(bytes_io, 'r') as zip_ref:
Expand All @@ -197,7 +200,6 @@ def is_image_name(name: str):
meta={'debug': False, 'content': raw}
)
image_mode_list.append(fmodel)
# 在 md 中不直接插入二进制,保存后上层可替换引用
continue

# 为 split_handle 提供可重复读取的 file-like 对象
Expand All @@ -210,22 +212,8 @@ def is_image_name(name: str):
# 准备一个简单的 get_buffer 回调,返回当前 raw
get_buffer = lambda f, _raw=raw: _raw
if split_handle.support(inner_file, get_buffer):
# 回到文件头
inner_file.seek(0)
md_text = split_handle.get_content(inner_file, save_image)
image_list = parse_md_image(md_text)
for image in image_list:
search = re.search("\(.*\)", image)
if search:
source_image_path = search.group().replace('(', '').replace(')', '')
source_image_path = source_image_path.strip().split(" ")[0]
image_path = urljoin(
real_name, '.' + source_image_path if source_image_path.startswith(
'/') else source_image_path
)
for img_model in image_mode_list:
if img_model.file_name == os.path.basename(image_path):
md_text = md_text.replace(source_image_path, f'./oss/file/{img_model.id}')
break

# 如果没有任何 split_handle 处理,按文本解码作为后备
Expand All @@ -237,10 +225,42 @@ def is_image_name(name: str):
md_text = raw.decode('utf-8', errors='ignore')

if isinstance(md_text, str) and md_text.strip():
md_parts.append(md_text)
# 保存 md 文本与其所在的文件路径,后面统一做图片路径替换
md_items.append((md_text, real_name))

# 将收集到的图片通过回调保存
# 将收集到的图片通过回调保存(一次性)
if image_mode_list:
save_image(image_mode_list)

return '\n\n'.join(md_parts)
# 后处理:在每个 md 片段中将相对/绝对引用替换为已保存图片的 oss 路径
content_parts = []
for md_text, base_name in md_items:
image_refs = parse_md_image(md_text)
for image in image_refs:
search = re.search(r"\(.*\)", image)
if not search:
continue
source_image_path = search.group().strip("()").split(" ")[0]

# 规范化 zip 内部路径:若以 '/' 开头,视为相对于 zip 根,否则相对于 base_name 的目录
if source_image_path.startswith('/'):
joined = posixpath.normpath(source_image_path.lstrip('/'))
else:
base_dir = posixpath.dirname(base_name)
joined = posixpath.normpath(posixpath.join(base_dir, source_image_path))

# 匹配已收集图片:以文件名做匹配(zip 中的文件名通常是不含反斜杠的 POSIX 风格)
matched = None
for img_model in image_mode_list:
if img_model.file_name == posixpath.basename(joined):
matched = img_model
break

if matched:
md_text = md_text.replace(source_image_path, f'./oss/file/{matched.id}')

content_parts.append(md_text)

return '\n\n'.join(content_parts)


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code seems to have a few issues and areas for optimization:

  1. Potential File Handling Issues: The function uses io.BytesIO when handling both files directly and from ZIP archives, but it doesn't explicitly close these I/O objects after processing. This could lead to resource leaks.

  2. Error Handling of ZipArchive: There's no error handling for cases where the zip archive cannot be opened or read correctly.

  3. Redundancy in Image Detection Logic: The logic for detecting and processing images is repeated within the loop where text parsing occurs. This can be optimized by moving this logic out.

  4. Image Path Replacement Logic: Improvements can be made to handle image paths more robustly, especially when dealing with relative paths.

Here are some improvements:

import io
import re
from urllib.parse import urljoin
import os
import zipfile

def support(self, file, get_buffer):
    """Read ``file`` into an in-memory buffer and define a zip-processing helper.

    ``file`` is read via ``file.read()`` when it has a ``read`` attribute;
    otherwise it is passed to ``io.BytesIO`` directly.

    NOTE(review): as written, ``process_zip`` is defined but never called in
    this snippet, so this function falls off the end and returns ``None``.
    """
    buffer = file.read() if hasattr(file, 'read') else None
    bytes_io = io.BytesIO(buffer) if buffer is not None else io.BytesIO(file)
    
    def process_zip(zip_ref):
        """Walk the archive held in ``bytes_io`` and join collected text with blank lines.

        NOTE(review): the ``zip_ref`` argument is rebound by the ``with``
        statement below, so the value passed in is never used.
        """
        md_parts = []
        image_mode_list = []
        
        # NOTE(review): @staticmethod on a nested function is unusual; a plain
        # nested def would do. Calling a staticmethod object directly is
        # presumably only supported on Python 3.10+ -- confirm.
        @staticmethod
        def is_image_name(name: str):
            ext = os.path.splitext(name.lower())[1]
            return ext in ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.svg')
        
        with zipfile.ZipFile(bytes_io, 'r') as zip_ref:
            for entry in zip_ref.infolist():
                name = entry.filename
                if is_image_name(name):
                    try:
                        with zip_ref.open(entry) as file:
                            # Pick the first pattern dict whose regex matches the name.
                            # next() has no default here, so a name that matches none of
                            # the patterns raises StopIteration, which is caught by the
                            # generic ``except Exception`` below.
                            mode = next(filter(lambda x: re.match(x['test'], name), [
                                {'test': r'.*\.(png|jpe?g)$'},
                                {'test': r'.*\.(gif|bmp)$'},
                                {'test': r'.*(ico|mng|tiff|psd|eps|wmf)$'}  # Additional formats if needed
                            ]))
                            
                            # NOTE(review): zipfile.ZipInfo documents ``filename``, not
                            # ``name`` -- ``entry.name`` looks like an AttributeError; confirm.
                            img_model = {
                                "file": {"name": entry.name},
                                "type": mode,
                                "size": entry.file_size
                            }
                            image_mode_list.append(img_model)
                            # Always taken once next() succeeds, so the statements after
                            # this ``with`` block are not reached on that path.
                            continue
                        
                        file.seek(0)
                        raw = file.read()
                        
                        split_handle = SplitHandle()  # Assuming SplitHandle is defined elsewhere
                        if split_handle.support(file, get_buffer):
                            file.seek(0)
                            # NOTE(review): ``save_image`` is not defined anywhere in this snippet.
                            md_text = split_handle.get_content(file, save_image)
                            break
                        
                        md_text = raw.decode('utf-8', errors='ignore').strip()
                    
                    except zipfile.BadZipFile:
                        print(f"Failed to open {entry.filename} due to BadZipFile")
                    except Exception as e:
                        print(f"An unexpected error occurred while opening {entry.filename}: {e}")
                
                else:
                    # NOTE(review): ``md_text`` is only assigned inside the image branch
                    # above, so it appears to be unbound when a non-image entry gets here.
                    if isinstance(md_text, str) and md_text.strip():
                        md_parts.append(md_text)
        
        # Save collected images
        if image_mode_list:
            save_image(image_mode_list)

        return '\n\n'.join(md_parts)


class SplitHandle:
    """Placeholder stand-in for a split handler; every method body is ``pass``."""

    def __init__(self):
        """Create the placeholder; no state is initialised."""
        pass
    
    def support(self, file, get_buffer):
        """Stub: does nothing and returns ``None``."""
        pass
    
    def get_content(self, file, save_image):
        """Stub: does nothing and returns ``None``."""
        pass


# Define your save_image callback here

Key Changes:

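  • Opened the archive and each entry inside with blocks so those handles are closed automatically. (The snippet contains no finally block, and the outer BytesIO object is still not closed explicitly.)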
  • Introduced an exception handling block for BadZipFile.
  • Added regex patterns that classify images by common file extension. Note that the snippet still uses os.path.splitext and omits the image path replacement step; the posixpath-based path handling exists only in the merged diff.
  • Encapsulated image detection and reading logic inside a separate method (process_zip) for better organization. Adjusted the example usage according to your module structure.

Loading