1
1
package _123_open
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
6
+ "encoding/json"
7
+ "fmt"
5
8
"io"
9
+ "mime/multipart"
6
10
"net/http"
11
+ "strconv"
7
12
"strings"
8
13
"time"
9
14
@@ -17,6 +22,7 @@ import (
17
22
"github.com/go-resty/resty/v2"
18
23
)
19
24
25
+ // 创建文件 V2
20
26
func (d * Open123 ) create (parentFileID int64 , filename string , etag string , size int64 , duplicate int , containDir bool ) (* UploadCreateResp , error ) {
21
27
var resp UploadCreateResp
22
28
_ , err := d .Request (UploadCreate , http .MethodPost , func (req * resty.Request ) {
@@ -35,48 +41,9 @@ func (d *Open123) create(parentFileID int64, filename string, etag string, size
35
41
return & resp , nil
36
42
}
37
43
38
- func (d * Open123 ) url (preuploadID string , sliceNo int64 ) (string , error ) {
39
- // get upload url
40
- var resp UploadUrlResp
41
- _ , err := d .Request (UploadUrl , http .MethodPost , func (req * resty.Request ) {
42
- req .SetBody (base.Json {
43
- "preuploadId" : preuploadID ,
44
- "sliceNo" : sliceNo ,
45
- })
46
- }, & resp )
47
- if err != nil {
48
- return "" , err
49
- }
50
- return resp .Data .PresignedURL , nil
51
- }
52
-
53
- func (d * Open123 ) complete (preuploadID string ) (* UploadCompleteResp , error ) {
54
- var resp UploadCompleteResp
55
- _ , err := d .Request (UploadComplete , http .MethodPost , func (req * resty.Request ) {
56
- req .SetBody (base.Json {
57
- "preuploadID" : preuploadID ,
58
- })
59
- }, & resp )
60
- if err != nil {
61
- return nil , err
62
- }
63
- return & resp , nil
64
- }
65
-
66
- func (d * Open123 ) async (preuploadID string ) (* UploadAsyncResp , error ) {
67
- var resp UploadAsyncResp
68
- _ , err := d .Request (UploadAsync , http .MethodPost , func (req * resty.Request ) {
69
- req .SetBody (base.Json {
70
- "preuploadID" : preuploadID ,
71
- })
72
- }, & resp )
73
- if err != nil {
74
- return nil , err
75
- }
76
- return & resp , nil
77
- }
78
-
44
+ // 上传分片 V2
79
45
func (d * Open123 ) Upload (ctx context.Context , file model.FileStreamer , createResp * UploadCreateResp , up driver.UpdateProgress ) error {
46
+ uploadDomain := createResp .Data .Servers [0 ]
80
47
size := file .GetSize ()
81
48
chunkSize := createResp .Data .SliceSize
82
49
uploadNums := (size + chunkSize - 1 ) / chunkSize
@@ -90,7 +57,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
90
57
if err != nil {
91
58
return err
92
59
}
93
- for partIndex := int64 ( 0 ); partIndex < uploadNums ; partIndex ++ {
60
+ for partIndex := range uploadNums {
94
61
if utils .IsCanceled (uploadCtx ) {
95
62
break
96
63
}
@@ -100,36 +67,90 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
100
67
size := min (chunkSize , size - offset )
101
68
var reader * stream.SectionReader
102
69
var rateLimitedRd io.Reader
70
+ sliceMD5 := ""
103
71
threadG .GoWithLifecycle (errgroup.Lifecycle {
104
72
Before : func (ctx context.Context ) error {
105
73
if reader == nil {
106
74
var err error
75
+ // 每个分片一个reader
107
76
reader , err = ss .GetSectionReader (offset , size )
108
77
if err != nil {
109
78
return err
110
79
}
80
+ // 计算当前分片的MD5
81
+ sliceMD5 , err = utils .HashReader (utils .MD5 , reader )
82
+ if err != nil {
83
+ return err
84
+ }
111
85
rateLimitedRd = driver .NewLimitedUploadStream (ctx , reader )
112
86
}
113
87
return nil
114
88
},
115
89
Do : func (ctx context.Context ) error {
90
+ // 重置分片reader位置,因为HashReader、上一次失败已经读取到分片EOF
116
91
reader .Seek (0 , io .SeekStart )
117
- uploadPartUrl , err := d .url (createResp .Data .PreuploadID , partNumber )
92
+
93
+ // 创建表单数据
94
+ var b bytes.Buffer
95
+ w := multipart .NewWriter (& b )
96
+ // 添加表单字段
97
+ err = w .WriteField ("preuploadID" , createResp .Data .PreuploadID )
98
+ if err != nil {
99
+ return err
100
+ }
101
+ err = w .WriteField ("sliceNo" , strconv .FormatInt (partNumber , 10 ))
102
+ if err != nil {
103
+ return err
104
+ }
105
+ err = w .WriteField ("sliceMD5" , sliceMD5 )
106
+ if err != nil {
107
+ return err
108
+ }
109
+ // 写入文件内容
110
+ fw , err := w .CreateFormFile ("slice" , fmt .Sprintf ("%s.part%d" , file .GetName (), partNumber ))
111
+ if err != nil {
112
+ return err
113
+ }
114
+ _ , err = utils .CopyWithBuffer (fw , rateLimitedRd )
115
+ if err != nil {
116
+ return err
117
+ }
118
+ err = w .Close ()
118
119
if err != nil {
119
120
return err
120
121
}
121
122
122
- req , err := http .NewRequestWithContext (ctx , http .MethodPut , uploadPartUrl , rateLimitedRd )
123
+ // 创建请求并设置header
124
+ req , err := http .NewRequestWithContext (ctx , http .MethodPost , uploadDomain + "/upload/v2/file/slice" , & b )
123
125
if err != nil {
124
126
return err
125
127
}
126
- req .ContentLength = size
128
+
129
+ // 设置请求头
130
+ req .Header .Add ("Authorization" , "Bearer " + d .AccessToken )
131
+ req .Header .Add ("Content-Type" , w .FormDataContentType ())
132
+ req .Header .Add ("Platform" , "open_platform" )
127
133
128
134
res , err := base .HttpClient .Do (req )
129
135
if err != nil {
130
136
return err
131
137
}
132
- _ = res .Body .Close ()
138
+ defer res .Body .Close ()
139
+ if res .StatusCode != 200 {
140
+ return fmt .Errorf ("slice %d upload failed, status code: %d" , partNumber , res .StatusCode )
141
+ }
142
+ var resp BaseResp
143
+ respBody , err := io .ReadAll (res .Body )
144
+ if err != nil {
145
+ return err
146
+ }
147
+ err = json .Unmarshal (respBody , & resp )
148
+ if err != nil {
149
+ return err
150
+ }
151
+ if resp .Code != 0 {
152
+ return fmt .Errorf ("slice %d upload failed: %s" , partNumber , resp .Message )
153
+ }
133
154
134
155
progress := 10.0 + 85.0 * float64 (threadG .Success ())/ float64 (uploadNums )
135
156
up (progress )
@@ -145,23 +166,19 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
145
166
return err
146
167
}
147
168
148
- uploadCompleteResp , err := d .complete (createResp .Data .PreuploadID )
149
- if err != nil {
150
- return err
151
- }
152
- if uploadCompleteResp .Data .Async == false || uploadCompleteResp .Data .Completed {
153
- return nil
154
- }
169
+ return nil
170
+ }
155
171
156
- for {
157
- uploadAsyncResp , err := d .async (createResp .Data .PreuploadID )
158
- if err != nil {
159
- return err
160
- }
161
- if uploadAsyncResp .Data .Completed {
162
- break
163
- }
172
+ // 上传完毕
173
+ func (d * Open123 ) complete (preuploadID string ) (* UploadCompleteResp , error ) {
174
+ var resp UploadCompleteResp
175
+ _ , err := d .Request (UploadComplete , http .MethodPost , func (req * resty.Request ) {
176
+ req .SetBody (base.Json {
177
+ "preuploadID" : preuploadID ,
178
+ })
179
+ }, & resp )
180
+ if err != nil {
181
+ return nil , err
164
182
}
165
- up (100 )
166
- return nil
183
+ return & resp , nil
167
184
}
0 commit comments