Skip to content

Commit 24411ec

Browse files
committed
feat(upload): add resumption of unfinished multipart upload tasks, support streaming uploads, and optimize upload logic
1 parent a58e939 commit 24411ec

File tree

6 files changed

+247
-40
lines changed

6 files changed

+247
-40
lines changed

internal/db/slice_upload.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,23 @@ func DeleteSliceUploadByTaskID(taskID string) error {
4343
return errors.WithStack(db.Where("task_id = ?", taskID).Delete(&tables.SliceUpload{}).Error)
4444
}
4545

46+
// GetIncompleteSliceUploads 获取所有未完成的分片上传任务(用于重启恢复)
47+
func GetIncompleteSliceUploads() ([]*tables.SliceUpload, error) {
48+
var uploads []*tables.SliceUpload
49+
err := db.Where("status IN (?)", []int{
50+
tables.SliceUploadStatusWaiting,
51+
tables.SliceUploadStatusUploading,
52+
tables.SliceUploadStatusProxyComplete,
53+
tables.SliceUploadStatusPendingComplete,
54+
}).Find(&uploads).Error
55+
56+
if err != nil {
57+
return nil, errors.WithStack(err)
58+
}
59+
60+
return uploads, nil
61+
}
62+
4663
// UpdateSliceUploadWithTx 使用事务更新分片上传状态,确保数据一致性
4764
func UpdateSliceUploadWithTx(su *tables.SliceUpload) error {
4865
return errors.WithStack(db.Transaction(func(tx *gorm.DB) error {

internal/fs/fs.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package fs
33
import (
44
"context"
55
"io"
6-
"mime/multipart"
76

87
log "github.com/sirupsen/logrus"
98

@@ -198,9 +197,9 @@ func Preup(c context.Context, s driver.Driver, actualPath string, req *reqres.Pr
198197
return getGlobalSliceManager().CreateSession(c, s, actualPath, req)
199198
}
200199

201-
// UploadSlice 上传切片 - 使用新的管理器重构
202-
func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, file multipart.File) error {
203-
return getGlobalSliceManager().UploadSlice(ctx, storage, req, file)
200+
// UploadSlice 流式上传切片 - 使用新的管理器重构,支持流式上传
201+
func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, reader io.Reader) error {
202+
return getGlobalSliceManager().UploadSlice(ctx, storage, req, reader)
204203
}
205204

206205
// SliceUpComplete 完成分片上传 - 使用新的管理器重构

internal/fs/sliceup.go

Lines changed: 196 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package fs
33
import (
44
"context"
55
"fmt"
6-
"mime/multipart"
6+
"io"
77
"os"
88
"path/filepath"
99
"strings"
@@ -42,9 +42,14 @@ type SliceUploadSession struct {
4242
// NewSliceUploadManager 创建分片上传管理器
4343
func NewSliceUploadManager() *SliceUploadManager {
4444
tempDirPath := tempdir.GetPersistentTempDir()
45-
return &SliceUploadManager{
45+
manager := &SliceUploadManager{
4646
tempDir: tempDirPath,
4747
}
48+
49+
// 启动时恢复未完成的上传任务
50+
go manager.recoverIncompleteUploads()
51+
52+
return manager
4853
}
4954

5055
// CreateSession 创建新的上传会话 - 完整实现Preup逻辑
@@ -227,8 +232,8 @@ func (m *SliceUploadManager) getOrLoadSession(taskID string) (*SliceUploadSessio
227232
return session, nil
228233
}
229234

230-
// UploadSlice 上传分片 - 完整实现原始逻辑
231-
func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, file multipart.File) error {
235+
// UploadSlice 流式上传分片 - 支持流式上传,避免表单上传的内存占用
236+
func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, reader io.Reader) error {
232237
var err error
233238

234239
session, err := m.getOrLoadSession(req.TaskID)
@@ -286,7 +291,39 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
286291
switch s := storage.(type) {
287292
case driver.ISliceUpload:
288293
log.Info("SliceUpload support")
289-
if err := s.SliceUpload(ctx, session.SliceUpload, req.SliceNum, file); err != nil {
294+
// 对于支持原生分片上传的驱动,我们需要将流数据缓存到临时文件中
295+
// 以支持重试和断点续传场景
296+
if err := session.ensureTmpFile(); err != nil {
297+
log.Error("ensureTmpFile error for native slice upload", req, err)
298+
return err
299+
}
300+
301+
// 将流数据写入临时文件的指定位置
302+
sw := &sliceWriter{
303+
file: session.tmpFile,
304+
offset: int64(req.SliceNum) * int64(session.SliceSize),
305+
}
306+
writtenBytes, err := utils.CopyWithBuffer(sw, reader)
307+
if err != nil {
308+
log.Error("Copy to temp file error for native slice upload", req, err)
309+
return err
310+
}
311+
log.Debugf("Written %d bytes to temp file for slice %d", writtenBytes, req.SliceNum)
312+
313+
// 从临时文件读取数据进行上传
314+
sliceSize := session.SliceSize
315+
if req.SliceNum == session.SliceCnt-1 {
316+
// 最后一个分片,计算实际大小
317+
sliceSize = session.Size - int64(req.SliceNum)*int64(session.SliceSize)
318+
}
319+
320+
sliceReader := &sliceReader{
321+
file: session.tmpFile,
322+
offset: int64(req.SliceNum) * int64(session.SliceSize),
323+
size: sliceSize,
324+
}
325+
326+
if err := s.SliceUpload(ctx, session.SliceUpload, req.SliceNum, sliceReader); err != nil {
290327
log.Error("SliceUpload error", req, err)
291328
return err
292329
}
@@ -302,7 +339,7 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
302339
file: session.tmpFile,
303340
offset: int64(req.SliceNum) * int64(session.SliceSize),
304341
}
305-
_, err := utils.CopyWithBuffer(sw, file)
342+
_, err := utils.CopyWithBuffer(sw, reader)
306343
if err != nil {
307344
log.Error("Copy error", req, err)
308345
return err
@@ -336,16 +373,22 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
336373
// 检查是否所有分片都已上传
337374
session.mutex.Lock()
338375
allUploaded := tables.IsAllSliceUploaded(session.SliceUploadStatus, session.SliceCnt)
376+
isPendingComplete := session.Status == tables.SliceUploadStatusPendingComplete
339377
session.mutex.Unlock()
340378

341-
if !allUploaded {
379+
if !allUploaded && !isPendingComplete {
342380
return &reqres.UploadSliceCompleteResp{
343381
Complete: 0,
344382
SliceUploadStatus: session.SliceUploadStatus,
345383
TaskID: session.TaskID,
346384
}, nil
347385
}
348386

387+
// 如果是PendingComplete状态,说明是重启后恢复的任务,直接尝试完成
388+
if isPendingComplete {
389+
log.Infof("Processing pending complete task after restart: %s", session.TaskID)
390+
}
391+
349392
defer func() {
350393
// 确保资源清理和缓存删除
351394
session.cleanup()
@@ -475,6 +518,12 @@ func (s *SliceUploadSession) ensureTmpFile() error {
475518
s.TmpFile = tmpPath
476519
s.tmpFile = tf
477520

521+
// 更新数据库中的临时文件路径,支持重启后恢复
522+
if updateErr := db.UpdateSliceUpload(s.SliceUpload); updateErr != nil {
523+
log.Errorf("Failed to update temp file path in database: %v", updateErr)
524+
// 不返回错误,因为文件已经创建成功,只是数据库更新失败
525+
}
526+
478527
log.Debugf("Created persistent temp file: %s", tmpPath)
479528
return nil
480529
}
@@ -537,3 +586,143 @@ func (sw *sliceWriter) Write(p []byte) (int, error) {
537586
sw.offset += int64(n)
538587
return n, err
539588
}
589+
590+
// sliceReader 用于从临时文件中读取指定分片的数据,支持断点续传
591+
type sliceReader struct {
592+
file *os.File
593+
offset int64
594+
size int64
595+
position int64 // 当前读取位置(相对于分片开始)
596+
}
597+
598+
// Read implements io.Reader interface
599+
func (sr *sliceReader) Read(p []byte) (int, error) {
600+
if sr.position >= sr.size {
601+
return 0, io.EOF
602+
}
603+
604+
// 计算实际可读取的字节数
605+
remaining := sr.size - sr.position
606+
if int64(len(p)) > remaining {
607+
p = p[:remaining]
608+
}
609+
610+
n, err := sr.file.ReadAt(p, sr.offset+sr.position)
611+
sr.position += int64(n)
612+
return n, err
613+
}
614+
615+
// Seek implements io.Seeker interface,支持重试场景
616+
func (sr *sliceReader) Seek(offset int64, whence int) (int64, error) {
617+
var newPos int64
618+
switch whence {
619+
case io.SeekStart:
620+
newPos = offset
621+
case io.SeekCurrent:
622+
newPos = sr.position + offset
623+
case io.SeekEnd:
624+
newPos = sr.size + offset
625+
default:
626+
return 0, fmt.Errorf("invalid whence value: %d", whence)
627+
}
628+
629+
if newPos < 0 {
630+
return 0, fmt.Errorf("negative position: %d", newPos)
631+
}
632+
if newPos > sr.size {
633+
newPos = sr.size
634+
}
635+
636+
sr.position = newPos
637+
return newPos, nil
638+
}
639+
640+
// recoverIncompleteUploads 恢复重启后未完成的上传任务
641+
func (m *SliceUploadManager) recoverIncompleteUploads() {
642+
defer func() {
643+
if r := recover(); r != nil {
644+
log.Errorf("Panic in recoverIncompleteUploads: %v", r)
645+
}
646+
}()
647+
648+
// 等待一段时间,确保系统完全启动
649+
time.Sleep(10 * time.Second)
650+
651+
log.Info("Starting recovery of incomplete slice uploads...")
652+
653+
// 查询所有未完成的上传任务
654+
incompleteUploads, err := db.GetIncompleteSliceUploads()
655+
if err != nil {
656+
log.Errorf("Failed to get incomplete slice uploads: %v", err)
657+
return
658+
}
659+
660+
if len(incompleteUploads) == 0 {
661+
log.Info("No incomplete slice uploads found")
662+
return
663+
}
664+
665+
log.Infof("Found %d incomplete slice uploads, starting recovery...", len(incompleteUploads))
666+
667+
for _, upload := range incompleteUploads {
668+
m.recoverSingleUpload(upload)
669+
}
670+
671+
log.Info("Slice upload recovery completed")
672+
}
673+
674+
// recoverSingleUpload 恢复单个上传任务
675+
func (m *SliceUploadManager) recoverSingleUpload(upload *tables.SliceUpload) {
676+
defer func() {
677+
if r := recover(); r != nil {
678+
log.Errorf("Panic in recoverSingleUpload for task %s: %v", upload.TaskID, r)
679+
}
680+
}()
681+
682+
log.Infof("Recovering upload task: %s, status: %s", upload.TaskID, upload.Status)
683+
684+
// 检查是否所有切片都已上传完成
685+
if tables.IsAllSliceUploaded(upload.SliceUploadStatus, upload.SliceCnt) {
686+
// 所有切片都已完成,尝试完成上传
687+
log.Infof("All slices completed for task %s, attempting to complete upload", upload.TaskID)
688+
m.attemptCompleteUpload(upload)
689+
return
690+
}
691+
692+
// 部分切片未完成的情况
693+
completedSlices := tables.CountUploadedSlices(upload.SliceUploadStatus)
694+
log.Infof("Task %s: %d/%d slices completed, marking as available for resume",
695+
upload.TaskID, completedSlices, upload.SliceCnt)
696+
697+
// 更新状态为等待用户继续上传
698+
upload.Status = tables.SliceUploadStatusWaiting
699+
upload.Message = "Ready for resume after server restart"
700+
if err := db.UpdateSliceUpload(upload); err != nil {
701+
log.Errorf("Failed to update slice upload status for task %s: %v", upload.TaskID, err)
702+
}
703+
704+
// 如果有临时文件但文件不存在,清理记录
705+
if upload.TmpFile != "" {
706+
if _, err := os.Stat(upload.TmpFile); os.IsNotExist(err) {
707+
log.Warnf("Temporary file lost for task %s, cleaning up", upload.TaskID)
708+
if err := db.DeleteSliceUploadByTaskID(upload.TaskID); err != nil {
709+
log.Errorf("Failed to clean up lost task %s: %v", upload.TaskID, err)
710+
}
711+
}
712+
}
713+
}
714+
715+
// attemptCompleteUpload 尝试完成上传(用于恢复已完成切片的任务)
716+
func (m *SliceUploadManager) attemptCompleteUpload(upload *tables.SliceUpload) {
717+
// 这里需要获取存储驱动,但在恢复阶段我们无法直接获取 storage driver
718+
// 所以我们将状态标记为待完成,等用户下次操作时自动完成
719+
upload.Status = tables.SliceUploadStatusPendingComplete
720+
upload.Message = "All slices completed, waiting for final completion"
721+
722+
if err := db.UpdateSliceUpload(upload); err != nil {
723+
log.Errorf("Failed to update slice upload to pending complete for task %s: %v", upload.TaskID, err)
724+
return
725+
}
726+
727+
log.Infof("Task %s marked as pending completion", upload.TaskID)
728+
}

internal/model/tables/slice_upload.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const (
1313
SliceUploadStatusFailed
1414
// SliceUploadStatusProxyComplete 成功上传到代理服务,等待上传到网盘
1515
SliceUploadStatusProxyComplete
16+
// SliceUploadStatusPendingComplete 等待完成(所有切片已上传,等待最终完成处理)
17+
SliceUploadStatusPendingComplete
1618
)
1719

1820
// SliceUpload 分片上传数据表

0 commit comments

Comments
 (0)