Skip to content

Commit a58e939

Browse files
committed
feat(upload): add cleanup of orphaned slice upload tasks and improve temporary file management
1 parent 70fbe17 commit a58e939

File tree

5 files changed

+246
-26
lines changed

5 files changed

+246
-26
lines changed

internal/db/db.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ func Init(d *gorm.DB) {
1717
if err != nil {
1818
log.Fatalf("failed migrate database: %s", err.Error())
1919
}
20+
21+
// 清理启动前遗留的孤儿分片上传任务
22+
if err := CleanupOrphanedSliceUploads(); err != nil {
23+
log.Errorf("Failed to cleanup orphaned slice uploads: %v", err)
24+
}
2025
}
2126

2227
func AutoMigrate(dst ...interface{}) error {

internal/db/slice_upload.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package db
22

33
import (
4+
"io/fs"
5+
"os"
6+
"path/filepath"
7+
"strings"
8+
"time"
9+
410
"github.com/OpenListTeam/OpenList/v4/internal/model/tables"
11+
"github.com/OpenListTeam/OpenList/v4/pkg/tempdir"
512
"github.com/pkg/errors"
613
"gorm.io/gorm"
14+
log "github.com/sirupsen/logrus"
715
)
816

917
func CreateSliceUpload(su *tables.SliceUpload) error {
@@ -58,3 +66,130 @@ func UpdateSliceStatusAtomic(taskID string, sliceNum int, status []byte) error {
5866
return tx.Save(&su).Error
5967
}))
6068
}
69+
70+
// CleanupOrphanedSliceUploads 清理孤儿分片上传记录(启动时调用)
71+
func CleanupOrphanedSliceUploads() error {
72+
// 清理超过24小时的未完成任务
73+
cutoff := time.Now().Add(-24 * time.Hour)
74+
75+
var orphanedTasks []tables.SliceUpload
76+
if err := db.Where("status IN (?, ?) AND updated_at < ?",
77+
tables.SliceUploadStatusWaiting,
78+
tables.SliceUploadStatusUploading,
79+
cutoff).Find(&orphanedTasks).Error; err != nil {
80+
return errors.WithStack(err)
81+
}
82+
83+
cleanedCount := 0
84+
for _, task := range orphanedTasks {
85+
// 清理临时文件
86+
if task.TmpFile != "" {
87+
if err := os.Remove(task.TmpFile); err != nil && !os.IsNotExist(err) {
88+
log.Warnf("Failed to remove orphaned tmp file %s: %v", task.TmpFile, err)
89+
} else if err == nil {
90+
log.Debugf("Removed orphaned tmp file: %s", task.TmpFile)
91+
}
92+
}
93+
94+
// 删除数据库记录
95+
if err := db.Delete(&task).Error; err != nil {
96+
log.Errorf("Failed to delete orphaned slice upload task %s: %v", task.TaskID, err)
97+
} else {
98+
cleanedCount++
99+
}
100+
}
101+
102+
if cleanedCount > 0 {
103+
log.Infof("Cleaned up %d orphaned slice upload tasks", cleanedCount)
104+
}
105+
106+
// 额外清理:扫描临时目录中的孤儿文件
107+
return cleanupOrphanedTempFiles()
108+
}
109+
110+
// cleanupOrphanedTempFiles 清理临时目录中的孤儿文件
111+
func cleanupOrphanedTempFiles() error {
112+
// 获取临时目录路径,使用共享的tempdir包
113+
tempDir := tempdir.GetPersistentTempDir()
114+
115+
// 检查临时目录是否存在
116+
if _, err := os.Stat(tempDir); os.IsNotExist(err) {
117+
log.Debugf("Temp directory does not exist: %s", tempDir)
118+
return nil
119+
}
120+
121+
// 获取所有活跃的分片上传任务的临时文件列表
122+
var activeTasks []tables.SliceUpload
123+
if err := db.Where("tmp_file IS NOT NULL AND tmp_file != '' AND status IN (?, ?)",
124+
tables.SliceUploadStatusWaiting,
125+
tables.SliceUploadStatusUploading).Find(&activeTasks).Error; err != nil {
126+
return errors.WithStack(err)
127+
}
128+
129+
// 构建活跃文件的映射表
130+
activeFiles := make(map[string]bool)
131+
for _, task := range activeTasks {
132+
if task.TmpFile != "" {
133+
activeFiles[task.TmpFile] = true
134+
}
135+
}
136+
137+
cleanedCount := 0
138+
cutoff := time.Now().Add(-24 * time.Hour) // 只清理超过24小时的文件
139+
140+
// 遍历临时目录
141+
err := filepath.WalkDir(tempDir, func(path string, d fs.DirEntry, err error) error {
142+
if err != nil {
143+
log.Warnf("Failed to access path %s: %v", path, err)
144+
return nil // 继续处理其他文件
145+
}
146+
147+
// 跳过目录
148+
if d.IsDir() {
149+
return nil
150+
}
151+
152+
// 只处理分片上传临时文件(以slice_upload_开头)
153+
if !strings.HasPrefix(d.Name(), "slice_upload_") {
154+
return nil
155+
}
156+
157+
// 检查文件是否在活跃任务列表中
158+
if activeFiles[path] {
159+
return nil // 文件仍在使用中,跳过
160+
}
161+
162+
// 检查文件修改时间
163+
info, err := d.Info()
164+
if err != nil {
165+
log.Warnf("Failed to get file info for %s: %v", path, err)
166+
return nil
167+
}
168+
169+
// 只清理超过24小时的文件
170+
if info.ModTime().After(cutoff) {
171+
return nil
172+
}
173+
174+
// 删除孤儿文件
175+
if err := os.Remove(path); err != nil {
176+
log.Warnf("Failed to remove orphaned temp file %s: %v", path, err)
177+
} else {
178+
log.Debugf("Removed orphaned temp file: %s", path)
179+
cleanedCount++
180+
}
181+
182+
return nil
183+
})
184+
185+
if err != nil {
186+
log.Errorf("Failed to walk temp directory %s: %v", tempDir, err)
187+
return errors.WithStack(err)
188+
}
189+
190+
if cleanedCount > 0 {
191+
log.Infof("Cleaned up %d orphaned temp files from %s", cleanedCount, tempDir)
192+
}
193+
194+
return nil
195+
}

internal/fs/sliceup.go

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"mime/multipart"
77
"os"
8+
"path/filepath"
89
"strings"
910
"sync"
1011
"time"
@@ -18,6 +19,7 @@ import (
1819
"github.com/OpenListTeam/OpenList/v4/internal/op"
1920
"github.com/OpenListTeam/OpenList/v4/internal/stream"
2021
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
22+
"github.com/OpenListTeam/OpenList/v4/pkg/tempdir"
2123
"github.com/google/uuid"
2224
"github.com/pkg/errors"
2325
"gorm.io/gorm"
@@ -39,12 +41,9 @@ type SliceUploadSession struct {
3941

4042
// NewSliceUploadManager 创建分片上传管理器
4143
func NewSliceUploadManager() *SliceUploadManager {
42-
tempDir := os.TempDir() // 默认使用系统临时目录
43-
if conf.Conf != nil && conf.Conf.TempDir != "" {
44-
tempDir = conf.Conf.TempDir
45-
}
44+
tempDirPath := tempdir.GetPersistentTempDir()
4645
return &SliceUploadManager{
47-
tempDir: tempDir,
46+
tempDir: tempDirPath,
4847
}
4948
}
5049

@@ -74,14 +73,40 @@ func (m *SliceUploadManager) CreateSession(ctx context.Context, storage driver.D
7473
}
7574

7675
if su.ID != 0 { // 找到未完成的上传任务,支持断点续传
77-
session := &SliceUploadSession{SliceUpload: su}
78-
m.cache.Store(su.TaskID, session)
79-
return &reqres.PreupResp{
80-
TaskID: su.TaskID,
81-
SliceSize: su.SliceSize,
82-
SliceCnt: su.SliceCnt,
83-
SliceUploadStatus: su.SliceUploadStatus,
84-
}, nil
76+
// 验证临时文件是否仍然存在(重启后可能被清理)
77+
if su.TmpFile != "" {
78+
if _, err := os.Stat(su.TmpFile); os.IsNotExist(err) {
79+
// 临时文件丢失,清理数据库记录,重新开始
80+
log.Warnf("Temporary file lost after restart, cleaning up task: %s", su.TaskID)
81+
if deleteErr := db.DeleteSliceUploadByTaskID(su.TaskID); deleteErr != nil {
82+
log.Errorf("Failed to delete lost slice upload task: %v", deleteErr)
83+
}
84+
// 继续创建新任务
85+
} else {
86+
// 临时文件存在,可以继续断点续传
87+
session := &SliceUploadSession{SliceUpload: su}
88+
m.cache.Store(su.TaskID, session)
89+
log.Infof("Resuming slice upload after restart: %s, completed slices: %d/%d",
90+
su.TaskID, tables.CountUploadedSlices(su.SliceUploadStatus), su.SliceCnt)
91+
return &reqres.PreupResp{
92+
TaskID: su.TaskID,
93+
SliceSize: su.SliceSize,
94+
SliceCnt: su.SliceCnt,
95+
SliceUploadStatus: su.SliceUploadStatus,
96+
}, nil
97+
}
98+
} else {
99+
// 原生分片上传(如123open/baidu),无需临时文件
100+
session := &SliceUploadSession{SliceUpload: su}
101+
m.cache.Store(su.TaskID, session)
102+
log.Infof("Resuming native slice upload after restart: %s", su.TaskID)
103+
return &reqres.PreupResp{
104+
TaskID: su.TaskID,
105+
SliceSize: su.SliceSize,
106+
SliceCnt: su.SliceCnt,
107+
SliceUploadStatus: su.SliceUploadStatus,
108+
}, nil
109+
}
85110
}
86111

87112
srcobj, err := op.Get(ctx, storage, actualPath)
@@ -419,39 +444,48 @@ func (m *SliceUploadManager) CompleteUpload(ctx context.Context, storage driver.
419444
}
420445
}
421446

422-
// ensureTmpFile 确保临时文件存在且正确初始化,线程安全 - 保持原始实现
447+
// ensureTmpFile 确保临时文件存在且正确初始化,线程安全 - 使用持久化目录
423448
func (s *SliceUploadSession) ensureTmpFile() error {
424449
s.mutex.Lock()
425450
defer s.mutex.Unlock()
426451

427452
if s.TmpFile == "" {
428-
tempDir := os.TempDir() // 默认使用系统临时目录
429-
if conf.Conf != nil && conf.Conf.TempDir != "" {
430-
tempDir = conf.Conf.TempDir
431-
}
432-
tf, err := os.CreateTemp(tempDir, "file-*")
453+
tempDirPath := tempdir.GetPersistentTempDir()
454+
455+
// 使用TaskID作为文件名的一部分,确保唯一性和可识别性
456+
filename := fmt.Sprintf("slice_upload_%s_%s", s.TaskID, s.Name)
457+
// 清理文件名中的特殊字符
458+
filename = strings.ReplaceAll(filename, "/", "_")
459+
filename = strings.ReplaceAll(filename, "\\", "_")
460+
filename = strings.ReplaceAll(filename, ":", "_")
461+
462+
tmpPath := filepath.Join(tempDirPath, filename)
463+
464+
tf, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_RDWR, 0644)
433465
if err != nil {
434-
return fmt.Errorf("CreateTemp error: %w", err)
466+
return fmt.Errorf("create persistent temp file error: %w", err)
435467
}
436468

437-
abspath := tf.Name() //这里返回的是绝对路径
438-
if err = os.Truncate(abspath, int64(s.Size)); err != nil {
469+
if err = os.Truncate(tmpPath, int64(s.Size)); err != nil {
439470
tf.Close() // 确保文件被关闭
440-
os.Remove(abspath) // 清理文件
441-
return fmt.Errorf("Truncate error: %w", err)
471+
os.Remove(tmpPath) // 清理文件
472+
return fmt.Errorf("truncate persistent temp file error: %w", err)
442473
}
443474

444-
s.TmpFile = abspath
475+
s.TmpFile = tmpPath
445476
s.tmpFile = tf
477+
478+
log.Debugf("Created persistent temp file: %s", tmpPath)
446479
return nil
447480
}
448481

449482
if s.tmpFile == nil {
450483
var err error
451484
s.tmpFile, err = os.OpenFile(s.TmpFile, os.O_RDWR, 0644)
452485
if err != nil {
453-
return fmt.Errorf("OpenFile error: %w", err)
486+
return fmt.Errorf("reopen persistent temp file error: %w", err)
454487
}
488+
log.Debugf("Reopened persistent temp file: %s", s.TmpFile)
455489
}
456490
return nil
457491
}

internal/model/tables/slice_upload.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,14 @@ func IsAllSliceUploaded(status []byte, sliceCnt uint) bool {
6060
}
6161
return true
6262
}
63+
64+
// CountUploadedSlices returns the number of set bits in the status bitmap,
// which is the number of slices marked as uploaded. A nil or empty bitmap
// yields 0.
func CountUploadedSlices(status []byte) uint {
	var uploaded uint
	for _, b := range status {
		// Clear the lowest set bit on each pass; the pass count is the
		// number of set bits in this byte.
		for ; b != 0; b &= b - 1 {
			uploaded++
		}
	}
	return uploaded
}

pkg/tempdir/tempdir.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package tempdir
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"github.com/OpenListTeam/OpenList/v4/internal/conf"
7+
)
8+
9+
// GetPersistentTempDir 获取持久化临时目录
10+
// 这个函数被多个包共享使用,避免代码重复
11+
func GetPersistentTempDir() string {
12+
var tempDir string
13+
14+
// 优先使用配置的临时目录
15+
if conf.Conf != nil && conf.Conf.TempDir != "" {
16+
tempDir = conf.Conf.TempDir
17+
} else {
18+
// 使用数据目录下的slice_temp子目录
19+
if conf.Conf != nil && conf.Conf.Database.DBFile != "" {
20+
// 从数据库文件路径推断数据目录
21+
dataDir := filepath.Dir(conf.Conf.Database.DBFile)
22+
tempDir = filepath.Join(dataDir, "slice_temp")
23+
} else {
24+
// fallback到当前工作目录下的slice_temp
25+
if wd, err := os.Getwd(); err == nil {
26+
tempDir = filepath.Join(wd, "slice_temp")
27+
} else {
28+
// 最后的fallback
29+
tempDir = filepath.Join(os.TempDir(), "openlist_slice_temp")
30+
}
31+
}
32+
}
33+
34+
return tempDir
35+
}

0 commit comments

Comments
 (0)