Skip to content

Commit 57fceab

Browse files
j2rong4cnCopilot
andauthored
perf(stream): improve file stream range reading and caching mechanism (#1001)
* perf(stream): improve file stream range reading and caching mechanism * 。 * add bytes_test.go * fix(stream): handle EOF and buffer reading more gracefully * 注释 * refactor: update CacheFullAndWriter to accept pointer for UpdateProgress * update tests * Update drivers/google_drive/util.go Co-authored-by: Copilot <[email protected]> Signed-off-by: j2rong4cn <[email protected]> * 更优雅的克隆Link * 修复stream已缓存但无法重复读取 * 将Bytes类型重命名为Reader * 修复栈溢出 * update tests --------- Signed-off-by: j2rong4cn <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent 8c244a9 commit 57fceab

File tree

48 files changed

+654
-377
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+654
-377
lines changed

drivers/115/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,7 @@ func (d *Pan115) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
186186
preHash = strings.ToUpper(preHash)
187187
fullHash := stream.GetHash().GetHash(utils.SHA1)
188188
if len(fullHash) != utils.SHA1.Width {
189-
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
190-
up = model.UpdateProgressWithRange(up, 50, 100)
191-
_, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA1)
189+
_, fullHash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA1)
192190
if err != nil {
193191
return nil, err
194192
}

drivers/115/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ func (d *Pan115) UploadByMultipart(ctx context.Context, params *driver115.Upload
321321
err error
322322
)
323323

324-
tmpF, err := s.CacheFullInTempFile()
324+
tmpF, err := s.CacheFullAndWriter(&up, nil)
325325
if err != nil {
326326
return nil, err
327327
}

drivers/115_open/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,7 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
239239
}
240240
sha1 := file.GetHash().GetHash(utils.SHA1)
241241
if len(sha1) != utils.SHA1.Width {
242-
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
243-
up = model.UpdateProgressWithRange(up, 50, 100)
244-
_, sha1, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.SHA1)
242+
_, sha1, err = stream.CacheFullAndHash(file, &up, utils.SHA1)
245243
if err != nil {
246244
return err
247245
}

drivers/115_open/upload.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,14 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
8686

8787
fileSize := stream.GetSize()
8888
chunkSize := calPartSize(fileSize)
89-
partNum := (stream.GetSize() + chunkSize - 1) / chunkSize
90-
parts := make([]oss.UploadPart, partNum)
91-
offset := int64(0)
92-
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize))
89+
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize), &up)
9390
if err != nil {
9491
return err
9592
}
93+
94+
partNum := (stream.GetSize() + chunkSize - 1) / chunkSize
95+
parts := make([]oss.UploadPart, partNum)
96+
offset := int64(0)
9697
for i := int64(1); i <= partNum; i++ {
9798
if utils.IsCanceled(ctx) {
9899
return ctx.Err()
@@ -119,7 +120,7 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
119120
retry.Attempts(3),
120121
retry.DelayType(retry.BackOffDelay),
121122
retry.Delay(time.Second))
122-
ss.RecycleSectionReader(rd)
123+
ss.FreeSectionReader(rd)
123124
if err != nil {
124125
return err
125126
}

drivers/123/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,7 @@ func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, file model.FileStrea
182182
etag := file.GetHash().GetHash(utils.MD5)
183183
var err error
184184
if len(etag) < utils.MD5.Width {
185-
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
186-
up = model.UpdateProgressWithRange(up, 50, 100)
187-
_, etag, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5)
185+
_, etag, err = stream.CacheFullAndHash(file, &up, utils.MD5)
188186
if err != nil {
189187
return err
190188
}

drivers/123/upload.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
8181
if size > chunkSize {
8282
chunkCount = int((size + chunkSize - 1) / chunkSize)
8383
}
84+
85+
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
86+
if err != nil {
87+
return err
88+
}
89+
8490
lastChunkSize := size % chunkSize
8591
if lastChunkSize == 0 {
8692
lastChunkSize = chunkSize
@@ -92,10 +98,6 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
9298
batchSize = 10
9399
getS3UploadUrl = d.getS3PreSignedUrls
94100
}
95-
ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
96-
if err != nil {
97-
return err
98-
}
99101

100102
thread := min(int(chunkCount), d.UploadThread)
101103
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
@@ -180,7 +182,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
180182
return nil
181183
},
182184
After: func(err error) {
183-
ss.RecycleSectionReader(reader)
185+
ss.FreeSectionReader(reader)
184186
},
185187
})
186188
}

drivers/123_open/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,7 @@ func (d *Open123) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
132132
// etag 文件md5
133133
etag := file.GetHash().GetHash(utils.MD5)
134134
if len(etag) < utils.MD5.Width {
135-
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
136-
up = model.UpdateProgressWithRange(up, 50, 100)
137-
_, etag, err = stream.CacheFullInTempFileAndHash(file, cacheFileProgress, utils.MD5)
135+
_, etag, err = stream.CacheFullAndHash(file, &up, utils.MD5)
138136
if err != nil {
139137
return nil, err
140138
}

drivers/123_open/upload.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,19 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
4646
uploadDomain := createResp.Data.Servers[0]
4747
size := file.GetSize()
4848
chunkSize := createResp.Data.SliceSize
49+
50+
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
51+
if err != nil {
52+
return err
53+
}
54+
4955
uploadNums := (size + chunkSize - 1) / chunkSize
5056
thread := min(int(uploadNums), d.UploadThread)
5157
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
5258
retry.Attempts(3),
5359
retry.Delay(time.Second),
5460
retry.DelayType(retry.BackOffDelay))
5561

56-
ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
57-
if err != nil {
58-
return err
59-
}
6062
for partIndex := range uploadNums {
6163
if utils.IsCanceled(uploadCtx) {
6264
break
@@ -157,7 +159,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
157159
return nil
158160
},
159161
After: func(err error) {
160-
ss.RecycleSectionReader(reader)
162+
ss.FreeSectionReader(reader)
161163
},
162164
})
163165
}

drivers/139/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -522,9 +522,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
522522
var err error
523523
fullHash := stream.GetHash().GetHash(utils.SHA256)
524524
if len(fullHash) != utils.SHA256.Width {
525-
cacheFileProgress := model.UpdateProgressWithRange(up, 0, 50)
526-
up = model.UpdateProgressWithRange(up, 50, 100)
527-
_, fullHash, err = streamPkg.CacheFullInTempFileAndHash(stream, cacheFileProgress, utils.SHA256)
525+
_, fullHash, err = streamPkg.CacheFullAndHash(stream, &up, utils.SHA256)
528526
if err != nil {
529527
return err
530528
}

drivers/189_tv/utils.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@ import (
55
"encoding/base64"
66
"encoding/xml"
77
"fmt"
8-
"github.com/skip2/go-qrcode"
98
"io"
109
"net/http"
1110
"strconv"
1211
"strings"
1312
"time"
1413

14+
"github.com/skip2/go-qrcode"
15+
1516
"github.com/OpenListTeam/OpenList/v4/drivers/base"
1617
"github.com/OpenListTeam/OpenList/v4/internal/driver"
1718
"github.com/OpenListTeam/OpenList/v4/internal/model"
1819
"github.com/OpenListTeam/OpenList/v4/internal/op"
20+
"github.com/OpenListTeam/OpenList/v4/internal/stream"
1921
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
2022

2123
"github.com/go-resty/resty/v2"
@@ -311,11 +313,14 @@ func (y *Cloud189TV) RapidUpload(ctx context.Context, dstDir model.Obj, stream m
311313

312314
// 旧版本上传,家庭云不支持覆盖
313315
func (y *Cloud189TV) OldUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up driver.UpdateProgress, isFamily bool, overwrite bool) (model.Obj, error) {
314-
tempFile, err := file.CacheFullInTempFile()
315-
if err != nil {
316-
return nil, err
316+
fileMd5 := file.GetHash().GetHash(utils.MD5)
317+
var tempFile = file.GetFile()
318+
var err error
319+
if len(fileMd5) != utils.MD5.Width {
320+
tempFile, fileMd5, err = stream.CacheFullAndHash(file, &up, utils.MD5)
321+
} else if tempFile == nil {
322+
tempFile, err = file.CacheFullAndWriter(&up, nil)
317323
}
318-
fileMd5, err := utils.HashFile(utils.MD5, tempFile)
319324
if err != nil {
320325
return nil, err
321326
}
@@ -345,7 +350,7 @@ func (y *Cloud189TV) OldUpload(ctx context.Context, dstDir model.Obj, file model
345350
header["Edrive-UploadFileId"] = fmt.Sprint(status.UploadFileId)
346351
}
347352

348-
_, err := y.put(ctx, status.FileUploadUrl, header, true, io.NopCloser(tempFile), isFamily)
353+
_, err := y.put(ctx, status.FileUploadUrl, header, true, tempFile, isFamily)
349354
if err, ok := err.(*RespErr); ok && err.Code != "InputStreamReadError" {
350355
return nil, err
351356
}

0 commit comments

Comments
 (0)