Skip to content

Commit 87cf95f

Browse files
authored
fix(139): refactor part upload logic (#1184)
* fix(139): refactor part upload logic Signed-off-by: MadDogOwner <[email protected]> * fix(139): handle upload errors Signed-off-by: MadDogOwner <[email protected]> * fix(139): sort upload parts by PartNumber before uploading Signed-off-by: MadDogOwner <[email protected]> * fix(139): improve error handling Signed-off-by: MadDogOwner <[email protected]> * fix(139): add validation for upload part index to prevent out of bounds errors Signed-off-by: MadDogOwner <[email protected]> --------- Signed-off-by: MadDogOwner <[email protected]>
1 parent 8ab26cb commit 87cf95f

File tree

2 files changed

+65
-47
lines changed

2 files changed

+65
-47
lines changed

drivers/139/driver.go

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -534,16 +534,15 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
534534
if size > partSize {
535535
part = (size + partSize - 1) / partSize
536536
}
537+
538+
// 生成所有 partInfos
537539
partInfos := make([]PartInfo, 0, part)
538540
for i := int64(0); i < part; i++ {
539541
if utils.IsCanceled(ctx) {
540542
return ctx.Err()
541543
}
542544
start := i * partSize
543-
byteSize := size - start
544-
if byteSize > partSize {
545-
byteSize = partSize
546-
}
545+
byteSize := min(size-start, partSize)
547546
partNumber := i + 1
548547
partInfo := PartInfo{
549548
PartNumber: partNumber,
@@ -591,17 +590,20 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
591590
// resp.Data.RapidUpload: true 支持快传,但此处直接检测是否返回分片的上传地址
592591
// 快传的情况下同样需要手动处理冲突
593592
if resp.Data.PartInfos != nil {
594-
// 读取前100个分片的上传地址
595-
uploadPartInfos := resp.Data.PartInfos
596-
597-
// 获取后续分片的上传地址
598-
for i := 101; i < len(partInfos); i += 100 {
599-
end := i + 100
600-
if end > len(partInfos) {
601-
end = len(partInfos)
602-
}
603-
batchPartInfos := partInfos[i:end]
593+
// Progress
594+
p := driver.NewProgress(size, up)
595+
rateLimited := driver.NewLimitedUploadStream(ctx, stream)
596+
597+
// 先上传前100个分片
598+
err = d.uploadPersonalParts(ctx, partInfos, resp.Data.PartInfos, rateLimited, p)
599+
if err != nil {
600+
return err
601+
}
604602

603+
// 如果还有剩余分片,分批获取上传地址并上传
604+
for i := 100; i < len(partInfos); i += 100 {
605+
end := min(i+100, len(partInfos))
606+
batchPartInfos := partInfos[i:end]
605607
moredata := base.Json{
606608
"fileId": resp.Data.FileId,
607609
"uploadId": resp.Data.UploadId,
@@ -617,44 +619,13 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
617619
if err != nil {
618620
return err
619621
}
620-
uploadPartInfos = append(uploadPartInfos, moreresp.Data.PartInfos...)
621-
}
622-
623-
// Progress
624-
p := driver.NewProgress(size, up)
625-
626-
rateLimited := driver.NewLimitedUploadStream(ctx, stream)
627-
// 上传所有分片
628-
for _, uploadPartInfo := range uploadPartInfos {
629-
index := uploadPartInfo.PartNumber - 1
630-
partSize := partInfos[index].PartSize
631-
log.Debugf("[139] uploading part %+v/%+v", index, len(uploadPartInfos))
632-
limitReader := io.LimitReader(rateLimited, partSize)
633-
634-
// Update Progress
635-
r := io.TeeReader(limitReader, p)
636-
637-
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadPartInfo.UploadUrl, r)
638-
if err != nil {
639-
return err
640-
}
641-
req.Header.Set("Content-Type", "application/octet-stream")
642-
req.Header.Set("Content-Length", fmt.Sprint(partSize))
643-
req.Header.Set("Origin", "https://yun.139.com")
644-
req.Header.Set("Referer", "https://yun.139.com/")
645-
req.ContentLength = partSize
646-
647-
res, err := base.HttpClient.Do(req)
622+
err = d.uploadPersonalParts(ctx, partInfos, moreresp.Data.PartInfos, rateLimited, p)
648623
if err != nil {
649624
return err
650625
}
651-
_ = res.Body.Close()
652-
log.Debugf("[139] uploaded: %+v", res)
653-
if res.StatusCode != http.StatusOK {
654-
return fmt.Errorf("unexpected status code: %d", res.StatusCode)
655-
}
656626
}
657627

628+
// 全部分片上传完毕后,complete
658629
data = base.Json{
659630
"contentHash": fullHash,
660631
"contentHashAlgorithm": "SHA256",

drivers/139/util.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package _139
22

33
import (
4+
"context"
45
"encoding/base64"
56
"errors"
67
"fmt"
8+
"io"
79
"net/http"
810
"net/url"
911
"path"
@@ -13,6 +15,7 @@ import (
1315
"time"
1416

1517
"github.com/OpenListTeam/OpenList/v4/drivers/base"
18+
"github.com/OpenListTeam/OpenList/v4/internal/driver"
1619
"github.com/OpenListTeam/OpenList/v4/internal/model"
1720
"github.com/OpenListTeam/OpenList/v4/internal/op"
1821
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
@@ -623,3 +626,47 @@ func (d *Yun139) getPersonalCloudHost() string {
623626
}
624627
return d.PersonalCloudHost
625628
}
629+
630+
func (d *Yun139) uploadPersonalParts(ctx context.Context, partInfos []PartInfo, uploadPartInfos []PersonalPartInfo, rateLimited *driver.RateLimitReader, p *driver.Progress) error {
631+
// 确保数组以 PartNumber 从小到大排序
632+
sort.Slice(uploadPartInfos, func(i, j int) bool {
633+
return uploadPartInfos[i].PartNumber < uploadPartInfos[j].PartNumber
634+
})
635+
636+
for _, uploadPartInfo := range uploadPartInfos {
637+
index := uploadPartInfo.PartNumber - 1
638+
if index < 0 || index >= len(partInfos) {
639+
return fmt.Errorf("invalid PartNumber %d: index out of bounds (partInfos length: %d)", uploadPartInfo.PartNumber, len(partInfos))
640+
}
641+
partSize := partInfos[index].PartSize
642+
log.Debugf("[139] uploading part %+v/%+v", index, len(partInfos))
643+
limitReader := io.LimitReader(rateLimited, partSize)
644+
r := io.TeeReader(limitReader, p)
645+
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadPartInfo.UploadUrl, r)
646+
if err != nil {
647+
return err
648+
}
649+
req.Header.Set("Content-Type", "application/octet-stream")
650+
req.Header.Set("Content-Length", fmt.Sprint(partSize))
651+
req.Header.Set("Origin", "https://yun.139.com")
652+
req.Header.Set("Referer", "https://yun.139.com/")
653+
req.ContentLength = partSize
654+
err = func() error {
655+
res, err := base.HttpClient.Do(req)
656+
if err != nil {
657+
return err
658+
}
659+
defer res.Body.Close()
660+
log.Debugf("[139] uploaded: %+v", res)
661+
if res.StatusCode != http.StatusOK {
662+
body, _ := io.ReadAll(res.Body)
663+
return fmt.Errorf("unexpected status code: %d, body: %s", res.StatusCode, string(body))
664+
}
665+
return nil
666+
}()
667+
if err != nil {
668+
return err
669+
}
670+
}
671+
return nil
672+
}

0 commit comments

Comments
 (0)