Skip to content

Commit ee97856

Browse files
author
cyk
committed
feat(upload): 增强分片上传支持,修复超时和ETag提取逻辑
1 parent 0d3f423 commit ee97856

File tree

3 files changed

+183
-40
lines changed

3 files changed

+183
-40
lines changed

drivers/quark_open/driver.go

Lines changed: 103 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"hash"
99
"io"
1010
"net/http"
11+
"strings"
1112
"time"
1213

1314
"github.com/OpenListTeam/OpenList/v4/drivers/base"
@@ -18,6 +19,7 @@ import (
1819
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1920
"github.com/avast/retry-go"
2021
"github.com/go-resty/resty/v2"
22+
log "github.com/sirupsen/logrus"
2123
)
2224

2325
type QuarkOpen struct {
@@ -144,30 +146,84 @@ func (d *QuarkOpen) Remove(ctx context.Context, obj model.Obj) error {
144146

145147
func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
146148
md5Str, sha1Str := stream.GetHash().GetHash(utils.MD5), stream.GetHash().GetHash(utils.SHA1)
147-
var (
148-
md5 hash.Hash
149-
sha1 hash.Hash
150-
)
151-
writers := []io.Writer{}
152-
if len(md5Str) != utils.MD5.Width {
153-
md5 = utils.MD5.NewFunc()
154-
writers = append(writers, md5)
155-
}
156-
if len(sha1Str) != utils.SHA1.Width {
157-
sha1 = utils.SHA1.NewFunc()
158-
writers = append(writers, sha1)
159-
}
160149

161-
if len(writers) > 0 {
162-
_, err := stream.CacheFullAndWriter(&up, io.MultiWriter(writers...))
163-
if err != nil {
164-
return err
165-
}
166-
if md5 != nil {
167-
md5Str = hex.EncodeToString(md5.Sum(nil))
168-
}
169-
if sha1 != nil {
170-
sha1Str = hex.EncodeToString(sha1.Sum(nil))
150+
// 检查是否需要计算hash
151+
needMD5 := len(md5Str) != utils.MD5.Width
152+
needSHA1 := len(sha1Str) != utils.SHA1.Width
153+
154+
if needMD5 || needSHA1 {
155+
// 检查是否为可重复读取的流
156+
_, isSeekable := stream.(*streamPkg.SeekableStream)
157+
158+
if isSeekable {
159+
// 可重复读取的流,使用 RangeRead 一次性计算所有hash,避免重复读取
160+
var md5 hash.Hash
161+
var sha1 hash.Hash
162+
writers := []io.Writer{}
163+
164+
if needMD5 {
165+
md5 = utils.MD5.NewFunc()
166+
writers = append(writers, md5)
167+
}
168+
if needSHA1 {
169+
sha1 = utils.SHA1.NewFunc()
170+
writers = append(writers, sha1)
171+
}
172+
173+
// 使用 RangeRead 分块读取文件,同时计算多个hash
174+
multiWriter := io.MultiWriter(writers...)
175+
size := stream.GetSize()
176+
chunkSize := int64(10 * utils.MB) // 10MB per chunk
177+
buf := make([]byte, chunkSize)
178+
var offset int64 = 0
179+
180+
for offset < size {
181+
readSize := min(chunkSize, size-offset)
182+
183+
n, err := streamPkg.ReadFullWithRangeRead(stream, buf[:readSize], offset)
184+
if err != nil {
185+
return fmt.Errorf("calculate hash failed at offset %d: %w", offset, err)
186+
}
187+
188+
multiWriter.Write(buf[:n])
189+
offset += int64(n)
190+
191+
// 更新进度(hash计算占用40%的进度)
192+
up(40 * float64(offset) / float64(size))
193+
}
194+
195+
if md5 != nil {
196+
md5Str = hex.EncodeToString(md5.Sum(nil))
197+
}
198+
if sha1 != nil {
199+
sha1Str = hex.EncodeToString(sha1.Sum(nil))
200+
}
201+
} else {
202+
// 不可重复读取的流(如网络流),需要缓存并计算hash
203+
var md5 hash.Hash
204+
var sha1 hash.Hash
205+
writers := []io.Writer{}
206+
207+
if needMD5 {
208+
md5 = utils.MD5.NewFunc()
209+
writers = append(writers, md5)
210+
}
211+
if needSHA1 {
212+
sha1 = utils.SHA1.NewFunc()
213+
writers = append(writers, sha1)
214+
}
215+
216+
_, err := stream.CacheFullAndWriter(&up, io.MultiWriter(writers...))
217+
if err != nil {
218+
return err
219+
}
220+
221+
if md5 != nil {
222+
md5Str = hex.EncodeToString(md5.Sum(nil))
223+
}
224+
if sha1 != nil {
225+
sha1Str = hex.EncodeToString(sha1.Sum(nil))
226+
}
171227
}
172228
}
173229
// pre
@@ -210,24 +266,43 @@ func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.File
210266
if err != nil {
211267
return err
212268
}
269+
270+
// 上传重试逻辑,包含URL刷新
271+
var etag string
213272
err = retry.Do(func() error {
214273
rd.Seek(0, io.SeekStart)
215-
etag, err := d.upPart(ctx, upUrlInfo, i, driver.NewLimitedUploadStream(ctx, rd))
216-
if err != nil {
217-
return err
274+
var uploadErr error
275+
etag, uploadErr = d.upPart(ctx, upUrlInfo, i, driver.NewLimitedUploadStream(ctx, rd))
276+
277+
// 检查是否为URL过期错误
278+
if uploadErr != nil && strings.Contains(uploadErr.Error(), "expire") {
279+
log.Warnf("[quark_open] Upload URL expired for part %d, refreshing...", i)
280+
// 刷新上传URL
281+
newUpUrlInfo, refreshErr := d.upUrl(ctx, pre, partInfo)
282+
if refreshErr != nil {
283+
return fmt.Errorf("failed to refresh upload url: %w", refreshErr)
284+
}
285+
upUrlInfo = newUpUrlInfo
286+
log.Infof("[quark_open] Upload URL refreshed successfully")
287+
288+
// 使用新URL重试上传
289+
rd.Seek(0, io.SeekStart)
290+
etag, uploadErr = d.upPart(ctx, upUrlInfo, i, driver.NewLimitedUploadStream(ctx, rd))
218291
}
219-
etags = append(etags, etag)
220-
return nil
292+
293+
return uploadErr
221294
},
222295
retry.Context(ctx),
223296
retry.Attempts(3),
224297
retry.DelayType(retry.BackOffDelay),
225298
retry.Delay(time.Second))
299+
226300
ss.FreeSectionReader(rd)
227301
if err != nil {
228302
return fmt.Errorf("failed to upload part %d: %w", i, err)
229303
}
230304

305+
etags = append(etags, etag)
231306
up(95 * float64(offset+size) / float64(total))
232307
}
233308

drivers/quark_open/meta.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ type Addition struct {
1313
APIAddress string `json:"api_url_address" default:"https://api.oplist.org/quarkyun/renewapi"`
1414
AccessToken string `json:"access_token" required:"false" default:""`
1515
RefreshToken string `json:"refresh_token" required:"true"`
16-
AppID string `json:"app_id" required:"true" help:"Keep it empty if you don't have one"`
17-
SignKey string `json:"sign_key" required:"true" help:"Keep it empty if you don't have one"`
16+
AppID string `json:"app_id" required:"false" default:"" help:"Optional - Auto-filled from online API, or use your own"`
17+
SignKey string `json:"sign_key" required:"false" default:"" help:"Optional - Auto-filled from online API, or use your own"`
1818
}
1919

2020
type Conf struct {

drivers/quark_open/util.go

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/OpenListTeam/OpenList/v4/drivers/base"
2121
"github.com/OpenListTeam/OpenList/v4/internal/model"
2222
"github.com/OpenListTeam/OpenList/v4/internal/op"
23+
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
2324
"github.com/go-resty/resty/v2"
2425
log "github.com/sirupsen/logrus"
2526
)
@@ -283,8 +284,15 @@ func (d *QuarkOpen) getProofRange(proofSeed string, fileSize int64) (*ProofRange
283284

284285
func (d *QuarkOpen) _getPartInfo(stream model.FileStreamer, partSize int64) []base.Json {
285286
// 计算分片信息
286-
partInfo := make([]base.Json, 0)
287287
total := stream.GetSize()
288+
289+
// 确保partSize合理:最小4MB,避免分片过多
290+
const minPartSize int64 = 4 * utils.MB
291+
if partSize < minPartSize {
292+
partSize = minPartSize
293+
}
294+
295+
partInfo := make([]base.Json, 0)
288296
left := total
289297
partNumber := 1
290298

@@ -304,6 +312,7 @@ func (d *QuarkOpen) _getPartInfo(stream model.FileStreamer, partSize int64) []ba
304312
partNumber++
305313
}
306314

315+
log.Infof("[quark_open] Upload plan: file_size=%d, part_size=%d, part_count=%d", total, partSize, len(partInfo))
307316
return partInfo
308317
}
309318

@@ -340,13 +349,43 @@ func (d *QuarkOpen) upPart(ctx context.Context, upUrlInfo UpUrlInfo, partNumber
340349
req.Header.Set("Accept-Encoding", "gzip")
341350
req.Header.Set("User-Agent", "Go-http-client/1.1")
342351

352+
// ✅ 关键修复:使用更长的超时时间(10分钟)
353+
// 慢速网络下大文件分片上传可能需要很长时间
354+
client := &http.Client{
355+
Timeout: 10 * time.Minute,
356+
Transport: base.HttpClient.Transport,
357+
}
358+
343359
// 发送请求
344-
resp, err := base.HttpClient.Do(req)
360+
resp, err := client.Do(req)
345361
if err != nil {
346362
return "", err
347363
}
348364
defer resp.Body.Close()
349365

366+
// 检查是否为URL过期错误(403, 410等状态码)
367+
if resp.StatusCode == 403 || resp.StatusCode == 410 {
368+
body, _ := io.ReadAll(resp.Body)
369+
return "", fmt.Errorf("upload url expired (status: %d): %s", resp.StatusCode, string(body))
370+
}
371+
372+
// ✅ 关键修复:409 PartAlreadyExist 不是错误!
373+
// 夸克使用Sequential模式,超时重试时如果分片已存在,说明第一次其实成功了
374+
if resp.StatusCode == 409 {
375+
body, _ := io.ReadAll(resp.Body)
376+
// 从响应体中提取已存在分片的ETag
377+
if strings.Contains(string(body), "PartAlreadyExist") {
378+
// 尝试从XML响应中提取ETag
379+
if etag := extractEtagFromXML(string(body)); etag != "" {
380+
log.Infof("[quark_open] Part %d already exists (409), using existing ETag: %s", partNumber+1, etag)
381+
return etag, nil
382+
}
383+
// 如果无法提取ETag,返回错误
384+
log.Warnf("[quark_open] Part %d already exists but cannot extract ETag from response: %s", partNumber+1, string(body))
385+
return "", fmt.Errorf("part already exists but ETag not found in response")
386+
}
387+
}
388+
350389
if resp.StatusCode != 200 {
351390
body, _ := io.ReadAll(resp.Body)
352391
return "", fmt.Errorf("up status: %d, error: %s", resp.StatusCode, string(body))
@@ -355,6 +394,23 @@ func (d *QuarkOpen) upPart(ctx context.Context, upUrlInfo UpUrlInfo, partNumber
355394
return resp.Header.Get("Etag"), nil
356395
}
357396

397+
// extractEtagFromXML 从OSS的XML错误响应中提取ETag
398+
// 示例: <PartEtag>"2F796AC486BB2891E3237D8BFDE020B5"</PartEtag>
399+
func extractEtagFromXML(xmlBody string) string {
400+
start := strings.Index(xmlBody, "<PartEtag>")
401+
if start == -1 {
402+
return ""
403+
}
404+
start += len("<PartEtag>")
405+
end := strings.Index(xmlBody[start:], "</PartEtag>")
406+
if end == -1 {
407+
return ""
408+
}
409+
etag := xmlBody[start : start+end]
410+
// 移除引号
411+
return strings.Trim(etag, "\"")
412+
}
413+
358414
func (d *QuarkOpen) upFinish(ctx context.Context, pre UpPreResp, partInfo []base.Json, etags []string) error {
359415
// 创建 part_info_list
360416
partInfoList := make([]base.Json, len(partInfo))
@@ -417,25 +473,36 @@ func (d *QuarkOpen) generateReqSign(method string, pathname string, signKey stri
417473
}
418474

419475
func (d *QuarkOpen) refreshToken() error {
420-
refresh, access, err := d._refreshToken()
476+
refresh, access, appID, signKey, err := d._refreshToken()
421477
for i := 0; i < 3; i++ {
422478
if err == nil {
423479
break
424480
} else {
425481
log.Errorf("[quark_open] failed to refresh token: %s", err)
426482
}
427-
refresh, access, err = d._refreshToken()
483+
refresh, access, appID, signKey, err = d._refreshToken()
428484
}
429485
if err != nil {
430486
return err
431487
}
432488
log.Infof("[quark_open] token exchange: %s -> %s", d.RefreshToken, refresh)
433489
d.RefreshToken, d.AccessToken = refresh, access
490+
491+
// 如果在线API返回了AppID和SignKey,保存它们(不为空时才更新)
492+
if appID != "" && appID != d.AppID {
493+
d.AppID = appID
494+
log.Infof("[quark_open] AppID updated from online API: %s", appID)
495+
}
496+
if signKey != "" && signKey != d.SignKey {
497+
d.SignKey = signKey
498+
log.Infof("[quark_open] SignKey updated from online API")
499+
}
500+
434501
op.MustSaveDriverStorage(d)
435502
return nil
436503
}
437504

438-
func (d *QuarkOpen) _refreshToken() (string, string, error) {
505+
func (d *QuarkOpen) _refreshToken() (string, string, string, string, error) {
439506
if d.UseOnlineAPI && d.APIAddress != "" {
440507
u := d.APIAddress
441508
var resp RefreshTokenOnlineAPIResp
@@ -448,19 +515,20 @@ func (d *QuarkOpen) _refreshToken() (string, string, error) {
448515
}).
449516
Get(u)
450517
if err != nil {
451-
return "", "", err
518+
return "", "", "", "", err
452519
}
453520
if resp.RefreshToken == "" || resp.AccessToken == "" {
454521
if resp.ErrorMessage != "" {
455-
return "", "", fmt.Errorf("failed to refresh token: %s", resp.ErrorMessage)
522+
return "", "", "", "", fmt.Errorf("failed to refresh token: %s", resp.ErrorMessage)
456523
}
457-
return "", "", fmt.Errorf("empty token returned from official API, a wrong refresh token may have been used")
524+
return "", "", "", "", fmt.Errorf("empty token returned from official API, a wrong refresh token may have been used")
458525
}
459-
return resp.RefreshToken, resp.AccessToken, nil
526+
// 返回所有字段,包括AppID和SignKey
527+
return resp.RefreshToken, resp.AccessToken, resp.AppID, resp.SignKey, nil
460528
}
461529

462530
// TODO 本地刷新逻辑
463-
return "", "", fmt.Errorf("local refresh token logic is not implemented yet, please use online API or contact the developer")
531+
return "", "", "", "", fmt.Errorf("local refresh token logic is not implemented yet, please use online API or contact the developer")
464532
}
465533

466534
// 生成认证 Cookie

0 commit comments

Comments
 (0)