Skip to content

Commit 9ac0484

Browse files
authored
perf(ftp): improve concurrent Link response; fix alias/local driver issues (#974)
1 parent 8cf1518 commit 9ac0484

File tree

21 files changed

+338
-394
lines changed

21 files changed

+338
-394
lines changed

drivers/189pc/utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,9 +550,9 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
550550
return err
551551
}
552552
silceMd5.Reset()
553-
w, _ := utils.CopyWithBuffer(writers, reader)
553+
w, err := utils.CopyWithBuffer(writers, reader)
554554
if w != size {
555-
return fmt.Errorf("can't read data, expected=%d, got=%d", size, w)
555+
return fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", size, w, err)
556556
}
557557
// 计算块md5并进行hex和base64编码
558558
md5Bytes := silceMd5.Sum(nil)

drivers/alias/driver.go

Lines changed: 65 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,18 @@ func (d *Alias) Get(ctx context.Context, path string) (model.Obj, error) {
7878
return nil, errs.ObjectNotFound
7979
}
8080
for _, dst := range dsts {
81-
obj, err := d.get(ctx, path, dst, sub)
82-
if err == nil {
83-
return obj, nil
81+
obj, err := fs.Get(ctx, stdpath.Join(dst, sub), &fs.GetArgs{NoLog: true})
82+
if err != nil {
83+
continue
8484
}
85+
return &model.Object{
86+
Path: path,
87+
Name: obj.GetName(),
88+
Size: obj.GetSize(),
89+
Modified: obj.ModTime(),
90+
IsFolder: obj.IsDir(),
91+
HashInfo: obj.GetHash(),
92+
}, nil
8593
}
8694
return nil, errs.ObjectNotFound
8795
}
@@ -99,7 +107,27 @@ func (d *Alias) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([
99107
var objs []model.Obj
100108
fsArgs := &fs.ListArgs{NoLog: true, Refresh: args.Refresh}
101109
for _, dst := range dsts {
102-
tmp, err := d.list(ctx, dst, sub, fsArgs)
110+
tmp, err := fs.List(ctx, stdpath.Join(dst, sub), fsArgs)
111+
if err == nil {
112+
tmp, err = utils.SliceConvert(tmp, func(obj model.Obj) (model.Obj, error) {
113+
thumb, ok := model.GetThumb(obj)
114+
objRes := model.Object{
115+
Name: obj.GetName(),
116+
Size: obj.GetSize(),
117+
Modified: obj.ModTime(),
118+
IsFolder: obj.IsDir(),
119+
}
120+
if !ok {
121+
return &objRes, nil
122+
}
123+
return &model.ObjThumb{
124+
Object: objRes,
125+
Thumbnail: model.Thumbnail{
126+
Thumbnail: thumb,
127+
},
128+
}, nil
129+
})
130+
}
103131
if err == nil {
104132
objs = append(objs, tmp...)
105133
}
@@ -113,43 +141,50 @@ func (d *Alias) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
113141
if !ok {
114142
return nil, errs.ObjectNotFound
115143
}
144+
// proxy || ftp,s3
145+
if common.GetApiUrl(ctx) == "" {
146+
args.Redirect = false
147+
}
116148
for _, dst := range dsts {
117149
reqPath := stdpath.Join(dst, sub)
118-
link, file, err := d.link(ctx, reqPath, args)
150+
link, fi, err := d.link(ctx, reqPath, args)
119151
if err != nil {
120152
continue
121153
}
122-
var resultLink *model.Link
123-
if link != nil {
124-
resultLink = &model.Link{
125-
URL: link.URL,
126-
Header: link.Header,
127-
RangeReader: link.RangeReader,
128-
SyncClosers: utils.NewSyncClosers(link),
129-
ContentLength: link.ContentLength,
130-
}
131-
if link.MFile != nil {
132-
resultLink.RangeReader = &model.FileRangeReader{
133-
RangeReaderIF: stream.GetRangeReaderFromMFile(file.GetSize(), link.MFile),
134-
}
135-
}
136-
137-
} else {
138-
resultLink = &model.Link{
154+
if link == nil {
155+
// 重定向且需要通过代理
156+
return &model.Link{
139157
URL: fmt.Sprintf("%s/p%s?sign=%s",
140158
common.GetApiUrl(ctx),
141159
utils.EncodePath(reqPath, true),
142160
sign.Sign(reqPath)),
143-
}
161+
}, nil
162+
}
163+
if args.Redirect {
164+
return link, nil
165+
}
144166

167+
resultLink := &model.Link{
168+
URL: link.URL,
169+
Header: link.Header,
170+
RangeReader: link.RangeReader,
171+
MFile: link.MFile,
172+
Concurrency: link.Concurrency,
173+
PartSize: link.PartSize,
174+
ContentLength: link.ContentLength,
175+
SyncClosers: utils.NewSyncClosers(link),
145176
}
146-
if !args.Redirect {
147-
if d.DownloadConcurrency > 0 {
148-
resultLink.Concurrency = d.DownloadConcurrency
149-
}
150-
if d.DownloadPartSize > 0 {
151-
resultLink.PartSize = d.DownloadPartSize * utils.KB
152-
}
177+
if resultLink.ContentLength == 0 {
178+
resultLink.ContentLength = fi.GetSize()
179+
}
180+
if resultLink.MFile != nil {
181+
return resultLink, nil
182+
}
183+
if d.DownloadConcurrency > 0 {
184+
resultLink.Concurrency = d.DownloadConcurrency
185+
}
186+
if d.DownloadPartSize > 0 {
187+
resultLink.PartSize = d.DownloadPartSize * utils.KB
153188
}
154189
return resultLink, nil
155190
}

drivers/alias/util.go

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -54,55 +54,12 @@ func (d *Alias) getRootAndPath(path string) (string, string) {
5454
return parts[0], parts[1]
5555
}
5656

57-
func (d *Alias) get(ctx context.Context, path string, dst, sub string) (model.Obj, error) {
58-
obj, err := fs.Get(ctx, stdpath.Join(dst, sub), &fs.GetArgs{NoLog: true})
59-
if err != nil {
60-
return nil, err
61-
}
62-
return &model.Object{
63-
Path: path,
64-
Name: obj.GetName(),
65-
Size: obj.GetSize(),
66-
Modified: obj.ModTime(),
67-
IsFolder: obj.IsDir(),
68-
HashInfo: obj.GetHash(),
69-
}, nil
70-
}
71-
72-
func (d *Alias) list(ctx context.Context, dst, sub string, args *fs.ListArgs) ([]model.Obj, error) {
73-
objs, err := fs.List(ctx, stdpath.Join(dst, sub), args)
74-
// the obj must implement the model.SetPath interface
75-
// return objs, err
76-
if err != nil {
77-
return nil, err
78-
}
79-
return utils.SliceConvert(objs, func(obj model.Obj) (model.Obj, error) {
80-
thumb, ok := model.GetThumb(obj)
81-
objRes := model.Object{
82-
Name: obj.GetName(),
83-
Size: obj.GetSize(),
84-
Modified: obj.ModTime(),
85-
IsFolder: obj.IsDir(),
86-
}
87-
if !ok {
88-
return &objRes, nil
89-
}
90-
return &model.ObjThumb{
91-
Object: objRes,
92-
Thumbnail: model.Thumbnail{
93-
Thumbnail: thumb,
94-
},
95-
}, nil
96-
})
97-
}
98-
9957
func (d *Alias) link(ctx context.Context, reqPath string, args model.LinkArgs) (*model.Link, model.Obj, error) {
10058
storage, reqActualPath, err := op.GetStorageAndActualPath(reqPath)
10159
if err != nil {
10260
return nil, nil, err
10361
}
104-
// proxy || ftp,s3
105-
if !args.Redirect || len(common.GetApiUrl(ctx)) == 0 {
62+
if !args.Redirect {
10663
return op.Link(ctx, storage, reqActualPath, args)
10764
}
10865
obj, err := fs.Get(ctx, reqPath, &fs.GetArgs{NoLog: true})

drivers/aliyundrive_open/upload.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,8 @@ func (d *AliyundriveOpen) calProofCode(stream model.FileStreamer) (string, error
137137
}
138138
buf := make([]byte, length)
139139
n, err := io.ReadFull(reader, buf)
140-
if err == io.ErrUnexpectedEOF {
141-
return "", fmt.Errorf("can't read data, expected=%d, got=%d", len(buf), n)
142-
}
143-
if err != nil {
144-
return "", err
140+
if n != int(length) {
141+
return "", fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
145142
}
146143
return base64.StdEncoding.EncodeToString(buf), nil
147144
}

drivers/crypt/driver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,10 @@ func (d *Crypt) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (
292292

293293
if offset == 0 && limit > 0 {
294294
fileHeader = make([]byte, fileHeaderSize)
295-
n, _ := io.ReadFull(remoteReader, fileHeader)
295+
n, err := io.ReadFull(remoteReader, fileHeader)
296296
if n != fileHeaderSize {
297297
fileHeader = nil
298-
return nil, fmt.Errorf("can't read data, expected=%d, got=%d", fileHeaderSize, n)
298+
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", fileHeaderSize, n, err)
299299
}
300300
if limit <= fileHeaderSize {
301301
remoteReader.Close()

drivers/doubao/util.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -460,9 +460,9 @@ func (d *Doubao) Upload(ctx context.Context, config *UploadConfig, dstDir model.
460460

461461
// 计算CRC32
462462
crc32Hash := crc32.NewIEEE()
463-
w, _ := utils.CopyWithBuffer(crc32Hash, reader)
463+
w, err := utils.CopyWithBuffer(crc32Hash, reader)
464464
if w != file.GetSize() {
465-
return nil, fmt.Errorf("can't read data, expected=%d, got=%d", file.GetSize(), w)
465+
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", file.GetSize(), w, err)
466466
}
467467
crc32Value := hex.EncodeToString(crc32Hash.Sum(nil))
468468

@@ -588,9 +588,9 @@ func (d *Doubao) UploadByMultipart(ctx context.Context, config *UploadConfig, fi
588588
return err
589589
}
590590
hash.Reset()
591-
w, _ := utils.CopyWithBuffer(hash, reader)
591+
w, err := utils.CopyWithBuffer(hash, reader)
592592
if w != size {
593-
return fmt.Errorf("can't read data, expected=%d, got=%d", size, w)
593+
return fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", size, w, err)
594594
}
595595
crc32Value = hex.EncodeToString(hash.Sum(nil))
596596
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)

drivers/ftp/driver.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ package ftp
22

33
import (
44
"context"
5+
"io"
56
stdpath "path"
7+
"sync"
8+
"time"
69

710
"github.com/OpenListTeam/OpenList/v4/internal/driver"
811
"github.com/OpenListTeam/OpenList/v4/internal/errs"
912
"github.com/OpenListTeam/OpenList/v4/internal/model"
1013
"github.com/OpenListTeam/OpenList/v4/internal/stream"
14+
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1115
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1216
"github.com/jlaffaye/ftp"
1317
)
@@ -16,6 +20,9 @@ type FTP struct {
1620
model.Storage
1721
Addition
1822
conn *ftp.ServerConn
23+
24+
ctx context.Context
25+
cancel context.CancelFunc
1926
}
2027

2128
func (d *FTP) Config() driver.Config {
@@ -27,12 +34,16 @@ func (d *FTP) GetAddition() driver.Additional {
2734
}
2835

2936
func (d *FTP) Init(ctx context.Context) error {
30-
return d._login()
37+
d.ctx, d.cancel = context.WithCancel(context.Background())
38+
var err error
39+
d.conn, err = d._login(ctx)
40+
return err
3141
}
3242

3343
func (d *FTP) Drop(ctx context.Context) error {
3444
if d.conn != nil {
35-
_ = d.conn.Logout()
45+
_ = d.conn.Quit()
46+
d.cancel()
3647
}
3748
return nil
3849
}
@@ -61,26 +72,53 @@ func (d *FTP) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]m
6172
return res, nil
6273
}
6374

64-
func (d *FTP) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
65-
if err := d.login(); err != nil {
75+
func (d *FTP) Link(_ context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
76+
ctx, cancel := context.WithCancel(context.Background())
77+
conn, err := d._login(ctx)
78+
if err != nil {
79+
cancel()
6680
return nil, err
6781
}
82+
close := func() error {
83+
_ = conn.Quit()
84+
cancel()
85+
return nil
86+
}
6887

69-
remoteFile := NewFileReader(d.conn, encode(file.GetPath(), d.Encoding), file.GetSize())
70-
if remoteFile != nil && !d.Config().OnlyLinkMFile {
71-
return &model.Link{
72-
RangeReader: &model.FileRangeReader{
73-
RangeReaderIF: stream.RateLimitRangeReaderFunc(stream.GetRangeReaderFromMFile(file.GetSize(), remoteFile)),
74-
},
75-
SyncClosers: utils.NewSyncClosers(remoteFile),
88+
path := encode(file.GetPath(), d.Encoding)
89+
size := file.GetSize()
90+
mu := &sync.Mutex{}
91+
resultRangeReader := func(context context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
92+
length := httpRange.Length
93+
if length < 0 || httpRange.Start+length > size {
94+
length = size - httpRange.Start
95+
}
96+
mu.Lock()
97+
defer mu.Unlock()
98+
r, err := conn.RetrFrom(path, uint64(httpRange.Start))
99+
if err != nil {
100+
_ = conn.Quit()
101+
conn, err = d._login(ctx)
102+
if err == nil {
103+
r, err = conn.RetrFrom(path, uint64(httpRange.Start))
104+
}
105+
if err != nil {
106+
return nil, err
107+
}
108+
}
109+
r.SetDeadline(time.Now().Add(time.Second))
110+
return &FileReader{
111+
Response: r,
112+
Reader: io.LimitReader(r, length),
113+
ctx: context,
76114
}, nil
77115
}
116+
78117
return &model.Link{
79-
MFile: &stream.RateLimitFile{
80-
File: remoteFile,
81-
Limiter: stream.ServerDownloadLimit,
82-
Ctx: ctx,
118+
RangeReader: &model.FileRangeReader{
119+
RangeReaderIF: stream.RateLimitRangeReaderFunc(resultRangeReader),
83120
},
121+
SyncClosers: utils.NewSyncClosers(utils.CloseFunc(close)),
84122
}, nil
85123
}
86124

drivers/ftp/meta.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type Addition struct {
3333
var config = driver.Config{
3434
Name: "FTP",
3535
LocalSort: true,
36-
OnlyLinkMFile: true,
36+
OnlyLinkMFile: false,
3737
DefaultRoot: "/",
3838
NoLinkURL: true,
3939
}

0 commit comments

Comments
 (0)