Skip to content

Commit e93ab76

Browse files
Seven66677731Copilotj2rong4cn
authored
feat(task-group): introduce TaskGroupCoordinator for coordinated task execution (#721)
* feat(task): add task hook,batch task refactor(move): move use CopyTask * Update internal/task/batch_task/refresh.go Co-authored-by: Copilot <[email protected]> Signed-off-by: Seven <[email protected]> * fix: upload task allFinish judge * Update internal/task/batch_task/refresh.go Co-authored-by: Copilot <[email protected]> Signed-off-by: Seven <[email protected]> * feat: enhance concurrency safety * 优化代码 * 解压缩 * 修复死锁 * refactor(move): move as task * 重构,优化 * . * 优化,修复bug * . * 修复bug * feat: add task retry judge * 代理Task.SetState函数来判断Task的生命周期 * chore: use OnSucceeded、OnFailed、OnBeforeRetry functions * 优化 * 优化,去除重复代码 * . * 优化 * . * webdav * Revert "fix(fs):After the file is copied or moved, flush the cache of the directory that was copied or moved to." This reverts commit 5f03edd. --------- Signed-off-by: Seven <[email protected]> Co-authored-by: Copilot <[email protected]> Co-authored-by: j2rong4cn <[email protected]>
1 parent a9f02ec commit e93ab76

File tree

26 files changed

+726
-1214
lines changed

26 files changed

+726
-1214
lines changed

drivers/alias/driver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ func (d *Alias) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
193193
}
194194
if len(srcPath) == len(dstPath) {
195195
for i := range srcPath {
196-
err = errors.Join(err, fs.Move(ctx, *srcPath[i], *dstPath[i]))
196+
_, e := fs.Move(ctx, *srcPath[i], *dstPath[i])
197+
err = errors.Join(err, e)
197198
}
198199
return err
199200
} else {

drivers/doubao/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"math/rand"
1515
"net/http"
1616
"net/url"
17-
"path/filepath"
17+
stdpath "path"
1818
"sort"
1919
"strconv"
2020
"strings"
@@ -353,7 +353,7 @@ func (d *Doubao) getUploadConfig(upConfig *UploadConfig, dataType string, file m
353353
"ServiceId": d.UploadToken.Alice[dataType].ServiceID,
354354
"NeedFallback": "true",
355355
"FileSize": strconv.FormatInt(file.GetSize(), 10),
356-
"FileExtension": filepath.Ext(file.GetName()),
356+
"FileExtension": stdpath.Ext(file.GetName()),
357357
"s": randomString(),
358358
}
359359
}

internal/bootstrap/task.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ func InitTaskManager() {
2222
op.RegisterSettingChangingCallback(func() {
2323
fs.UploadTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskUploadThreadsNum, conf.Conf.Tasks.Upload.Workers)))
2424
})
25-
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
25+
fs.CopyTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
2626
op.RegisterSettingChangingCallback(func() {
2727
fs.CopyTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskCopyThreadsNum, conf.Conf.Tasks.Copy.Workers)))
2828
})
29-
fs.MoveTaskManager = tache.NewManager[*fs.MoveTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry))
29+
fs.MoveTaskManager = tache.NewManager[*fs.FileTransferTask](tache.WithWorks(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)), tache.WithPersistFunction(db.GetTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant), db.UpdateTaskDataFunc("move", conf.Conf.Tasks.Move.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Move.MaxRetry))
3030
op.RegisterSettingChangingCallback(func() {
3131
fs.MoveTaskManager.SetWorkersNumActive(taskFilterNegative(setting.GetInt(conf.TaskMoveThreadsNum, conf.Conf.Tasks.Move.Workers)))
3232
})

internal/fs/archive.go

Lines changed: 87 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@ import (
66
"fmt"
77
"io"
88
"math/rand"
9-
"mime"
109
"os"
1110
stdpath "path"
1211
"path/filepath"
13-
"strconv"
1412
"strings"
1513
"time"
1614

@@ -21,30 +19,22 @@ import (
2119
"github.com/OpenListTeam/OpenList/v4/internal/op"
2220
"github.com/OpenListTeam/OpenList/v4/internal/stream"
2321
"github.com/OpenListTeam/OpenList/v4/internal/task"
22+
"github.com/OpenListTeam/OpenList/v4/internal/task_group"
23+
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
24+
"github.com/OpenListTeam/OpenList/v4/server/common"
2425
"github.com/OpenListTeam/tache"
2526
"github.com/pkg/errors"
2627
log "github.com/sirupsen/logrus"
2728
)
2829

2930
type ArchiveDownloadTask struct {
30-
task.TaskExtension
31+
TaskData
3132
model.ArchiveDecompressArgs
32-
status string
33-
SrcObjPath string
34-
DstDirPath string
35-
srcStorage driver.Driver
36-
dstStorage driver.Driver
37-
SrcStorageMp string
38-
DstStorageMp string
3933
}
4034

4135
func (t *ArchiveDownloadTask) GetName() string {
42-
return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcObjPath,
43-
t.InnerPath, t.DstStorageMp, t.DstDirPath, t.Password)
44-
}
45-
46-
func (t *ArchiveDownloadTask) GetStatus() string {
47-
return t.status
36+
return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcActualPath,
37+
t.InnerPath, t.DstStorageMp, t.DstActualPath, t.Password)
4838
}
4939

5040
func (t *ArchiveDownloadTask) Run() error {
@@ -58,16 +48,21 @@ func (t *ArchiveDownloadTask) Run() error {
5848
if err != nil {
5949
return err
6050
}
51+
uploadTask.groupID = stdpath.Join(uploadTask.DstStorageMp, uploadTask.DstActualPath)
52+
task_group.TransferCoordinator.AddTask(uploadTask.groupID, nil)
6153
ArchiveContentUploadTaskManager.Add(uploadTask)
6254
return nil
6355
}
6456

6557
func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) {
6658
var err error
67-
if t.srcStorage == nil {
68-
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
59+
if t.SrcStorage == nil {
60+
t.SrcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
61+
if err != nil {
62+
return nil, err
63+
}
6964
}
70-
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.srcStorage, t.SrcObjPath, model.LinkArgs{})
65+
srcObj, tool, ss, err := op.GetArchiveToolAndStream(t.Ctx(), t.SrcStorage, t.SrcActualPath, model.LinkArgs{})
7166
if err != nil {
7267
return nil, err
7368
}
@@ -87,7 +82,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
8782
total += s.GetSize()
8883
}
8984
t.SetTotalBytes(total)
90-
t.status = "getting src object"
85+
t.Status = "getting src object"
9186
for _, s := range ss {
9287
if s.GetFile() == nil {
9388
_, err = stream.CacheFullInTempFileAndWriter(s, func(p float64) {
@@ -104,7 +99,7 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
10499
} else {
105100
decompressUp = t.SetProgress
106101
}
107-
t.status = "walking and decompressing"
102+
t.Status = "walking and decompressing"
108103
dir, err := os.MkdirTemp(conf.Conf.TempDir, "dir-*")
109104
if err != nil {
110105
return nil, err
@@ -117,13 +112,14 @@ func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadT
117112
uploadTask := &ArchiveContentUploadTask{
118113
TaskExtension: task.TaskExtension{
119114
Creator: t.GetCreator(),
115+
ApiUrl: t.ApiUrl,
120116
},
121-
ObjName: baseName,
122-
InPlace: !t.PutIntoNewDir,
123-
FilePath: dir,
124-
DstDirPath: t.DstDirPath,
125-
dstStorage: t.dstStorage,
126-
DstStorageMp: t.DstStorageMp,
117+
ObjName: baseName,
118+
InPlace: !t.PutIntoNewDir,
119+
FilePath: dir,
120+
DstActualPath: t.DstActualPath,
121+
dstStorage: t.DstStorage,
122+
DstStorageMp: t.DstStorageMp,
127123
}
128124
return uploadTask, nil
129125
}
@@ -132,18 +128,19 @@ var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask]
132128

133129
type ArchiveContentUploadTask struct {
134130
task.TaskExtension
135-
status string
136-
ObjName string
137-
InPlace bool
138-
FilePath string
139-
DstDirPath string
140-
dstStorage driver.Driver
141-
DstStorageMp string
142-
finalized bool
131+
status string
132+
ObjName string
133+
InPlace bool
134+
FilePath string
135+
DstActualPath string
136+
dstStorage driver.Driver
137+
DstStorageMp string
138+
finalized bool
139+
groupID string
143140
}
144141

145142
func (t *ArchiveContentUploadTask) GetName() string {
146-
return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstDirPath)
143+
return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstActualPath)
147144
}
148145

149146
func (t *ArchiveContentUploadTask) GetStatus() string {
@@ -163,21 +160,42 @@ func (t *ArchiveContentUploadTask) Run() error {
163160
})
164161
}
165162

166-
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *ArchiveContentUploadTask) error) error {
163+
func (t *ArchiveContentUploadTask) OnSucceeded() {
164+
task_group.TransferCoordinator.Done(t.groupID, true)
165+
}
166+
167+
func (t *ArchiveContentUploadTask) OnFailed() {
168+
task_group.TransferCoordinator.Done(t.groupID, false)
169+
}
170+
171+
func (t *ArchiveContentUploadTask) SetRetry(retry int, maxRetry int) {
172+
t.TaskExtension.SetRetry(retry, maxRetry)
173+
if retry == 0 &&
174+
(len(t.groupID) == 0 || // 重启恢复
175+
(t.GetErr() == nil && t.GetState() != tache.StatePending)) { // 手动重试
176+
t.groupID = stdpath.Join(t.DstStorageMp, t.DstActualPath)
177+
task_group.TransferCoordinator.AddTask(t.groupID, nil)
178+
}
179+
}
180+
181+
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTask *ArchiveContentUploadTask) error) error {
167182
var err error
168183
if t.dstStorage == nil {
169184
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
185+
if err != nil {
186+
return err
187+
}
170188
}
171189
info, err := os.Stat(t.FilePath)
172190
if err != nil {
173191
return err
174192
}
175193
if info.IsDir() {
176194
t.status = "src object is dir, listing objs"
177-
nextDstPath := t.DstDirPath
195+
nextDstActualPath := t.DstActualPath
178196
if !t.InPlace {
179-
nextDstPath = stdpath.Join(nextDstPath, t.ObjName)
180-
err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstPath)
197+
nextDstActualPath = stdpath.Join(nextDstActualPath, t.ObjName)
198+
err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstActualPath)
181199
if err != nil {
182200
return err
183201
}
@@ -186,6 +204,9 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
186204
if err != nil {
187205
return err
188206
}
207+
if !t.InPlace && len(t.groupID) > 0 {
208+
task_group.TransferCoordinator.AppendPayload(t.groupID, task_group.DstPathToRefresh(nextDstActualPath))
209+
}
189210
var es error
190211
for _, entry := range entries {
191212
var nextFilePath string
@@ -198,16 +219,21 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
198219
es = stderrors.Join(es, err)
199220
continue
200221
}
222+
if len(t.groupID) > 0 {
223+
task_group.TransferCoordinator.AddTask(t.groupID, nil)
224+
}
201225
err = f(&ArchiveContentUploadTask{
202226
TaskExtension: task.TaskExtension{
203227
Creator: t.GetCreator(),
228+
ApiUrl: t.ApiUrl,
204229
},
205-
ObjName: entry.Name(),
206-
InPlace: false,
207-
FilePath: nextFilePath,
208-
DstDirPath: nextDstPath,
209-
dstStorage: t.dstStorage,
210-
DstStorageMp: t.DstStorageMp,
230+
ObjName: entry.Name(),
231+
InPlace: false,
232+
FilePath: nextFilePath,
233+
DstActualPath: nextDstActualPath,
234+
dstStorage: t.dstStorage,
235+
DstStorageMp: t.DstStorageMp,
236+
groupID: t.groupID,
211237
})
212238
if err != nil {
213239
es = stderrors.Join(es, err)
@@ -228,13 +254,13 @@ func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *Archi
228254
Size: info.Size(),
229255
Modified: time.Now(),
230256
},
231-
Mimetype: mime.TypeByExtension(filepath.Ext(t.ObjName)),
257+
Mimetype: utils.GetMimeType(stdpath.Ext(t.ObjName)),
232258
WebPutAsTask: true,
233259
Reader: file,
234260
}
235261
fs.Closers.Add(file)
236262
t.status = "uploading"
237-
err = op.Put(t.Ctx(), t.dstStorage, t.DstDirPath, fs, t.SetProgress, true)
263+
err = op.Put(t.Ctx(), t.dstStorage, t.DstActualPath, fs, t.SetProgress, true)
238264
if err != nil {
239265
return err
240266
}
@@ -271,8 +297,9 @@ func moveToTempPath(path, prefix string) (string, error) {
271297

272298
func genTempFileName(prefix string) (string, error) {
273299
retry := 0
300+
t := time.Now().UnixMilli()
274301
for retry < 10000 {
275-
newPath := stdpath.Join(conf.Conf.TempDir, prefix+strconv.FormatUint(uint64(rand.Uint32()), 10))
302+
newPath := filepath.Join(conf.Conf.TempDir, prefix+fmt.Sprintf("%x-%x", t, rand.Uint32()))
276303
if _, err := os.Stat(newPath); err != nil {
277304
if os.IsNotExist(err) {
278305
return newPath, nil
@@ -354,16 +381,19 @@ func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args
354381
}
355382
taskCreator, _ := ctx.Value(conf.UserKey).(*model.User)
356383
tsk := &ArchiveDownloadTask{
357-
TaskExtension: task.TaskExtension{
358-
Creator: taskCreator,
384+
TaskData: TaskData{
385+
TaskExtension: task.TaskExtension{
386+
Creator: taskCreator,
387+
ApiUrl: common.GetApiUrl(ctx),
388+
},
389+
SrcStorage: srcStorage,
390+
DstStorage: dstStorage,
391+
SrcActualPath: srcObjActualPath,
392+
DstActualPath: dstDirActualPath,
393+
SrcStorageMp: srcStorage.GetStorage().MountPath,
394+
DstStorageMp: dstStorage.GetStorage().MountPath,
359395
},
360396
ArchiveDecompressArgs: args,
361-
srcStorage: srcStorage,
362-
dstStorage: dstStorage,
363-
SrcObjPath: srcObjActualPath,
364-
DstDirPath: dstDirActualPath,
365-
SrcStorageMp: srcStorage.GetStorage().MountPath,
366-
DstStorageMp: dstStorage.GetStorage().MountPath,
367397
}
368398
if ctx.Value(conf.NoTaskKey) != nil {
369399
uploadTask, err := tsk.RunWithoutPushUploadTask()

0 commit comments

Comments
 (0)