@@ -23,7 +23,6 @@ import (
23
23
"github.com/google/uuid"
24
24
"github.com/pkg/errors"
25
25
log "github.com/sirupsen/logrus"
26
- "gorm.io/gorm"
27
26
)
28
27
29
28
// SliceUploadManager 分片上传管理器
@@ -36,85 +35,18 @@ type SliceUploadManager struct {
36
35
type SliceUploadSession struct {
37
36
* tables.SliceUpload
38
37
tmpFile * os.File
39
- mutex sync.Mutex // 使用Mutex而不是RWMutex,保持与原始实现一致
38
+ mutex sync.Mutex
40
39
}
41
40
42
41
// NewSliceUploadManager 创建分片上传管理器
43
42
func NewSliceUploadManager () * SliceUploadManager {
44
43
manager := & SliceUploadManager {}
45
- // 系统重启后清理未完成的上传任务,因为前端session会失效
46
44
go manager .cleanupIncompleteUploads ()
47
45
return manager
48
46
}
49
47
50
- // CreateSession 创建新的上传会话 - 完整实现Preup逻辑
48
+ // CreateSession 创建新的上传会话
51
49
func (m * SliceUploadManager ) CreateSession (ctx context.Context , storage driver.Driver , actualPath string , req * reqres.PreupReq ) (* reqres.PreupResp , error ) {
52
- // 检查是否存在未完成的上传任务(用于断点续传)
53
- wh := map [string ]any {
54
- "dst_path" : req .Path ,
55
- "name" : req .Name ,
56
- "size" : req .Size ,
57
- "status" : []int {
58
- tables .SliceUploadStatusWaiting , // 等待状态(重启后恢复)
59
- tables .SliceUploadStatusUploading , // 上传中状态
60
- },
61
- }
62
- if req .Hash .Md5 != "" {
63
- wh ["hash_md5" ] = req .Hash .Md5
64
- }
65
- if req .Hash .Sha1 != "" {
66
- wh ["hash_sha1" ] = req .Hash .Sha1
67
- }
68
- if req .Hash .Md5256KB != "" {
69
- wh ["hash_md5_256kb" ] = req .Hash .Md5256KB
70
- }
71
-
72
- su , err := db .GetSliceUpload (wh )
73
- if err != nil && ! errors .Is (err , gorm .ErrRecordNotFound ) {
74
- log .Error ("GetSliceUpload" , err )
75
- return nil , errors .WithStack (err )
76
- }
77
-
78
- if su .TaskID != "" { // 找到未完成的上传任务,支持断点续传
79
- // 验证临时文件是否仍然存在(仅对非原生分片上传)
80
- if su .TmpFile != "" {
81
- if _ , err := os .Stat (su .TmpFile ); os .IsNotExist (err ) {
82
- // 临时文件丢失,清理数据库记录,重新开始
83
- log .Warnf ("Temporary file lost after restart, cleaning up task: %s" , su .TaskID )
84
- if deleteErr := db .DeleteSliceUploadByTaskID (su .TaskID ); deleteErr != nil {
85
- log .Errorf ("Failed to delete lost slice upload task: %v" , deleteErr )
86
- }
87
- // 继续创建新任务
88
- } else {
89
- // Temporary file exists, can continue resumable upload (traditional upload mode)
90
- session := & SliceUploadSession {SliceUpload : su }
91
- m .cache .Store (su .TaskID , session )
92
- completedSlices := tables .CountUploadedSlices (su .SliceUploadStatus )
93
- log .Infof ("Resuming file-based slice upload: %s, completed: %d/%d" ,
94
- su .TaskID , completedSlices , su .SliceCnt )
95
- return & reqres.PreupResp {
96
- TaskID : su .TaskID ,
97
- SliceSize : su .SliceSize ,
98
- SliceCnt : su .SliceCnt ,
99
- SliceUploadStatus : su .SliceUploadStatus ,
100
- }, nil
101
- }
102
- } else {
103
- // Native slice upload, relying on frontend intelligent retry and state sync
104
- session := & SliceUploadSession {SliceUpload : su }
105
- m .cache .Store (su .TaskID , session )
106
- completedSlices := tables .CountUploadedSlices (su .SliceUploadStatus )
107
- log .Infof ("Resuming native slice upload: %s, completed: %d/%d, relying on frontend sync" ,
108
- su .TaskID , completedSlices , su .SliceCnt )
109
- return & reqres.PreupResp {
110
- TaskID : su .TaskID ,
111
- SliceSize : su .SliceSize ,
112
- SliceCnt : su .SliceCnt ,
113
- SliceUploadStatus : su .SliceUploadStatus ,
114
- }, nil
115
- }
116
- }
117
-
118
50
srcobj , err := op .Get (ctx , storage , actualPath )
119
51
if err != nil {
120
52
log .Error (err )
@@ -307,7 +239,7 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
307
239
return nil
308
240
}
309
241
310
- // CompleteUpload 完成上传 - 完整实现原始逻辑
242
+ // CompleteUpload 完成上传
311
243
func (m * SliceUploadManager ) CompleteUpload (ctx context.Context , storage driver.Driver , taskID string ) (* reqres.UploadSliceCompleteResp , error ) {
312
244
var err error
313
245
@@ -320,22 +252,16 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
320
252
// 检查是否所有分片都已上传
321
253
session .mutex .Lock ()
322
254
allUploaded := tables .IsAllSliceUploaded (session .SliceUploadStatus , session .SliceCnt )
323
- isPendingComplete := session .Status == tables .SliceUploadStatusPendingComplete
324
255
session .mutex .Unlock ()
325
256
326
- if ! allUploaded && ! isPendingComplete {
257
+ if ! allUploaded {
327
258
return & reqres.UploadSliceCompleteResp {
328
259
Complete : 0 ,
329
260
SliceUploadStatus : session .SliceUploadStatus ,
330
261
TaskID : session .TaskID ,
331
262
}, nil
332
263
}
333
264
334
- // 如果是PendingComplete状态,说明是重启后恢复的任务,直接尝试完成
335
- if isPendingComplete {
336
- log .Infof ("Processing pending complete task after restart: %s" , session .TaskID )
337
- }
338
-
339
265
defer func () {
340
266
// 确保资源清理和缓存删除
341
267
session .cleanup ()
@@ -463,7 +389,7 @@ func (s *SliceUploadSession) ensureTmpFile() error {
463
389
s .TmpFile = tmpPath
464
390
s .tmpFile = tf
465
391
466
- // 更新数据库中的临时文件路径,支持重启后恢复
392
+ // 更新数据库中的临时文件路径
467
393
if updateErr := db .UpdateSliceUpload (s .SliceUpload ); updateErr != nil {
468
394
log .Errorf ("Failed to update temp file path in database: %v" , updateErr )
469
395
// 不返回错误,因为文件已经创建成功,只是数据库更新失败
0 commit comments