Skip to content

Commit 9d3d11b

Browse files
committed
fix(upload): 移除冗余
1 parent f995c48 commit 9d3d11b

File tree

6 files changed

+8
-64
lines changed

6 files changed

+8
-64
lines changed

internal/db/db.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ func Init(d *gorm.DB) {
1717
if err != nil {
1818
log.Fatalf("failed migrate database: %s", err.Error())
1919
}
20-
21-
// 清理启动前遗留的孤儿分片上传任务
20+
2221
if err := CleanupOrphanedSliceUploads(); err != nil {
2322
log.Errorf("Failed to cleanup orphaned slice uploads: %v", err)
2423
}

internal/db/slice_upload.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,19 @@ func UpdateSliceUploadWithTx(su *tables.SliceUpload) error {
7070
// UpdateSliceStatusAtomic 原子性地更新分片状态
7171
func UpdateSliceStatusAtomic(taskID string, sliceNum int, status []byte) error {
7272
return errors.WithStack(db.Transaction(func(tx *gorm.DB) error {
73-
// 先读取当前状态
7473
var su tables.SliceUpload
7574
if err := tx.Where("task_id = ?", taskID).First(&su).Error; err != nil {
7675
return err
7776
}
7877

79-
// 更新分片状态
8078
tables.SetSliceUploaded(su.SliceUploadStatus, sliceNum)
8179

82-
// 保存更新
8380
return tx.Save(&su).Error
8481
}))
8582
}
8683

8784
// CleanupOrphanedSliceUploads 清理孤儿分片上传记录(启动时调用)
8885
func CleanupOrphanedSliceUploads() error {
89-
// 清理超过24小时的未完成任务
9086
cutoff := time.Now().Add(-24 * time.Hour)
9187

9288
var orphanedTasks []tables.SliceUpload
@@ -99,7 +95,6 @@ func CleanupOrphanedSliceUploads() error {
9995

10096
cleanedCount := 0
10197
for _, task := range orphanedTasks {
102-
// 清理临时文件
10398
if task.TmpFile != "" {
10499
if err := os.Remove(task.TmpFile); err != nil && !os.IsNotExist(err) {
105100
log.Warnf("Failed to remove orphaned tmp file %s: %v", task.TmpFile, err)
@@ -108,7 +103,6 @@ func CleanupOrphanedSliceUploads() error {
108103
}
109104
}
110105

111-
// 删除数据库记录
112106
if err := db.Delete(&task).Error; err != nil {
113107
log.Errorf("Failed to delete orphaned slice upload task %s: %v", task.TaskID, err)
114108
} else {
@@ -120,30 +114,25 @@ func CleanupOrphanedSliceUploads() error {
120114
log.Infof("Cleaned up %d orphaned slice upload tasks", cleanedCount)
121115
}
122116

123-
// 额外清理:扫描临时目录中的孤儿文件
124117
return cleanupOrphanedTempFiles()
125118
}
126119

127120
// cleanupOrphanedTempFiles 清理临时目录中的孤儿文件
128121
func cleanupOrphanedTempFiles() error {
129-
// 获取临时目录路径,使用共享的tempdir包
130122
tempDir := conf.GetPersistentTempDir()
131123

132-
// 检查临时目录是否存在
133124
if _, err := os.Stat(tempDir); os.IsNotExist(err) {
134125
log.Debugf("Temp directory does not exist: %s", tempDir)
135126
return nil
136127
}
137128

138-
// 获取所有活跃的分片上传任务的临时文件列表
139129
var activeTasks []tables.SliceUpload
140130
if err := db.Where("tmp_file IS NOT NULL AND tmp_file != '' AND status IN (?, ?)",
141131
tables.SliceUploadStatusWaiting,
142132
tables.SliceUploadStatusUploading).Find(&activeTasks).Error; err != nil {
143133
return errors.WithStack(err)
144134
}
145135

146-
// 构建活跃文件的映射表
147136
activeFiles := make(map[string]bool)
148137
for _, task := range activeTasks {
149138
if task.TmpFile != "" {
@@ -152,43 +141,36 @@ func cleanupOrphanedTempFiles() error {
152141
}
153142

154143
cleanedCount := 0
155-
cutoff := time.Now().Add(-24 * time.Hour) // 只清理超过24小时的文件
144+
cutoff := time.Now().Add(-24 * time.Hour)
156145

157-
// 遍历临时目录
158146
err := filepath.WalkDir(tempDir, func(path string, d fs.DirEntry, err error) error {
159147
if err != nil {
160148
log.Warnf("Failed to access path %s: %v", path, err)
161149
return nil // 继续处理其他文件
162150
}
163151

164-
// 跳过目录
165152
if d.IsDir() {
166153
return nil
167154
}
168155

169-
// 只处理分片上传临时文件(以slice_upload_开头)
170156
if !strings.HasPrefix(d.Name(), "slice_upload_") {
171157
return nil
172158
}
173159

174-
// 检查文件是否在活跃任务列表中
175160
if activeFiles[path] {
176161
return nil // 文件仍在使用中,跳过
177162
}
178163

179-
// 检查文件修改时间
180164
info, err := d.Info()
181165
if err != nil {
182166
log.Warnf("Failed to get file info for %s: %v", path, err)
183167
return nil
184168
}
185169

186-
// 只清理超过24小时的文件
187170
if info.ModTime().After(cutoff) {
188171
return nil
189172
}
190173

191-
// 删除孤儿文件
192174
if err := os.Remove(path); err != nil {
193175
log.Warnf("Failed to remove orphaned temp file %s: %v", path, err)
194176
} else {

internal/driver/driver.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,23 +84,18 @@ type Remove interface {
8484
Remove(ctx context.Context, obj model.Obj) error
8585
}
8686

87-
// IUploadInfo 上传信息接口
8887
type IUploadInfo interface {
8988
GetUploadInfo() *model.UploadInfo
9089
}
9190

92-
// IPreup 预上传接口
9391
type IPreup interface {
9492
Preup(ctx context.Context, srcobj model.Obj, req *reqres.PreupReq) (*model.PreupInfo, error)
9593
}
9694

97-
// ISliceUpload 分片上传接口
9895
type ISliceUpload interface {
99-
// SliceUpload 分片上传
10096
SliceUpload(ctx context.Context, req *tables.SliceUpload, sliceno uint, file io.Reader) error
10197
}
10298

103-
// IUploadSliceComplete 分片上传完成接口
10499
type IUploadSliceComplete interface {
105100
UploadSliceComplete(ctx context.Context, req *tables.SliceUpload) error
106101
}

internal/fs/fs.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,19 +190,14 @@ func PutURL(ctx context.Context, path, dstName, urlStr string) error {
190190
return op.PutURL(ctx, storage, dstDirActualPath, dstName, urlStr)
191191
}
192192

193-
/// 分片上传功能--------------------------------------------------------------------
194-
195-
// Preup 预上传 - 使用新的管理器重构
196193
func Preup(c context.Context, s driver.Driver, actualPath string, req *reqres.PreupReq) (*reqres.PreupResp, error) {
197194
return getGlobalSliceManager().CreateSession(c, s, actualPath, req)
198195
}
199196

200-
// UploadSlice 流式上传切片 - 使用新的管理器重构,支持流式上传
201197
func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, reader io.Reader) error {
202198
return getGlobalSliceManager().UploadSlice(ctx, storage, req, reader)
203199
}
204200

205-
// SliceUpComplete 完成分片上传 - 使用新的管理器重构
206201
func SliceUpComplete(ctx context.Context, storage driver.Driver, taskID string) (*reqres.UploadSliceCompleteResp, error) {
207202
return getGlobalSliceManager().CompleteUpload(ctx, storage, taskID)
208203
}

internal/fs/sliceup.go

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,8 @@ func (m *SliceUploadManager) CreateSession(ctx context.Context, storage driver.D
5454
}
5555
user := ctx.Value(conf.UserKey).(*model.User)
5656

57-
// 生成唯一的TaskID
5857
taskID := uuid.New().String()
5958

60-
//创建新的上传任务
6159
createsu := &tables.SliceUpload{
6260
TaskID: taskID,
6361
DstPath: req.Path,
@@ -121,13 +119,11 @@ func (m *SliceUploadManager) CreateSession(ctx context.Context, storage driver.D
121119
}, nil
122120
}
123121

124-
// getOrLoadSession 获取或加载会话,提高代码复用性
125122
func (m *SliceUploadManager) getOrLoadSession(taskID string) (*SliceUploadSession, error) {
126123
session, err, _ := m.sessionG.Do(taskID, func() (*SliceUploadSession, error) {
127124
if s, ok := m.cache.Load(taskID); ok {
128125
return s.(*SliceUploadSession), nil
129126
}
130-
// 首次加载,需要从数据库获取
131127
su, err := db.GetSliceUploadByTaskID(taskID)
132128
if err != nil {
133129
return nil, errors.WithMessagef(err, "failed get slice upload [%s]", taskID)
@@ -141,21 +137,20 @@ func (m *SliceUploadManager) getOrLoadSession(taskID string) (*SliceUploadSessio
141137
return session, err
142138
}
143139

144-
// UploadSlice 流式上传分片 - 支持流式上传,避免表单上传的内存占用
140+
// UploadSlice 流式上传分片
145141
func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, reader io.Reader) error {
146142
session, err := m.getOrLoadSession(req.TaskID)
147143
if err != nil {
148144
log.Errorf("failed to get session: %+v", err)
149145
return err
150146
}
151147

152-
// 确保并发安全的错误处理
153148
defer func() {
154149
if err != nil {
155150
session.mutex.Lock()
156151
session.Status = tables.SliceUploadStatusFailed
157152
session.Message = err.Error()
158-
updateData := *session.SliceUpload // 复制数据避免锁持有时间过长
153+
updateData := *session.SliceUpload
159154
session.mutex.Unlock()
160155

161156
if updateErr := db.UpdateSliceUpload(&updateData); updateErr != nil {
@@ -166,7 +161,6 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
166161

167162
// 使用锁保护状态检查
168163
session.mutex.Lock()
169-
// 检查分片是否已上传过
170164
if tables.IsSliceUploaded(session.SliceUploadStatus, int(req.SliceNum)) {
171165
session.mutex.Unlock()
172166
log.Warnf("slice already uploaded,req:%+v", req)
@@ -178,27 +172,23 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
178172
if req.SliceHash != "" {
179173
session.mutex.Lock()
180174

181-
// 验证分片hash值
182-
if req.SliceNum == 0 { // 第一个分片,slicehash是所有的分片hash
175+
if req.SliceNum == 0 {
183176
hs := strings.Split(req.SliceHash, ",")
184177
if len(hs) != int(session.SliceCnt) {
185178
session.mutex.Unlock()
186179
err := fmt.Errorf("slice hash count mismatch, expected %d, got %d", session.SliceCnt, len(hs))
187180
log.Error("slice hash count mismatch", req, err)
188181
return err
189182
}
190-
session.SliceHash = req.SliceHash // 存储完整的hash字符串
183+
session.SliceHash = req.SliceHash
191184
} else {
192-
// 非第0个分片,不覆盖 SliceHash,保持完整的hash列表
193185
log.Debugf("Slice %d hash: %s (keeping complete hash list)", req.SliceNum, req.SliceHash)
194186
}
195187
session.mutex.Unlock()
196188
}
197189

198-
// 根据存储类型处理分片上传
199190
switch s := storage.(type) {
200191
case driver.ISliceUpload:
201-
// Native slice upload: directly pass stream data, let frontend handle retry and recovery
202192
if err := s.SliceUpload(ctx, session.SliceUpload, req.SliceNum, reader); err != nil {
203193
log.Errorf("Native slice upload failed - TaskID: %s, SliceNum: %d, Error: %v",
204194
req.TaskID, req.SliceNum, err)
@@ -207,13 +197,12 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
207197
log.Debugf("Native slice upload success - TaskID: %s, SliceNum: %d",
208198
req.TaskID, req.SliceNum)
209199

210-
default: //其他网盘先缓存到本地
200+
default:
211201
if err := session.ensureTmpFile(); err != nil {
212202
log.Error("ensureTmpFile error", req, err)
213203
return err
214204
}
215205

216-
// 流式复制,减少内存占用
217206
sw := &sliceWriter{
218207
file: session.tmpFile,
219208
offset: int64(req.SliceNum) * int64(session.SliceSize),
@@ -225,10 +214,9 @@ func (m *SliceUploadManager) UploadSlice(ctx context.Context, storage driver.Dri
225214
}
226215
}
227216

228-
// 原子性更新分片状态
229217
session.mutex.Lock()
230218
tables.SetSliceUploaded(session.SliceUploadStatus, int(req.SliceNum))
231-
updateData := *session.SliceUpload // 复制数据
219+
updateData := *session.SliceUpload
232220
session.mutex.Unlock()
233221

234222
err = db.UpdateSliceUpload(&updateData)
@@ -263,7 +251,6 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
263251
}
264252

265253
defer func() {
266-
// 确保资源清理和缓存删除
267254
session.cleanup()
268255
m.cache.Delete(session.TaskID)
269256

@@ -278,7 +265,6 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
278265
log.Errorf("Failed to update slice upload status: %v", updateErr)
279266
}
280267
} else {
281-
// 上传成功后从数据库中删除记录,允许重复上传
282268
if deleteErr := db.DeleteSliceUploadByTaskID(session.TaskID); deleteErr != nil {
283269
log.Errorf("Failed to delete slice upload record: %v", deleteErr)
284270
}
@@ -293,14 +279,12 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
293279
return nil, err
294280
}
295281

296-
// 原生分片上传成功,直接返回,defer中会删除数据库记录
297282
return &reqres.UploadSliceCompleteResp{
298283
Complete: 1,
299284
TaskID: session.TaskID,
300285
}, nil
301286

302287
default:
303-
// 其他网盘客户端上传到本地后,上传到网盘,使用任务处理
304288
session.mutex.Lock()
305289
tmpFile := session.tmpFile
306290
session.mutex.Unlock()
@@ -330,7 +314,6 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
330314

331315
if session.AsTask {
332316
file.SetTmpFile(tmpFile)
333-
// 防止defer中清理文件
334317
session.mutex.Lock()
335318
session.tmpFile = nil
336319
session.TmpFile = ""
@@ -360,15 +343,12 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
360343
}
361344
}
362345

363-
// ensureTmpFile 确保临时文件存在且正确初始化,线程安全 - 使用持久化目录
364346
func (s *SliceUploadSession) ensureTmpFile() error {
365347
s.mutex.Lock()
366348
defer s.mutex.Unlock()
367349

368350
if s.TmpFile == "" {
369-
// 使用TaskID作为文件名的一部分,确保唯一性和可识别性
370351
filename := fmt.Sprintf("slice_upload_%s_%s", s.TaskID, s.Name)
371-
// 清理文件名中的特殊字符
372352
filename = strings.ReplaceAll(filename, "/", "_")
373353
filename = strings.ReplaceAll(filename, "\\", "_")
374354
filename = strings.ReplaceAll(filename, ":", "_")
@@ -392,7 +372,6 @@ func (s *SliceUploadSession) ensureTmpFile() error {
392372
// 更新数据库中的临时文件路径
393373
if updateErr := db.UpdateSliceUpload(s.SliceUpload); updateErr != nil {
394374
log.Errorf("Failed to update temp file path in database: %v", updateErr)
395-
// 不返回错误,因为文件已经创建成功,只是数据库更新失败
396375
}
397376

398377
log.Debugf("Created persistent temp file: %s", tmpPath)
@@ -410,7 +389,6 @@ func (s *SliceUploadSession) ensureTmpFile() error {
410389
return nil
411390
}
412391

413-
// cleanup 清理资源,线程安全 - 保持原始实现
414392
func (s *SliceUploadSession) cleanup() {
415393
s.mutex.Lock()
416394
defer s.mutex.Unlock()

server/handles/fsup.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ func FsPreup(c *gin.Context) {
238238
return
239239
}
240240

241-
// 基本参数验证
242241
if req.Name == "" {
243242
common.ErrorResp(c, fmt.Errorf("file name is required"), 400)
244243
return
@@ -274,7 +273,6 @@ func FsUpSlice(c *gin.Context) {
274273
}
275274
_ = c.Request.Body.Close()
276275
}()
277-
// 从HTTP头获取参数
278276
taskID := c.GetHeader("X-Task-ID")
279277
if taskID == "" {
280278
common.ErrorResp(c, fmt.Errorf("X-Task-ID header is required"), 400)
@@ -295,14 +293,12 @@ func FsUpSlice(c *gin.Context) {
295293

296294
sliceHash := c.GetHeader("X-Slice-Hash")
297295

298-
// 构建请求对象
299296
req := &reqres.UploadSliceReq{
300297
TaskID: taskID,
301298
SliceHash: sliceHash,
302299
SliceNum: uint(sliceNum),
303300
}
304301

305-
// 获取请求体作为流
306302
reader := c.Request.Body
307303
if reader == nil {
308304
common.ErrorResp(c, fmt.Errorf("request body is required"), 400)
@@ -311,7 +307,6 @@ func FsUpSlice(c *gin.Context) {
311307

312308
storage := c.Request.Context().Value(conf.StorageKey).(driver.Driver)
313309

314-
// 调用流式上传分片函数
315310
err = fs.UploadSlice(c.Request.Context(), storage, req, reader)
316311
if err != nil {
317312
common.ErrorResp(c, fmt.Errorf("upload slice failed: %w", err), 500)

0 commit comments

Comments
 (0)