
Commit 0c8fbb5

Author: cyk (committed)
feat(upload): enhance Quark upload with URL-expiration error handling and optimized multi-hash computation
1 parent 201d49f commit 0c8fbb5

File tree: 2 files changed (+109 −28 lines)


drivers/quark_open/driver.go

Lines changed: 103 additions & 28 deletions
@@ -8,6 +8,7 @@ import (
     "hash"
     "io"
     "net/http"
+    "strings"
     "time"

     "github.com/OpenListTeam/OpenList/v4/drivers/base"
@@ -18,6 +19,7 @@ import (
     "github.com/OpenListTeam/OpenList/v4/pkg/utils"
     "github.com/avast/retry-go"
     "github.com/go-resty/resty/v2"
+    log "github.com/sirupsen/logrus"
 )

 type QuarkOpen struct {
@@ -144,30 +146,84 @@ func (d *QuarkOpen) Remove(ctx context.Context, obj model.Obj) error {

 func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
     md5Str, sha1Str := stream.GetHash().GetHash(utils.MD5), stream.GetHash().GetHash(utils.SHA1)
-    var (
-        md5  hash.Hash
-        sha1 hash.Hash
-    )
-    writers := []io.Writer{}
-    if len(md5Str) != utils.MD5.Width {
-        md5 = utils.MD5.NewFunc()
-        writers = append(writers, md5)
-    }
-    if len(sha1Str) != utils.SHA1.Width {
-        sha1 = utils.SHA1.NewFunc()
-        writers = append(writers, sha1)
-    }

-    if len(writers) > 0 {
-        _, err := stream.CacheFullAndWriter(&up, io.MultiWriter(writers...))
-        if err != nil {
-            return err
-        }
-        if md5 != nil {
-            md5Str = hex.EncodeToString(md5.Sum(nil))
-        }
-        if sha1 != nil {
-            sha1Str = hex.EncodeToString(sha1.Sum(nil))
+    // Check which hashes still need to be computed
+    needMD5 := len(md5Str) != utils.MD5.Width
+    needSHA1 := len(sha1Str) != utils.SHA1.Width
+
+    if needMD5 || needSHA1 {
+        // Check whether the stream can be read repeatedly
+        _, isSeekable := stream.(*streamPkg.SeekableStream)
+
+        if isSeekable {
+            // Re-readable stream: use RangeRead to compute all hashes in a single pass and avoid re-reading
+            var md5 hash.Hash
+            var sha1 hash.Hash
+            writers := []io.Writer{}
+
+            if needMD5 {
+                md5 = utils.MD5.NewFunc()
+                writers = append(writers, md5)
+            }
+            if needSHA1 {
+                sha1 = utils.SHA1.NewFunc()
+                writers = append(writers, sha1)
+            }
+
+            // Read the file in chunks via RangeRead while updating every hash at once
+            multiWriter := io.MultiWriter(writers...)
+            size := stream.GetSize()
+            chunkSize := int64(10 * utils.MB) // 10MB per chunk
+            buf := make([]byte, chunkSize)
+            var offset int64 = 0
+
+            for offset < size {
+                readSize := min(chunkSize, size-offset)
+
+                n, err := streamPkg.ReadFullWithRangeRead(stream, buf[:readSize], offset)
+                if err != nil {
+                    return fmt.Errorf("calculate hash failed at offset %d: %w", offset, err)
+                }
+
+                multiWriter.Write(buf[:n])
+                offset += int64(n)
+
+                // Update progress (hash calculation accounts for 40% of the total)
+                up(40 * float64(offset) / float64(size))
+            }
+
+            if md5 != nil {
+                md5Str = hex.EncodeToString(md5.Sum(nil))
+            }
+            if sha1 != nil {
+                sha1Str = hex.EncodeToString(sha1.Sum(nil))
+            }
+        } else {
+            // Non-re-readable stream (e.g. a network stream): cache it while computing the hashes
+            var md5 hash.Hash
+            var sha1 hash.Hash
+            writers := []io.Writer{}
+
+            if needMD5 {
+                md5 = utils.MD5.NewFunc()
+                writers = append(writers, md5)
+            }
+            if needSHA1 {
+                sha1 = utils.SHA1.NewFunc()
+                writers = append(writers, sha1)
+            }
+
+            _, err := stream.CacheFullAndWriter(&up, io.MultiWriter(writers...))
+            if err != nil {
+                return err
+            }
+
+            if md5 != nil {
+                md5Str = hex.EncodeToString(md5.Sum(nil))
+            }
+            if sha1 != nil {
+                sha1Str = hex.EncodeToString(sha1.Sum(nil))
+            }
         }
     }
     // pre
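
The hashing change above hinges on io.MultiWriter: one pass over the data updates every pending digest. As a point of reference, here is a minimal, self-contained sketch of that pattern using only the Go standard library; hashInOnePass is an illustrative helper, not part of the OpenList code, and it uses io.Copy in place of the driver's chunked ReadFullWithRangeRead loop.

package main

import (
    "crypto/md5"
    "crypto/sha1"
    "encoding/hex"
    "fmt"
    "io"
    "strings"
)

// hashInOnePass reads r exactly once and feeds every byte to both digests
// through io.MultiWriter, mirroring the single-pass approach in Put.
func hashInOnePass(r io.Reader) (md5Hex, sha1Hex string, err error) {
    md5Sum := md5.New()
    sha1Sum := sha1.New()

    // One read of the source updates both hash states.
    if _, err = io.Copy(io.MultiWriter(md5Sum, sha1Sum), r); err != nil {
        return "", "", err
    }
    return hex.EncodeToString(md5Sum.Sum(nil)), hex.EncodeToString(sha1Sum.Sum(nil)), nil
}

func main() {
    m, s, err := hashInOnePass(strings.NewReader("hello quark"))
    if err != nil {
        panic(err)
    }
    fmt.Println("md5:", m, "sha1:", s)
}

The same idea is why the driver only reads the stream once even when both the MD5 and SHA-1 values are missing.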
@@ -210,24 +266,43 @@ func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
         if err != nil {
             return err
         }
+
+        // Upload retry logic, including an upload-URL refresh
+        var etag string
         err = retry.Do(func() error {
             rd.Seek(0, io.SeekStart)
-            etag, err := d.upPart(ctx, upUrlInfo, i, driver.NewLimitedUploadStream(ctx, rd))
-            if err != nil {
-                return err
+            var uploadErr error
+            etag, uploadErr = d.upPart(ctx, upUrlInfo, i, driver.NewLimitedUploadStream(ctx, rd))
+
+            // Check whether the error indicates an expired upload URL
+            if uploadErr != nil && strings.Contains(uploadErr.Error(), "expire") {
+                log.Warnf("[quark_open] Upload URL expired for part %d, refreshing...", i)
+                // Refresh the upload URL
+                newUpUrlInfo, refreshErr := d.upUrl(ctx, pre, partInfo)
+                if refreshErr != nil {
+                    return fmt.Errorf("failed to refresh upload url: %w", refreshErr)
+                }
+                upUrlInfo = newUpUrlInfo
+                log.Infof("[quark_open] Upload URL refreshed successfully")
+
+                // Retry the upload with the new URL
+                rd.Seek(0, io.SeekStart)
+                etag, uploadErr = d.upPart(ctx, upUrlInfo, i, driver.NewLimitedUploadStream(ctx, rd))
             }
-            etags = append(etags, etag)
-            return nil
+
+            return uploadErr
         },
             retry.Context(ctx),
             retry.Attempts(3),
             retry.DelayType(retry.BackOffDelay),
             retry.Delay(time.Second))
+
         ss.FreeSectionReader(rd)
         if err != nil {
             return fmt.Errorf("failed to upload part %d: %w", i, err)
         }

+        etags = append(etags, etag)
         up(95 * float64(offset+size) / float64(total))
     }
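
The retry block above combines github.com/avast/retry-go with an in-closure URL refresh: when the error message contains "expire", the closure fetches a new upload URL and retries immediately; otherwise the error propagates and retry.Do applies its backoff. The sketch below shows that shape in isolation; uploadPart and refreshURL are hypothetical stand-ins for d.upPart and d.upUrl, and the substring check mirrors the one in the diff.

package main

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"

    "github.com/avast/retry-go"
)

// uploadPart and refreshURL stand in for d.upPart and d.upUrl.
func uploadPart(url string) error {
    if strings.Contains(url, "old") {
        return errors.New("upload url expired (status: 403)")
    }
    return nil
}

func refreshURL() (string, error) { return "https://example.com/parts/new", nil }

func main() {
    ctx := context.Background()
    url := "https://example.com/parts/old"

    err := retry.Do(func() error {
        uploadErr := uploadPart(url)
        // An "expire" error means the presigned URL is stale: refresh it and
        // retry once inside the closure before handing control back to retry.Do.
        if uploadErr != nil && strings.Contains(uploadErr.Error(), "expire") {
            newURL, refreshErr := refreshURL()
            if refreshErr != nil {
                return fmt.Errorf("failed to refresh upload url: %w", refreshErr)
            }
            url = newURL
            uploadErr = uploadPart(url)
        }
        return uploadErr
    },
        retry.Context(ctx),
        retry.Attempts(3),
        retry.DelayType(retry.BackOffDelay),
        retry.Delay(time.Second))

    fmt.Println("upload result:", err)
}

Moving etags = append(etags, etag) out of the closure also means a part's etag is recorded only once, after its retries succeed, instead of once per attempt.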

drivers/quark_open/util.go

Lines changed: 6 additions & 0 deletions
@@ -347,6 +347,12 @@ func (d *QuarkOpen) upPart(ctx context.Context, upUrlInfo UpUrlInfo, partNumber
     }
     defer resp.Body.Close()

+    // Check for URL-expiration errors (status codes such as 403 and 410)
+    if resp.StatusCode == 403 || resp.StatusCode == 410 {
+        body, _ := io.ReadAll(resp.Body)
+        return "", fmt.Errorf("upload url expired (status: %d): %s", resp.StatusCode, string(body))
+    }
+
     if resp.StatusCode != 200 {
         body, _ := io.ReadAll(resp.Body)
         return "", fmt.Errorf("up status: %d, error: %s", resp.StatusCode, string(body))
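
As a standalone view of the new check in upPart, the sketch below classifies a response the same way, treating 403 (Forbidden) and 410 (Gone) as an expired upload URL so the caller's retry path can refresh it; classifyUploadResponse is a hypothetical helper, not an OpenList function.

package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

// classifyUploadResponse mirrors the check added to upPart: 403/410 become an
// "expired" error that the retry logic can recognize, while any other non-200
// status is returned as a generic upload error.
func classifyUploadResponse(resp *http.Response) error {
    defer resp.Body.Close()

    if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusGone {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("upload url expired (status: %d): %s", resp.StatusCode, string(body))
    }
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("up status: %d, error: %s", resp.StatusCode, string(body))
    }
    return nil
}

func main() {
    // Fabricated 403 response to exercise the "expired" branch.
    resp := &http.Response{
        StatusCode: http.StatusForbidden,
        Body:       io.NopCloser(strings.NewReader("AccessDenied: request has expired")),
    }
    fmt.Println(classifyUploadResponse(resp))
}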
