Commit c1966b3

feat: adapt chunked upload for S3
1 parent a20a0be commit c1966b3

File tree: 6 files changed, +549 -165 lines


apps/base/models.py

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ class UploadChunk(models.Model):
     file_name = fields.CharField(max_length=255)
     created_at = fields.DatetimeField(auto_now_add=True)
     completed = fields.BooleanField(default=False)
+    actual_size = fields.IntField(null=True)  # actual number of bytes received for the chunk
 
 
 class KeyValue(Model):
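
The new actual_size column is nullable, so chunk rows written before this change carry no value and any byte accounting over it has to skip NULLs. A small hypothetical helper (not part of this commit) showing the pattern the view changes below rely on:

from apps.base.models import UploadChunk

async def received_bytes(upload_id: str) -> int:
    # Bytes actually accepted so far for one upload session; rows whose
    # actual_size is NULL (created before this column existed) are skipped.
    chunks = await UploadChunk.filter(upload_id=upload_id, completed=True).all()
    return sum(c.actual_size for c in chunks if c.actual_size)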

apps/base/views.py

Lines changed: 179 additions & 37 deletions
@@ -164,6 +164,16 @@ async def download_file(key: str, code: str, ip: str = Depends(ip_limit["error"]
 
 @chunk_api.post("/upload/init/", dependencies=[Depends(share_required_login)])
 async def init_chunk_upload(data: InitChunkUploadModel):
+    # Server-side check: derive the theoretical maximum upload size from total_chunks * chunk_size
+    total_chunks = (data.file_size + data.chunk_size - 1) // data.chunk_size
+    max_possible_size = total_chunks * data.chunk_size
+    if max_possible_size > settings.uploadSize:
+        max_size_mb = settings.uploadSize / (1024 * 1024)
+        raise HTTPException(
+            status_code=403,
+            detail=f"File size exceeds the limit; the maximum is {max_size_mb:.2f} MB"
+        )
+
     # # Instant-transfer (hash dedupe) check
     # existing = await FileCodes.filter(file_hash=data.file_hash).first()
     # if existing:
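
This pre-check rejects an init request whose worst-case payload would already exceed the configured limit, before a single chunk is transferred; it deliberately uses total_chunks * chunk_size rather than the client-declared file_size. A quick worked example with made-up numbers (settings.uploadSize is whatever the deployment configures):

file_size = 25 * 1024 * 1024       # client declares a 25 MiB file
chunk_size = 10 * 1024 * 1024      # 10 MiB chunks
total_chunks = (file_size + chunk_size - 1) // chunk_size    # ceil(25 / 10) = 3
max_possible_size = total_chunks * chunk_size                # 3 * 10 MiB = 30 MiB worst case
# With an uploadSize limit of 28 MiB, the declared 25 MiB would pass a naive
# file_size check, but the 30 MiB worst case is rejected here with HTTP 403.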
@@ -179,9 +189,30 @@ async def init_chunk_upload(data: InitChunkUploadModel):
     #         "name": f'{existing.prefix}{existing.suffix}'
     #     })
 
-    # Create the upload session
+    # Resumable upload: check for an unfinished session for the same file
+    existing_session = await UploadChunk.filter(
+        chunk_hash=data.file_hash,
+        chunk_index=-1,
+        file_size=data.file_size,
+        file_name=data.file_name,
+    ).first()
+
+    if existing_session:
+        # Reuse the existing session and fetch the list of chunks already uploaded
+        uploaded_chunks = await UploadChunk.filter(
+            upload_id=existing_session.upload_id,
+            completed=True
+        ).values_list('chunk_index', flat=True)
+        return APIResponse(detail={
+            "existed": False,
+            "upload_id": existing_session.upload_id,
+            "chunk_size": existing_session.chunk_size,
+            "total_chunks": existing_session.total_chunks,
+            "uploaded_chunks": list(uploaded_chunks)
+        })
+
+    # Create a new upload session
     upload_id = uuid.uuid4().hex
-    total_chunks = (data.file_size + data.chunk_size - 1) // data.chunk_size
     await UploadChunk.create(
         upload_id=upload_id,
         chunk_index=-1,
@@ -191,17 +222,12 @@ async def init_chunk_upload(data: InitChunkUploadModel):
         chunk_hash=data.file_hash,
         file_name=data.file_name,
     )
-    # Get the list of chunks already uploaded
-    uploaded_chunks = await UploadChunk.filter(
-        upload_id=upload_id,
-        completed=True
-    ).values_list('chunk_index', flat=True)
     return APIResponse(detail={
         "existed": False,
         "upload_id": upload_id,
         "chunk_size": data.chunk_size,
        "total_chunks": total_chunks,
-        "uploaded_chunks": uploaded_chunks
+        "uploaded_chunks": []
     })
 
 
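For a client, the practical difference between the two branches above is the uploaded_chunks list: a reused session reports the chunk indexes already stored, while a fresh session reports an empty list. A minimal sketch of the resume logic implied by that response (field names follow the JSON built above; transport and error handling are omitted):

def chunks_to_send(init_detail: dict) -> list[int]:
    # init_detail is the payload returned by the /upload/init/ handler above.
    already_uploaded = set(init_detail["uploaded_chunks"])
    return [i for i in range(init_detail["total_chunks"]) if i not in already_uploaded]

# Example: a resumed session that already holds chunks 0 and 2 of 4.
print(chunks_to_send({"total_chunks": 4, "uploaded_chunks": [0, 2]}))  # -> [1, 3]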

@@ -220,11 +246,52 @@ async def upload_chunk(
     if chunk_index < 0 or chunk_index >= chunk_info.total_chunks:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="Invalid chunk index")
 
+    # Check whether this chunk was already uploaded (supports resumable uploads)
+    existing_chunk = await UploadChunk.filter(
+        upload_id=upload_id,
+        chunk_index=chunk_index,
+        completed=True
+    ).first()
+    if existing_chunk:
+        return APIResponse(detail={"chunk_hash": existing_chunk.chunk_hash, "skipped": True})
+
     # Read the chunk data and compute its hash
     chunk_data = await chunk.read()
+    chunk_size = len(chunk_data)
+
+    # Verify the chunk does not exceed the declared chunk_size
+    if chunk_size > chunk_info.chunk_size:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail=f"Chunk exceeds the declared size: maximum {chunk_info.chunk_size}, received {chunk_size}"
+        )
+
+    # Compute the actual total uploaded so far (prevents bypassing the client-side limit)
+    uploaded_chunks_list = await UploadChunk.filter(
+        upload_id=upload_id,
+        completed=True
+    ).all()
+    uploaded_size = sum(c.actual_size for c in uploaded_chunks_list if c.actual_size)
+    if uploaded_size + chunk_size > settings.uploadSize:
+        max_size_mb = settings.uploadSize / (1024 * 1024)
+        raise HTTPException(
+            status_code=403,
+            detail=f"Cumulative upload size exceeds the limit; the maximum is {max_size_mb:.2f} MB"
+        )
+
     chunk_hash = hashlib.sha256(chunk_data).hexdigest()
 
-    # Update or create the chunk record
+    # Resolve the file path
+    _, _, _, _, save_path = await get_chunk_file_path_name(chunk_info.file_name, upload_id)
+
+    # Save the chunk to storage
+    storage = storages[settings.file_storage]()
+    try:
+        await storage.save_chunk(upload_id, chunk_index, chunk_data, chunk_hash, save_path)
+    except Exception as e:
+        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to save chunk: {str(e)}")
+
+    # Update or create the chunk record (only after a successful save, including the actual size)
     await UploadChunk.update_or_create(
         upload_id=upload_id,
         chunk_index=chunk_index,
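
The handler above only calls storage.save_chunk; the S3-specific behaviour this commit adds lives in the storage layer, in one of the changed files not shown in this excerpt. As a rough, hypothetical illustration of how save_chunk could map onto S3 multipart uploads (the class name, attributes, and bucket handling are assumptions, and boto3 is used synchronously for brevity):

import boto3

class S3ChunkStorageSketch:
    """Illustrative only: one application chunk becomes one S3 multipart part."""

    def __init__(self, bucket: str):
        self.client = boto3.client("s3")
        self.bucket = bucket
        self.multipart_ids: dict[str, str] = {}          # app upload_id -> S3 UploadId
        self.part_etags: dict[str, dict[int, str]] = {}  # app upload_id -> {chunk_index: ETag}

    def save_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes,
                   chunk_hash: str, save_path: str) -> None:
        # Lazily open an S3 multipart upload the first time a session saves a chunk.
        if upload_id not in self.multipart_ids:
            resp = self.client.create_multipart_upload(Bucket=self.bucket, Key=save_path)
            self.multipart_ids[upload_id] = resp["UploadId"]
            self.part_etags[upload_id] = {}
        # S3 part numbers are 1-based, and every part except the last must be >= 5 MiB.
        part = self.client.upload_part(
            Bucket=self.bucket,
            Key=save_path,
            UploadId=self.multipart_ids[upload_id],
            PartNumber=chunk_index + 1,
            Body=chunk_data,
        )
        self.part_etags[upload_id][chunk_index] = part["ETag"]

A real adapter would persist the UploadId/ETag bookkeeping somewhere durable rather than in process memory, otherwise an interrupted server loses the ability to resume or abort the S3 upload.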
@@ -234,15 +301,59 @@
             'file_size': chunk_info.file_size,
             'total_chunks': chunk_info.total_chunks,
             'chunk_size': chunk_info.chunk_size,
-            'file_name': chunk_info.file_name
+            'file_name': chunk_info.file_name,
+            'actual_size': chunk_size
         }
     )
+    return APIResponse(detail={"chunk_hash": chunk_hash})
+
+
+@chunk_api.delete("/upload/{upload_id}", dependencies=[Depends(share_required_login)])
+async def cancel_upload(upload_id: str):
+    """Cancel an upload and clean up its temporary files"""
+    chunk_info = await UploadChunk.filter(upload_id=upload_id, chunk_index=-1).first()
+    if not chunk_info:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Upload session does not exist")
+
     # Resolve the file path
     _, _, _, _, save_path = await get_chunk_file_path_name(chunk_info.file_name, upload_id)
-    # Save the chunk to storage
+
+    # Clean up the temporary files in storage
     storage = storages[settings.file_storage]()
-    await storage.save_chunk(upload_id, chunk_index, chunk_data, chunk_hash, save_path)
-    return APIResponse(detail={"chunk_hash": chunk_hash})
+    try:
+        await storage.clean_chunks(upload_id, save_path)
+    except Exception as e:
+        # Log the error, but do not block deleting the database records
+        pass
+
+    # Clean up the database records
+    await UploadChunk.filter(upload_id=upload_id).delete()
+
+    return APIResponse(detail={"message": "Upload cancelled"})
+
+
+@chunk_api.get("/upload/status/{upload_id}", dependencies=[Depends(share_required_login)])
+async def get_upload_status(upload_id: str):
+    """Get the status of an upload"""
+    chunk_info = await UploadChunk.filter(upload_id=upload_id, chunk_index=-1).first()
+    if not chunk_info:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Upload session does not exist")
+
+    # Get the list of chunks already uploaded
+    uploaded_chunks = await UploadChunk.filter(
+        upload_id=upload_id,
+        completed=True
+    ).values_list('chunk_index', flat=True)
+
+    return APIResponse(detail={
+        "upload_id": upload_id,
+        "file_name": chunk_info.file_name,
+        "file_size": chunk_info.file_size,
+        "chunk_size": chunk_info.chunk_size,
+        "total_chunks": chunk_info.total_chunks,
+        "uploaded_chunks": list(uploaded_chunks),
+        "progress": len(uploaded_chunks) / chunk_info.total_chunks * 100
+    })
 
 
 @chunk_api.post("/upload/complete/{upload_id}", dependencies=[Depends(share_required_login)])
@@ -254,32 +365,63 @@ async def complete_upload(upload_id: str, data: CompleteUploadModel, ip: str = D
 
     storage = storages[settings.file_storage]()
     # Verify all chunks are present
-    completed_chunks = await UploadChunk.filter(
+    completed_chunks_list = await UploadChunk.filter(
         upload_id=upload_id,
         completed=True
-    ).count()
-    if completed_chunks != chunk_info.total_chunks:
+    ).all()
+    if len(completed_chunks_list) != chunk_info.total_chunks:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="Chunks are incomplete")
+
+    # Compute and verify the actual total uploaded size
+    actual_total_size = sum(c.actual_size for c in completed_chunks_list if c.actual_size)
+    if actual_total_size > settings.uploadSize:
+        # Clean up the chunks that were already uploaded
+        _, _, _, _, save_path = await get_chunk_file_path_name(chunk_info.file_name, upload_id)
+        try:
+            await storage.clean_chunks(upload_id, save_path)
+        except Exception:
+            pass
+        await UploadChunk.filter(upload_id=upload_id).delete()
+        max_size_mb = settings.uploadSize / (1024 * 1024)
+        raise HTTPException(
+            status_code=403,
+            detail=f"Actual uploaded size exceeds the limit; the maximum is {max_size_mb:.2f} MB"
+        )
+
     # Resolve the file path
     path, suffix, prefix, _, save_path = await get_chunk_file_path_name(chunk_info.file_name, upload_id)
-    # Merge the chunks and compute the hash
-    await storage.merge_chunks(upload_id, chunk_info, save_path)
-    # Create the file record
-    expired_at, expired_count, used_count, code = await get_expire_info(data.expire_value, data.expire_style)
-    await FileCodes.create(
-        code=code,
-        file_hash=chunk_info.chunk_hash,
-        is_chunked=True,
-        upload_id=upload_id,
-        size=chunk_info.file_size,
-        expired_at=expired_at,
-        expired_count=expired_count,
-        used_count=used_count,
-        file_path=path,
-        uuid_file_name=f"{prefix}{suffix}",
-        prefix=prefix,
-        suffix=suffix
-    )
-    # Clean up the temporary files
-    await storage.clean_chunks(upload_id, save_path)
-    return APIResponse(detail={"code": code, "name": chunk_info.file_name})
+
+    try:
+        # Merge the chunks and compute the hash
+        _, file_hash = await storage.merge_chunks(upload_id, chunk_info, save_path)
+        # Create the file record
+        expired_at, expired_count, used_count, code = await get_expire_info(data.expire_value, data.expire_style)
+        await FileCodes.create(
+            code=code,
+            file_hash=file_hash,  # use the hash computed after merging
+            is_chunked=True,
+            upload_id=upload_id,
+            size=actual_total_size,  # use the actual uploaded size rather than the client-declared value
+            expired_at=expired_at,
+            expired_count=expired_count,
+            used_count=used_count,
+            file_path=path,
+            uuid_file_name=f"{prefix}{suffix}",
+            prefix=prefix,
+            suffix=suffix
+        )
+        # Clean up the temporary files
+        await storage.clean_chunks(upload_id, save_path)
+        # Clean up the chunk records in the database
+        await UploadChunk.filter(upload_id=upload_id).delete()
+        ip_limit["upload"].add_ip(ip)
+        return APIResponse(detail={"code": code, "name": chunk_info.file_name})
+    except ValueError as e:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
+    except Exception as e:
+        # Clean up the temporary files when merging fails
+        try:
+            await storage.clean_chunks(upload_id, save_path)
+        except Exception:
+            pass
+        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"File merge failed: {str(e)}")
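
complete_upload now expects merge_chunks to return the hash of the merged file and always deletes the session's UploadChunk rows afterwards. On an S3 backend, merging and cleaning plausibly map onto CompleteMultipartUpload and AbortMultipartUpload; a rough, hypothetical continuation of the earlier sketch (same caveats: names and bookkeeping are assumptions, and the content hash the real adapter returns would have to be accumulated separately, for example while parts pass through save_chunk):

import boto3

class S3MergeSketch:
    """Illustrative only; pairs with the save_chunk sketch shown earlier."""

    def __init__(self, bucket: str, multipart_ids: dict[str, str],
                 part_etags: dict[str, dict[int, str]]):
        self.client = boto3.client("s3")
        self.bucket = bucket
        self.multipart_ids = multipart_ids
        self.part_etags = part_etags

    def merge_chunks(self, upload_id: str, save_path: str) -> None:
        # CompleteMultipartUpload needs the parts listed in ascending order.
        parts = [
            {"ETag": etag, "PartNumber": index + 1}
            for index, etag in sorted(self.part_etags[upload_id].items())
        ]
        self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=save_path,
            UploadId=self.multipart_ids.pop(upload_id),
            MultipartUpload={"Parts": parts},
        )
        self.part_etags.pop(upload_id, None)

    def clean_chunks(self, upload_id: str, save_path: str) -> None:
        # After a successful merge there is no open multipart upload left, so the
        # clean_chunks call made on the success path simply becomes a no-op here.
        open_id = self.multipart_ids.pop(upload_id, None)
        if open_id:
            self.client.abort_multipart_upload(
                Bucket=self.bucket, Key=save_path, UploadId=open_id
            )
        self.part_etags.pop(upload_id, None)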
