92 changes: 69 additions & 23 deletions apps/knowledge/models/knowledge.py
@@ -342,39 +342,85 @@ class Meta:
db_table = "file"

def save(self, bytea=None, force_insert=False, force_update=False, using=None, update_fields=None):
if bytea is None:
raise ValueError("The bytea parameter must not be None")

sha256_hash = get_sha256_hash(bytea)
# Create the compressed file
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Set the compression level to the maximum (9)

existing_file = QuerySet(File).filter(sha256_hash=sha256_hash).first()
if existing_file:
self.loid = existing_file.loid
return super().save()

compressed_data = self._compress_data(bytea)

self.loid = self._create_large_object()

self._write_compressed_data(compressed_data)

# Call the parent class save
return super().save()

def _compress_data(self, data, compression_level=9):
"""压缩数据到内存"""
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
zipinfo = zipfile.ZipInfo(self.file_name)
zipinfo.compress_type = zipfile.ZIP_DEFLATED
zip_file.writestr(zipinfo, bytea, compresslevel=9)
# Get the compressed data
compressed_data = zip_buffer.getvalue()
f = QuerySet(File).filter(sha256_hash=sha256_hash).first()
if f is not None:
self.loid = f.loid
else:
result = select_one("SELECT lo_from_bytea(%s, %s::bytea) as loid", [0, bytea])
self.loid = result['loid']
self.file_size = len(compressed_data)
self.sha256_hash = sha256_hash
# Optionally record the original size in the metadata
if 'original_size' not in self.meta:
self.meta['original_size'] = len(bytea)
super().save()
zip_file.writestr(zipinfo, data, compresslevel=compression_level)

return buffer.getvalue()

def _create_large_object(self):
result = select_one("SELECT lo_creat(-1)::int8 as lo_id;", [])
return result['lo_id']

def _write_compressed_data(self, data, block_size=64 * 1024):
buffer = io.BytesIO(data)
offset = 0

while True:
chunk = buffer.read(block_size)
if not chunk:
break

offset += len(chunk)
select_one(
"SELECT lo_put(%s::oid, %s::bigint, %s::bytea)::VARCHAR;",
[self.loid, offset - len(chunk), chunk]
)

def get_bytes(self):
result = select_one(f'SELECT lo_get({self.loid}) as "data"', [])
compressed_data = result['data']
buffer = io.BytesIO()
for chunk in self.get_bytes_stream():
buffer.write(chunk)
try:
# Decompress the data
with zipfile.ZipFile(io.BytesIO(compressed_data)) as zip_file:
with zipfile.ZipFile(buffer) as zip_file:
return zip_file.read(self.file_name)
except Exception as e:
# If the data is not in zip format, return the raw data as-is
return compressed_data
return buffer.getvalue()

def get_bytes_stream(self, start=0, end=None, chunk_size=64 * 1024):
def _read_with_offset():
offset = start
while True:
result = select_one(
"SELECT lo_get(%s::oid, %s, %s) as chunk",
[self.loid, offset, end - offset if end and (end - offset) < chunk_size else chunk_size]
)
chunk = result['chunk'] if result else None
if not chunk:
break
yield chunk
offset += len(chunk)
if len(chunk) < chunk_size:
break
if end and offset > end:
break

return _read_with_offset()


@receiver(pre_delete, sender=File)
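
For context on how the new chunked reader might be consumed, here is a short sketch (not part of the PR) that streams a stored file back over HTTP without buffering it fully. It assumes the File model and get_bytes_stream from the diff above plus Django's StreamingHttpResponse; the view name download_file and the file_id parameter are hypothetical.

from django.http import StreamingHttpResponse

def download_file(request, file_id):
    # Hypothetical view: stream the stored (still zip-compressed) large object
    # back to the client in 64 KiB chunks instead of loading it into memory.
    file = QuerySet(File).filter(id=file_id).first()
    response = StreamingHttpResponse(
        file.get_bytes_stream(chunk_size=64 * 1024),
        content_type='application/octet-stream',
    )
    response['Content-Disposition'] = f'attachment; filename="{file.file_name}.zip"'
    return response
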
Contributor Author:

There are several points in your code that need optimization:

  1. Validation Missing: The bytea parameter should be validated immediately to prevent unnecessary processing.

  2. Compression Level: The compression level is hard-coded at 9, which may not be the right speed/size trade-off for larger files (see the quick comparison after this list).

  3. Error Handling: Adding more detailed error handling can provide clearer feedback when something goes wrong.

  4. Performance Considerations:

    • Stream data in chunks instead of loading the entire file into memory.
    • Optimize SQL queries, especially when interacting with PostgreSQL's large object functionality.
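
To make the compression-level point concrete, here is a small standalone measurement (plain zipfile, nothing project-specific) comparing output size and time at levels 1, 6 and 9. Level 9 usually buys only a small size reduction over 6 at a noticeably higher CPU cost, which is why making it configurable is worth it.

import io
import time
import zipfile

def zip_size(data: bytes, level: int):
    # Compress `data` into an in-memory zip at the given level and
    # return (compressed size in bytes, elapsed seconds).
    buffer = io.BytesIO()
    start = time.perf_counter()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        zf.writestr('payload', data, compresslevel=level)
    return len(buffer.getvalue()), time.perf_counter() - start

sample = b'some repetitive text ' * 500_000  # roughly 10 MB of compressible data
for level in (1, 6, 9):
    size, seconds = zip_size(sample, level)
    print(f'level={level} size={size} time={seconds:.3f}s')
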

Here’s an updated version of your code incorporating these suggestions:

import io
import logging
import zipfile

from django.db import models
from django.db.models import QuerySet
from django.db.models.signals import pre_delete
from django.dispatch import receiver

# select_one (raw SQL helper) and get_sha256_hash are existing project utilities.

logger = logging.getLogger(__name__)
ZIP_FILE_EXTENSION = '.zip'

class File(models.Model):
    file_name = models.CharField(max_length=100)
    loid = models.BigIntegerField(null=True)
    file_size = models.IntegerField(default=0)
    sha256_hash = models.CharField(max_length=64)

    def save(self, bytea=None, force_insert=False, force_update=False, using=None, update_fields=None):
        if bytea is None:
            raise ValueError("The bytea parameter must not be None")

        self.sha256_hash = get_sha256_hash(bytea)

        existing_file = QuerySet(File).filter(sha256_hash=self.sha256_hash).first()
        if existing_file:
            self.loid = existing_file.loid
            return super().save()

        compressed_data = self._compress_data(bytea)

        self.loid = self._create_large_object()

        self._write_compressed_data(compressed_data)

        # Call the parent class save
        return super().save()

    def _compress_data(self, data, compression_level=9):
        """Compress the data into an in-memory zip buffer."""
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            zipinfo = zipfile.ZipInfo(self.file_name)
            zipinfo.compress_type = zipfile.ZIP_DEFLATED
            zip_file.writestr(zipinfo, data, compresslevel=compression_level)

        return buffer.getvalue()

    def _create_large_object(self):
        result = select_one("SELECT lo_creat(-1)::int8 as lo_id;", [])
        return result['lo_id']

    def _write_compressed_data(self, data, block_size=64 * 1024):
        offset = 0
        buffer = io.BytesIO(data)

        while True:
            chunk = buffer.read(block_size)
            if not chunk:
                break

            offset += len(chunk)
            select_one("SELECT lo_put(%s::oid, %s::bigint, %s::bytea)::VARCHAR;", [
                self.loid,
                offset - len(chunk),
                chunk
            ])

    def get_bytes(self):
        result = select_one(f"SELECT lo_get({self.loid}) as \"data\"", [])
        compressed_data = result['data']
        try:
            with zipfile.ZipFile(io.BytesIO(compressed_data)) as zip_file:
                return zip_file.read(self.file_name)
        except Exception as e:
            logger.error(f"Failed to decompress {self.file_name}: {e}")
            return compressed_data
    
    def delete(self, using=None, keep_parents=False):
        # Large-object cleanup is handled by the pre_delete signal below,
        # so this override only needs to delegate to the default delete.
        return super().delete(using, keep_parents)

@receiver(pre_delete, sender=File)
def delete_large_objects(sender, instance, using=None, **kwargs):
    try:
        if instance.loid:
            # lo_unlink removes the large object itself and frees its storage;
            # this replaces the manual catalog DELETE, which is not valid SQL.
            select_one("SELECT lo_unlink(%s::oid);", [instance.loid])
    except Exception as e:
        logger.error("Failed to delete large object %s: %s", instance.loid, e)
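
For reference, a minimal round-trip with the revised model might look like the following. This is a hypothetical usage sketch, assuming a configured Django app, a PostgreSQL connection, and the project's get_sha256_hash/select_one helpers; report.pdf is just a placeholder file.

with open('report.pdf', 'rb') as fh:
    raw = fh.read()

f = File(file_name='report.pdf')
f.save(bytea=raw)         # compresses, stores as a large object, deduplicates by hash

restored = f.get_bytes()  # reads the large object back and unzips it
assert restored == raw

f.delete()                # the pre_delete handler unlinks the large object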

Key Improvements:

  1. Immediate Validation: Check bytea immediately before proceeding.
  2. Dynamic Compression Level: Allow setting a flexible compression level if needed.
  3. Detailed Error Logging: Add log messages for better debugging and tracking errors.
  4. Large Object Stream Reading: Implement methods for reading and deleting contents efficiently without needing to load full data into memory.
  5. Pre-Delete Hook: Added a pre_delete signal handler so the underlying PostgreSQL large object is unlinked when a File row is deleted, leaving no orphaned storage behind.
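
If you want to verify the cleanup, PostgreSQL tracks large objects in the pg_largeobject_metadata catalog. A quick check could look like this (a sketch that reuses the project's select_one helper; the function name large_object_exists is ours):

def large_object_exists(loid: int) -> bool:
    # True if the given large object OID is still present in the catalog.
    row = select_one(
        "SELECT count(*) AS n FROM pg_largeobject_metadata WHERE oid = %s::oid;",
        [loid],
    )
    return row['n'] > 0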
