Skip to content

Commit c38dc6d

Browse files
neverleeneverlee
andauthored
fix(115_open): support multipart upload (#8229)
Co-authored-by: neverlee <neverlea@formail.com>
1 parent 5668e4a commit c38dc6d

File tree

2 files changed

+141
-14
lines changed

2 files changed

+141
-14
lines changed

drivers/115_open/driver.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package _115_open
22

33
import (
44
"context"
5-
"encoding/base64"
65
"fmt"
76
"io"
87
"net/http"
@@ -16,7 +15,6 @@ import (
1615
"github.com/alist-org/alist/v3/internal/model"
1716
"github.com/alist-org/alist/v3/internal/op"
1817
"github.com/alist-org/alist/v3/pkg/utils"
19-
"github.com/aliyun/aliyun-oss-go-sdk/oss"
2018
sdk "github.com/xhofe/115-sdk-go"
2119
)
2220

@@ -265,18 +263,7 @@ func (d *Open115) Put(ctx context.Context, dstDir model.Obj, file model.FileStre
265263
return err
266264
}
267265
// 4. upload
268-
ossClient, err := oss.New(tokenResp.Endpoint, tokenResp.AccessKeyId, tokenResp.AccessKeySecret, oss.SecurityToken(tokenResp.SecurityToken))
269-
if err != nil {
270-
return err
271-
}
272-
bucket, err := ossClient.Bucket(resp.Bucket)
273-
if err != nil {
274-
return err
275-
}
276-
err = bucket.PutObject(resp.Object, tempF,
277-
oss.Callback(base64.StdEncoding.EncodeToString([]byte(resp.Callback.Value.Callback))),
278-
oss.CallbackVar(base64.StdEncoding.EncodeToString([]byte(resp.Callback.Value.CallbackVar))),
279-
)
266+
err = d.multpartUpload(ctx, tempF, file, up, tokenResp, resp)
280267
if err != nil {
281268
return err
282269
}

drivers/115_open/upload.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package _115_open
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"io"
7+
"time"
8+
9+
"github.com/alist-org/alist/v3/internal/driver"
10+
"github.com/alist-org/alist/v3/internal/model"
11+
"github.com/alist-org/alist/v3/pkg/utils"
12+
"github.com/aliyun/aliyun-oss-go-sdk/oss"
13+
"github.com/avast/retry-go"
14+
sdk "github.com/xhofe/115-sdk-go"
15+
)
16+
17+
func calPartSize(fileSize int64) int64 {
18+
var partSize int64 = 20 * utils.MB
19+
if fileSize > partSize {
20+
if fileSize > 1*utils.TB { // file Size over 1TB
21+
partSize = 5 * utils.GB // file part size 5GB
22+
} else if fileSize > 768*utils.GB { // over 768GB
23+
partSize = 109951163 // ≈ 104.8576MB, split 1TB into 10,000 part
24+
} else if fileSize > 512*utils.GB { // over 512GB
25+
partSize = 82463373 // ≈ 78.6432MB
26+
} else if fileSize > 384*utils.GB { // over 384GB
27+
partSize = 54975582 // ≈ 52.4288MB
28+
} else if fileSize > 256*utils.GB { // over 256GB
29+
partSize = 41231687 // ≈ 39.3216MB
30+
} else if fileSize > 128*utils.GB { // over 128GB
31+
partSize = 27487791 // ≈ 26.2144MB
32+
}
33+
}
34+
return partSize
35+
}
36+
37+
func (d *Open115) singleUpload(ctx context.Context, tempF model.File, tokenResp *sdk.UploadGetTokenResp, initResp *sdk.UploadInitResp) error {
38+
ossClient, err := oss.New(tokenResp.Endpoint, tokenResp.AccessKeyId, tokenResp.AccessKeySecret, oss.SecurityToken(tokenResp.SecurityToken))
39+
if err != nil {
40+
return err
41+
}
42+
bucket, err := ossClient.Bucket(initResp.Bucket)
43+
if err != nil {
44+
return err
45+
}
46+
47+
err = bucket.PutObject(initResp.Object, tempF,
48+
oss.Callback(base64.StdEncoding.EncodeToString([]byte(initResp.Callback.Value.Callback))),
49+
oss.CallbackVar(base64.StdEncoding.EncodeToString([]byte(initResp.Callback.Value.CallbackVar))),
50+
)
51+
52+
return err
53+
}
54+
55+
// type CallbackResult struct {
56+
// State bool `json:"state"`
57+
// Code int `json:"code"`
58+
// Message string `json:"message"`
59+
// Data struct {
60+
// PickCode string `json:"pick_code"`
61+
// FileName string `json:"file_name"`
62+
// FileSize int64 `json:"file_size"`
63+
// FileID string `json:"file_id"`
64+
// ThumbURL string `json:"thumb_url"`
65+
// Sha1 string `json:"sha1"`
66+
// Aid int `json:"aid"`
67+
// Cid string `json:"cid"`
68+
// } `json:"data"`
69+
// }
70+
71+
func (d *Open115) multpartUpload(ctx context.Context, tempF model.File, stream model.FileStreamer, up driver.UpdateProgress, tokenResp *sdk.UploadGetTokenResp, initResp *sdk.UploadInitResp) error {
72+
fileSize := stream.GetSize()
73+
chunkSize := calPartSize(fileSize)
74+
75+
ossClient, err := oss.New(tokenResp.Endpoint, tokenResp.AccessKeyId, tokenResp.AccessKeySecret, oss.SecurityToken(tokenResp.SecurityToken))
76+
if err != nil {
77+
return err
78+
}
79+
bucket, err := ossClient.Bucket(initResp.Bucket)
80+
if err != nil {
81+
return err
82+
}
83+
84+
imur, err := bucket.InitiateMultipartUpload(initResp.Object, oss.Sequential())
85+
if err != nil {
86+
return err
87+
}
88+
89+
partNum := (stream.GetSize() + chunkSize - 1) / chunkSize
90+
parts := make([]oss.UploadPart, partNum)
91+
offset := int64(0)
92+
for i := int64(1); i <= partNum; i++ {
93+
if utils.IsCanceled(ctx) {
94+
return ctx.Err()
95+
}
96+
97+
partSize := chunkSize
98+
if i == partNum {
99+
partSize = fileSize - (i-1)*chunkSize
100+
}
101+
rd := utils.NewMultiReadable(io.LimitReader(stream, partSize))
102+
err = retry.Do(func() error {
103+
_ = rd.Reset()
104+
rateLimitedRd := driver.NewLimitedUploadStream(ctx, rd)
105+
part, err := bucket.UploadPart(imur, rateLimitedRd, partSize, int(i))
106+
if err != nil {
107+
return err
108+
}
109+
parts[i-1] = part
110+
return nil
111+
},
112+
retry.Attempts(3),
113+
retry.DelayType(retry.BackOffDelay),
114+
retry.Delay(time.Second))
115+
if err != nil {
116+
return err
117+
}
118+
119+
if i == partNum {
120+
offset = fileSize
121+
} else {
122+
offset += partSize
123+
}
124+
up(float64(offset) / float64(fileSize))
125+
}
126+
127+
// callbackRespBytes := make([]byte, 1024)
128+
_, err = bucket.CompleteMultipartUpload(
129+
imur,
130+
parts,
131+
oss.Callback(base64.StdEncoding.EncodeToString([]byte(initResp.Callback.Value.Callback))),
132+
oss.CallbackVar(base64.StdEncoding.EncodeToString([]byte(initResp.Callback.Value.CallbackVar))),
133+
// oss.CallbackResult(&callbackRespBytes),
134+
)
135+
if err != nil {
136+
return err
137+
}
138+
139+
return nil
140+
}

0 commit comments

Comments
 (0)