@@ -53,7 +53,8 @@ def save_image(image_list):
source_id=meta['application_id'] if meta['application_id'] else meta['knowledge_id'],
meta=meta
)
new_file.save(file_bytes)
if not QuerySet(File).filter(id=new_file.id).exists():
new_file.save(file_bytes)

document_list = []
for doc in document:

Your code looks generally clear and functional, but there are a few areas where improvements can be made:

  1. Empty File Check: new_file.save(file_bytes) may not behave correctly if file_bytes is empty or None; saving an empty payload would write incorrect data to the database, so guard against it before saving.

  2. Duplicate Entry Prevention: Checking whether a record with the same ID already exists before saving is good practice for avoiding duplicate entries, but depending on your application logic such duplicates may need to be updated rather than skipped (see the sketch after the refined version below).

  3. Semicolon Usage: There seems to be a semicolon at the end of the first line inside the loop that's iterating over document. This semicolon has no effect on the functionality, so you could remove it to improve readability.

  4. Comments: It'd be helpful to add comments to explain why certain lines are included, especially those related to data validation or handling.

Here's a refined version of your code with these suggestions applied:

from django.db import models

class Document(models.Model):
    # Define fields here
    pass

def save_image(image_list):
    for img in image_list:
        metadata = img.get('metadata')  # Extracting metadata from each item
        new_file = NewFileType.objects.create(  # Assuming this class exists
            name=img['name'],  # Replace 'name' key with actual field name
            content_type=img['content_type'],
            source_id=metadata['application_id'] if metadata['application_id'] else metadata['knowledge_id'],
            meta=metadata
        )

        file_bytes = get_file_contents(img)  # Function to retrieve file bytes

        # Skip empty or missing payloads (point 1)
        if not file_bytes:
            continue

        # Save only if the file doesn't exist in the database (point 2)
        if not QuerySet(File).filter(id=new_file.id).exists():
            new_file.save(file_bytes)

def save_document(document_data):
    for doc_data in document_data:
        document_instance = Document(...)  # Initialize Document instance with appropriate arguments
        document_instance.save()  # Save the document instance

Make sure to adjust method calls, variable names, and assumptions according to your specific implementation details.
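
If duplicates should be updated rather than skipped (point 2), a minimal sketch of that variant is shown below; save_or_update_file is a hypothetical helper, and it assumes File's custom save(file_bytes) persists the binary content exactly as in the diff above:

```python
def save_or_update_file(new_file, file_bytes):
    # Hypothetical helper: File and QuerySet are assumed to be the ones used
    # in the diff, with a custom save(file_bytes) that writes the binary content.
    if not file_bytes:
        # Refuse to persist an empty or missing payload (point 1).
        return None
    existing = QuerySet(File).filter(id=new_file.id).first()
    if existing is None:
        new_file.save(file_bytes)  # create a new record
        return new_file
    # Update path: carry over whatever fields should change, then overwrite
    # the stored content of the existing record.
    existing.meta = new_file.meta
    existing.save(file_bytes)
    return existing
```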

128 changes: 56 additions & 72 deletions apps/common/handle/impl/text/zip_split_handle.py
@@ -101,6 +101,37 @@ def get_image_list(result_list: list, zip_files: List[str]):
return image_file_list


def get_image_list_by_content(name: str, content: str, zip_files: List[str]):
image_file_list = []
image_list = parse_md_image(content)
for image in image_list:
search = re.search("\(.*\)", image)
if search:
new_image_id = str(uuid.uuid7())
source_image_path = search.group().replace('(', '').replace(')', '')
source_image_path = source_image_path.strip().split(" ")[0]
image_path = urljoin(name, '.' + source_image_path if source_image_path.startswith(
'/') else source_image_path)
if not zip_files.__contains__(image_path):
continue
if image_path.startswith('oss/file/') or image_path.startswith('oss/image/'):
image_id = image_path.replace('oss/file/', '').replace('oss/image/', '')
if is_valid_uuid(image_id):
image_file_list.append({'source_file': image_path,
'image_id': image_id})
else:
image_file_list.append({'source_file': image_path,
'image_id': new_image_id})
content = content.replace(source_image_path, f'./oss/file/{new_image_id}')

else:
image_file_list.append({'source_file': image_path,
'image_id': new_image_id})
content = content.replace(source_image_path, f'./oss/file/{new_image_id}')

return image_file_list, content


def get_file_name(file_name):
try:
file_name_code = file_name.encode('cp437')
@@ -171,17 +202,12 @@ def get_content(self, file, save_image):
"""
buffer = file.read() if hasattr(file, 'read') else None
bytes_io = io.BytesIO(buffer) if buffer is not None else io.BytesIO(file)
md_items = []  # store (md_text, source_file_path)
image_mode_list = []

import posixpath

def is_image_name(name: str):
ext = posixpath.splitext(name.lower())[1]
return ext in ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.svg')
image_list = []
content_parts = []

with zipfile.ZipFile(bytes_io, 'r') as zip_ref:
files = zip_ref.namelist()
file_content_list = []
for inner_name in files:
if inner_name.endswith('/') or inner_name.startswith('__MACOSX'):
continue
@@ -190,77 +216,35 @@ def is_image_name(name: str):
real_name = get_file_name(zf.name)
except Exception:
real_name = zf.name
raw = zf.read()
# collect image files directly
if is_image_name(real_name):
image_id = str(uuid.uuid7())
fmodel = File(
id=image_id,
file_name=os.path.basename(real_name),
meta={'debug': False, 'content': raw}
)
image_mode_list.append(fmodel)
continue

# provide a re-readable file-like object for split_handle
inner_file = io.BytesIO(raw)
inner_file.name = real_name

# try get_content of the registered split handles
md_text = None
zf.name = real_name
for split_handle in split_handles:
# prepare a simple get_buffer callback that returns the current raw
get_buffer = lambda f, _raw=raw: _raw
if split_handle.support(inner_file, get_buffer):
inner_file.seek(0)
md_text = split_handle.get_content(inner_file, save_image)
get_buffer = FileBufferHandle().get_buffer
if split_handle.support(zf, get_buffer):
row = get_buffer(zf)
md_text = split_handle.get_content(io.BytesIO(row), save_image)
file_content_list.append({'content': md_text, 'name': real_name})
break

# if no split_handle processed it, fall back to decoding as text
if md_text is None:
enc = detect(raw).get('encoding') or 'utf-8'
try:
md_text = raw.decode(enc, errors='ignore')
except Exception:
md_text = raw.decode('utf-8', errors='ignore')

if isinstance(md_text, str) and md_text.strip():
# keep the md text and its file path; image path replacement is done later in one pass
md_items.append((md_text, real_name))
for file_content in file_content_list:
_image_list, content = get_image_list_by_content(file_content.get('name'), file_content.get("content"),
files)
content_parts.append(content)
for image in _image_list:
image_list.append(image)

# save the collected images via the callback (in one batch)
if image_mode_list:
if image_list:
image_mode_list = []
for image in image_list:
with zip_ref.open(image.get('source_file')) as f:
i = File(
id=image.get('image_id'),
file_name=os.path.basename(image.get('source_file')),
meta={'debug': False, 'content': f.read()}  # content here is binary data
)
image_mode_list.append(i)
save_image(image_mode_list)

# post-processing: replace relative/absolute references in each md fragment with the saved images' oss paths
content_parts = []
for md_text, base_name in md_items:
image_refs = parse_md_image(md_text)
for image in image_refs:
search = re.search(r"\(.*\)", image)
if not search:
continue
source_image_path = search.group().strip("()").split(" ")[0]

# normalize the in-zip path: if it starts with '/', treat it as relative to the zip root, otherwise relative to base_name's directory
if source_image_path.startswith('/'):
joined = posixpath.normpath(source_image_path.lstrip('/'))
else:
base_dir = posixpath.dirname(base_name)
joined = posixpath.normpath(posixpath.join(base_dir, source_image_path))

# match collected images by file name (names inside a zip are normally POSIX-style without backslashes)
matched = None
for img_model in image_mode_list:
if img_model.file_name == posixpath.basename(joined):
matched = img_model
break

if matched:
md_text = md_text.replace(source_image_path, f'./oss/file/{matched.id}')

content_parts.append(md_text)

return '\n\n'.join(content_parts)



  1. The get_image_list_by_content function handles both relative and absolute image paths in Markdown content, but it always rewrites references to the oss/file/ prefix, assuming every referenced image is stored under that one subdirectory. This can lead to incorrect behavior if images are distributed across directories.

    Optimization Suggestion: Consider an argument or configuration option that specifies the subdirectory these images should be rewritten to (oss/file/ by default), and ensure that directory exists before attempting to retrieve files from it; a sketch of this appears after these points.

  2. The code accepts paths that start with either 'oss/file/' or 'oss/image/'. The two prefixes are likely synonymous in this context, but passing a tuple to startswith makes the pair explicit and easier to read:

    # Original code
    if image_path.startswith('oss/file/') or image_path.startswith('oss/image/'):

    # Revised code
    if image_path.startswith(('oss/file/', 'oss/image/')):
  3. Some variable names, such as _image_list and content_parts, are hard to follow without context or an explanation of their purpose. Meaningful prefixes such as "result_" or "processed_" would make their roles clearer:

    result_image_list = []
    processed_content_parts = []
  4. When logging replacement details, report the identifier that was actually written into the content: new_image_id only applies on the branches that generate a new id, while image_id is the one kept when the path already contains a valid UUID. Mixing them up produces misleading log output:

    # Misleading when the existing id was kept
    print(f"Replaced {source_image_path} with ./oss/file/{new_image_id}")

    # Reports the id that was actually used
    print(f"Replaced {source_image_path} with ./oss/file/{image_id}")
  5. While your handling of ZIP archives is comprehensive, error management has room for improvement; for instance, exceptions raised while decoding text data or handling invalid file types deserve more attention:

    try:
        enc = detect(raw).get('encoding') or 'utf-8'
        md_text = raw.decode(enc, errors='ignore')
    except Exception as e:
        print(f"Failed to decode text: {e}. Falling back to utf-8.")
        md_text = raw.decode('utf-8', errors='ignore')
  6. You create File instances in two separate passes: once right after extracting each file, and again when all relevant images are collected into one final list. Consolidating these steps into a single pass would simplify the logic and make future maintenance easier, roughly along these lines:

    def update_content_with_images(content, image_entries):
        # Rewrite each collected source path to its saved oss location.
        for image in image_entries:
            content = content.replace(image['source_file'], f"./oss/file/{image['image_id']}")
        return content

    file_content_list = []
    for name in files:
        if name.endswith('/') or name.startswith('__MACOSX'):
            continue
        with zip_ref.open(name) as zf:
            # Collect relevant file contents in a single pass (decode bytes as needed).
            file_content_list.append({'content': zf.read(), 'name': name})

    # Once all valid contents have been collected, extract the image references
    # and create the `File` instances from that same list.
    for file_content in file_content_list:
        image_entries, updated_content = get_image_list_by_content(
            file_content.get('name'), file_content.get('content'), files)
        updated_content = update_content_with_images(updated_content, image_entries)

7. If you need to maintain compatibility across Python versions or environments, make sure urljoin and posixpath are imported from the right places: in Python 3, urljoin lives in urllib.parse (in Python 2 it was urlparse.urljoin), and posixpath is only needed when you join or normalize zip-internal paths yourself:

```python
from urllib.parse import urljoin  # Python 3 location; Python 2 used urlparse.urljoin
import posixpath                  # for joining/normalizing POSIX-style paths inside the zip

# Example usage:
joined = urljoin(base_dir, source_image_path)
if not joined.startswith('/'):
    joined = posixpath.normpath(posixpath.join(base_dir, source_image_path))
```
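
For point 1, a minimal sketch of making the target subdirectory configurable is shown below; rewrite_image_refs and parse_md_image_paths are hypothetical names, the image_prefix parameter is an assumption rather than part of the current code, and uuid4 stands in for the uuid.uuid7() used in the PR:

```python
from typing import List
import posixpath
import uuid


def rewrite_image_refs(name: str, content: str, zip_files: List[str],
                       image_prefix: str = 'oss/file/'):
    """Hypothetical variant of get_image_list_by_content with a configurable prefix."""
    image_file_list = []
    # parse_md_image_paths is an assumed helper returning the raw paths found
    # in Markdown image references.
    for source_image_path in parse_md_image_paths(content):
        image_path = posixpath.normpath(
            posixpath.join(posixpath.dirname(name), source_image_path.lstrip('/')))
        if image_path not in zip_files:
            continue
        new_image_id = str(uuid.uuid4())  # the PR uses uuid.uuid7()
        image_file_list.append({'source_file': image_path, 'image_id': new_image_id})
        # The prefix used for rewriting is passed in instead of hardcoded.
        content = content.replace(source_image_path, f'./{image_prefix}{new_image_id}')
    return image_file_list, content
```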

These points aim to improve the robustness, clarity, efficiency, and maintainability of the code while avoiding issues that are common in its current implementation.
