Skip to content

Commit b0dbbeb

Browse files
TwoOnefourxrgzsILoveScratch2
authored
feat(drivers): add Teldrive driver (#1116)
https://github.com/tgdrive/teldrive https://teldrive-docs.pages.dev/docs/api 实现: * copy * move * link (302 share and local proxy) * chunked uploads * rename 未实现: - openlist扫码登陆 - refresh token OpenListTeam/OpenList-Docs#155 * feat(Teldrive): Add driver Teldrive * fix(teldrive): force webproxy and memory optimized * chore(teldrive): go fmt * chore(teldrive): remove TODO * chore(teldrive): organize code * feat(teldrive): add UseShareLink option and support 302 * fix(teldrive): standardize API path construction * fix(teldrive): trim trailing slash from Address in Init method * chore(teldrive): update help text for UseShareLink field in Addition struct * fix(teldrive): set 10 MiB as default chunk size --------- Co-authored-by: MadDogOwner <[email protected]> Co-authored-by: ILoveScratch <[email protected]>
1 parent 0c27b4b commit b0dbbeb

File tree

7 files changed

+940
-0
lines changed

7 files changed

+940
-0
lines changed

drivers/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import (
6161
_ "github.com/OpenListTeam/OpenList/v4/drivers/smb"
6262
_ "github.com/OpenListTeam/OpenList/v4/drivers/strm"
6363
_ "github.com/OpenListTeam/OpenList/v4/drivers/teambition"
64+
_ "github.com/OpenListTeam/OpenList/v4/drivers/teldrive"
6465
_ "github.com/OpenListTeam/OpenList/v4/drivers/terabox"
6566
_ "github.com/OpenListTeam/OpenList/v4/drivers/thunder"
6667
_ "github.com/OpenListTeam/OpenList/v4/drivers/thunder_browser"

drivers/teldrive/copy.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package teldrive
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
7+
"github.com/OpenListTeam/OpenList/v4/drivers/base"
8+
"github.com/OpenListTeam/OpenList/v4/internal/model"
9+
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
10+
"github.com/go-resty/resty/v2"
11+
"golang.org/x/net/context"
12+
"golang.org/x/sync/errgroup"
13+
"golang.org/x/sync/semaphore"
14+
)
15+
16+
func NewCopyManager(ctx context.Context, concurrent int, d *Teldrive) *CopyManager {
17+
g, ctx := errgroup.WithContext(ctx)
18+
19+
return &CopyManager{
20+
TaskChan: make(chan CopyTask, concurrent*2),
21+
Sem: semaphore.NewWeighted(int64(concurrent)),
22+
G: g,
23+
Ctx: ctx,
24+
d: d,
25+
}
26+
}
27+
28+
func (cm *CopyManager) startWorkers() {
29+
workerCount := cap(cm.TaskChan) / 2
30+
for i := 0; i < workerCount; i++ {
31+
cm.G.Go(func() error {
32+
return cm.worker()
33+
})
34+
}
35+
}
36+
37+
func (cm *CopyManager) worker() error {
38+
for {
39+
select {
40+
case task, ok := <-cm.TaskChan:
41+
if !ok {
42+
return nil
43+
}
44+
45+
if err := cm.Sem.Acquire(cm.Ctx, 1); err != nil {
46+
return err
47+
}
48+
49+
var err error
50+
51+
err = cm.processFile(task)
52+
53+
cm.Sem.Release(1)
54+
55+
if err != nil {
56+
return fmt.Errorf("task processing failed: %w", err)
57+
}
58+
59+
case <-cm.Ctx.Done():
60+
return cm.Ctx.Err()
61+
}
62+
}
63+
}
64+
65+
func (cm *CopyManager) generateTasks(ctx context.Context, srcObj, dstDir model.Obj) error {
66+
if srcObj.IsDir() {
67+
return cm.generateFolderTasks(ctx, srcObj, dstDir)
68+
} else {
69+
// add single file task directly
70+
select {
71+
case cm.TaskChan <- CopyTask{SrcObj: srcObj, DstDir: dstDir}:
72+
return nil
73+
case <-ctx.Done():
74+
return ctx.Err()
75+
}
76+
}
77+
}
78+
79+
func (cm *CopyManager) generateFolderTasks(ctx context.Context, srcDir, dstDir model.Obj) error {
80+
objs, err := cm.d.List(ctx, srcDir, model.ListArgs{})
81+
if err != nil {
82+
return fmt.Errorf("failed to list directory %s: %w", srcDir.GetPath(), err)
83+
}
84+
85+
err = cm.d.MakeDir(cm.Ctx, dstDir, srcDir.GetName())
86+
if err != nil || len(objs) == 0 {
87+
return err
88+
}
89+
newDstDir := &model.Object{
90+
ID: dstDir.GetID(),
91+
Path: dstDir.GetPath() + "/" + srcDir.GetName(),
92+
Name: srcDir.GetName(),
93+
IsFolder: true,
94+
}
95+
96+
for _, file := range objs {
97+
if utils.IsCanceled(ctx) {
98+
return ctx.Err()
99+
}
100+
101+
srcFile := &model.Object{
102+
ID: file.GetID(),
103+
Path: srcDir.GetPath() + "/" + file.GetName(),
104+
Name: file.GetName(),
105+
IsFolder: file.IsDir(),
106+
}
107+
108+
// 递归生成任务
109+
if err := cm.generateTasks(ctx, srcFile, newDstDir); err != nil {
110+
return err
111+
}
112+
}
113+
114+
return nil
115+
}
116+
117+
func (cm *CopyManager) processFile(task CopyTask) error {
118+
return cm.copySingleFile(cm.Ctx, task.SrcObj, task.DstDir)
119+
}
120+
121+
func (cm *CopyManager) copySingleFile(ctx context.Context, srcObj, dstDir model.Obj) error {
122+
// `override copy mode` should delete the existing file
123+
if obj, err := cm.d.getFile(dstDir.GetPath(), srcObj.GetName(), srcObj.IsDir()); err == nil {
124+
if err := cm.d.Remove(ctx, obj); err != nil {
125+
return fmt.Errorf("failed to remove existing file: %w", err)
126+
}
127+
}
128+
129+
// Do copy
130+
return cm.d.request(http.MethodPost, "/api/files/{id}/copy", func(req *resty.Request) {
131+
req.SetPathParam("id", srcObj.GetID())
132+
req.SetBody(base.Json{
133+
"newName": srcObj.GetName(),
134+
"destination": dstDir.GetPath(),
135+
})
136+
}, nil)
137+
}

drivers/teldrive/driver.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
package teldrive
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"net/http"
8+
"net/url"
9+
"strings"
10+
11+
"github.com/OpenListTeam/OpenList/v4/drivers/base"
12+
"github.com/OpenListTeam/OpenList/v4/internal/driver"
13+
"github.com/OpenListTeam/OpenList/v4/internal/errs"
14+
"github.com/OpenListTeam/OpenList/v4/internal/model"
15+
"github.com/OpenListTeam/OpenList/v4/internal/op"
16+
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
17+
"github.com/go-resty/resty/v2"
18+
"github.com/google/uuid"
19+
)
20+
21+
type Teldrive struct {
22+
model.Storage
23+
Addition
24+
}
25+
26+
func (d *Teldrive) Config() driver.Config {
27+
return config
28+
}
29+
30+
func (d *Teldrive) GetAddition() driver.Additional {
31+
return &d.Addition
32+
}
33+
34+
func (d *Teldrive) Init(ctx context.Context) error {
35+
d.Address = strings.TrimSuffix(d.Address, "/")
36+
if d.Cookie == "" || !strings.HasPrefix(d.Cookie, "access_token=") {
37+
return fmt.Errorf("cookie must start with 'access_token='")
38+
}
39+
if d.UploadConcurrency == 0 {
40+
d.UploadConcurrency = 4
41+
}
42+
if d.ChunkSize == 0 {
43+
d.ChunkSize = 10
44+
}
45+
46+
op.MustSaveDriverStorage(d)
47+
return nil
48+
}
49+
50+
func (d *Teldrive) Drop(ctx context.Context) error {
51+
return nil
52+
}
53+
54+
func (d *Teldrive) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
55+
var listResp ListResp
56+
err := d.request(http.MethodGet, "/api/files", func(req *resty.Request) {
57+
req.SetQueryParams(map[string]string{
58+
"path": dir.GetPath(),
59+
"limit": "1000", // overide default 500, TODO pagination
60+
})
61+
}, &listResp)
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
return utils.SliceConvert(listResp.Items, func(src Object) (model.Obj, error) {
67+
return &model.Object{
68+
ID: src.ID,
69+
Name: src.Name,
70+
Size: func() int64 {
71+
if src.Type == "folder" {
72+
return 0
73+
}
74+
return src.Size
75+
}(),
76+
IsFolder: src.Type == "folder",
77+
Modified: src.UpdatedAt,
78+
}, nil
79+
})
80+
}
81+
82+
func (d *Teldrive) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
83+
if d.UseShareLink {
84+
shareObj, err := d.getShareFileById(file.GetID())
85+
if err != nil || shareObj == nil {
86+
if err := d.createShareFile(file.GetID()); err != nil {
87+
return nil, err
88+
}
89+
shareObj, err = d.getShareFileById(file.GetID())
90+
if err != nil {
91+
return nil, err
92+
}
93+
}
94+
return &model.Link{
95+
URL: d.Address + "/api/shares/" + url.PathEscape(shareObj.Id) + "/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()),
96+
}, nil
97+
}
98+
return &model.Link{
99+
URL: d.Address + "/api/files/" + url.PathEscape(file.GetID()) + "/" + url.PathEscape(file.GetName()),
100+
Header: http.Header{
101+
"Cookie": {d.Cookie},
102+
},
103+
}, nil
104+
}
105+
106+
func (d *Teldrive) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) error {
107+
return d.request(http.MethodPost, "/api/files/mkdir", func(req *resty.Request) {
108+
req.SetBody(map[string]interface{}{
109+
"path": parentDir.GetPath() + "/" + dirName,
110+
})
111+
}, nil)
112+
}
113+
114+
func (d *Teldrive) Move(ctx context.Context, srcObj, dstDir model.Obj) error {
115+
body := base.Json{
116+
"ids": []string{srcObj.GetID()},
117+
"destinationParent": dstDir.GetID(),
118+
}
119+
return d.request(http.MethodPost, "/api/files/move", func(req *resty.Request) {
120+
req.SetBody(body)
121+
}, nil)
122+
}
123+
124+
func (d *Teldrive) Rename(ctx context.Context, srcObj model.Obj, newName string) error {
125+
body := base.Json{
126+
"name": newName,
127+
}
128+
return d.request(http.MethodPatch, "/api/files/{id}", func(req *resty.Request) {
129+
req.SetPathParam("id", srcObj.GetID())
130+
req.SetBody(body)
131+
}, nil)
132+
}
133+
134+
func (d *Teldrive) Copy(ctx context.Context, srcObj, dstDir model.Obj) error {
135+
copyConcurrentLimit := 4
136+
copyManager := NewCopyManager(ctx, copyConcurrentLimit, d)
137+
copyManager.startWorkers()
138+
copyManager.G.Go(func() error {
139+
defer close(copyManager.TaskChan)
140+
return copyManager.generateTasks(ctx, srcObj, dstDir)
141+
})
142+
return copyManager.G.Wait()
143+
}
144+
145+
func (d *Teldrive) Remove(ctx context.Context, obj model.Obj) error {
146+
body := base.Json{
147+
"ids": []string{obj.GetID()},
148+
}
149+
return d.request(http.MethodPost, "/api/files/delete", func(req *resty.Request) {
150+
req.SetBody(body)
151+
}, nil)
152+
}
153+
154+
func (d *Teldrive) Put(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress) error {
155+
fileId := uuid.New().String()
156+
chunkSizeInMB := d.ChunkSize
157+
chunkSize := chunkSizeInMB * 1024 * 1024 // Convert MB to bytes
158+
totalSize := file.GetSize()
159+
totalParts := int(math.Ceil(float64(totalSize) / float64(chunkSize)))
160+
maxRetried := 3
161+
162+
// delete the upload task when finished or failed
163+
defer func() {
164+
_ = d.request(http.MethodDelete, "/api/uploads/{id}", func(req *resty.Request) {
165+
req.SetPathParam("id", fileId)
166+
}, nil)
167+
}()
168+
169+
if obj, err := d.getFile(dstDir.GetPath(), file.GetName(), file.IsDir()); err == nil {
170+
if err = d.Remove(ctx, obj); err != nil {
171+
return err
172+
}
173+
}
174+
// start the upload process
175+
if err := d.request(http.MethodGet, "/api/uploads/fileId", func(req *resty.Request) {
176+
req.SetPathParam("id", fileId)
177+
}, nil); err != nil {
178+
return err
179+
}
180+
if totalSize == 0 {
181+
return d.touch(file.GetName(), dstDir.GetPath())
182+
}
183+
184+
if totalParts <= 1 {
185+
return d.doSingleUpload(ctx, dstDir, file, up, totalParts, chunkSize, fileId)
186+
}
187+
188+
return d.doMultiUpload(ctx, dstDir, file, up, maxRetried, totalParts, chunkSize, fileId)
189+
}
190+
191+
func (d *Teldrive) GetArchiveMeta(ctx context.Context, obj model.Obj, args model.ArchiveArgs) (model.ArchiveMeta, error) {
192+
// TODO get archive file meta-info, return errs.NotImplement to use an internal archive tool, optional
193+
return nil, errs.NotImplement
194+
}
195+
196+
func (d *Teldrive) ListArchive(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) ([]model.Obj, error) {
197+
// TODO list args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional
198+
return nil, errs.NotImplement
199+
}
200+
201+
func (d *Teldrive) Extract(ctx context.Context, obj model.Obj, args model.ArchiveInnerArgs) (*model.Link, error) {
202+
// TODO return link of file args.InnerPath in the archive obj, return errs.NotImplement to use an internal archive tool, optional
203+
return nil, errs.NotImplement
204+
}
205+
206+
func (d *Teldrive) ArchiveDecompress(ctx context.Context, srcObj, dstDir model.Obj, args model.ArchiveDecompressArgs) ([]model.Obj, error) {
207+
// TODO extract args.InnerPath path in the archive srcObj to the dstDir location, optional
208+
// a folder with the same name as the archive file needs to be created to store the extracted results if args.PutIntoNewDir
209+
// return errs.NotImplement to use an internal archive tool
210+
return nil, errs.NotImplement
211+
}
212+
213+
//func (d *Teldrive) Other(ctx context.Context, args model.OtherArgs) (interface{}, error) {
214+
// return nil, errs.NotSupport
215+
//}
216+
217+
var _ driver.Driver = (*Teldrive)(nil)

drivers/teldrive/meta.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package teldrive
2+
3+
import (
4+
"github.com/OpenListTeam/OpenList/v4/internal/driver"
5+
"github.com/OpenListTeam/OpenList/v4/internal/op"
6+
)
7+
8+
type Addition struct {
9+
driver.RootPath
10+
Address string `json:"url" required:"true"`
11+
Cookie string `json:"cookie" type:"string" required:"true" help:"access_token=xxx"`
12+
UseShareLink bool `json:"use_share_link" type:"bool" default:"false" help:"Create share link when getting link to support 302. If disabled, you need to enable web proxy."`
13+
ChunkSize int64 `json:"chunk_size" type:"number" default:"10" help:"Chunk size in MiB"`
14+
UploadConcurrency int64 `json:"upload_concurrency" type:"number" default:"4" help:"Concurrency upload requests"`
15+
}
16+
17+
var config = driver.Config{
18+
Name: "Teldrive",
19+
DefaultRoot: "/",
20+
}
21+
22+
func init() {
23+
op.RegisterDriver(func() driver.Driver {
24+
return &Teldrive{}
25+
})
26+
}

0 commit comments

Comments
 (0)