88 "hash"
99 "io"
1010 "net/http"
11+ "strings"
1112 "time"
1213
1314 "github.com/OpenListTeam/OpenList/v4/drivers/base"
@@ -18,6 +19,7 @@ import (
1819 "github.com/OpenListTeam/OpenList/v4/pkg/utils"
1920 "github.com/avast/retry-go"
2021 "github.com/go-resty/resty/v2"
22+ log "github.com/sirupsen/logrus"
2123)
2224
2325type QuarkOpen struct {
@@ -144,30 +146,84 @@ func (d *QuarkOpen) Remove(ctx context.Context, obj model.Obj) error {
144146
145147func (d * QuarkOpen ) Put (ctx context.Context , dstDir model.Obj , stream model.FileStreamer , up driver.UpdateProgress ) error {
146148 md5Str , sha1Str := stream .GetHash ().GetHash (utils .MD5 ), stream .GetHash ().GetHash (utils .SHA1 )
147- var (
148- md5 hash.Hash
149- sha1 hash.Hash
150- )
151- writers := []io.Writer {}
152- if len (md5Str ) != utils .MD5 .Width {
153- md5 = utils .MD5 .NewFunc ()
154- writers = append (writers , md5 )
155- }
156- if len (sha1Str ) != utils .SHA1 .Width {
157- sha1 = utils .SHA1 .NewFunc ()
158- writers = append (writers , sha1 )
159- }
160149
161- if len (writers ) > 0 {
162- _ , err := stream .CacheFullAndWriter (& up , io .MultiWriter (writers ... ))
163- if err != nil {
164- return err
165- }
166- if md5 != nil {
167- md5Str = hex .EncodeToString (md5 .Sum (nil ))
168- }
169- if sha1 != nil {
170- sha1Str = hex .EncodeToString (sha1 .Sum (nil ))
150+ // 检查是否需要计算hash
151+ needMD5 := len (md5Str ) != utils .MD5 .Width
152+ needSHA1 := len (sha1Str ) != utils .SHA1 .Width
153+
154+ if needMD5 || needSHA1 {
155+ // 检查是否为可重复读取的流
156+ _ , isSeekable := stream .(* streamPkg.SeekableStream )
157+
158+ if isSeekable {
159+ // 可重复读取的流,使用 RangeRead 一次性计算所有hash,避免重复读取
160+ var md5 hash.Hash
161+ var sha1 hash.Hash
162+ writers := []io.Writer {}
163+
164+ if needMD5 {
165+ md5 = utils .MD5 .NewFunc ()
166+ writers = append (writers , md5 )
167+ }
168+ if needSHA1 {
169+ sha1 = utils .SHA1 .NewFunc ()
170+ writers = append (writers , sha1 )
171+ }
172+
173+ // 使用 RangeRead 分块读取文件,同时计算多个hash
174+ multiWriter := io .MultiWriter (writers ... )
175+ size := stream .GetSize ()
176+ chunkSize := int64 (10 * utils .MB ) // 10MB per chunk
177+ buf := make ([]byte , chunkSize )
178+ var offset int64 = 0
179+
180+ for offset < size {
181+ readSize := min (chunkSize , size - offset )
182+
183+ n , err := streamPkg .ReadFullWithRangeRead (stream , buf [:readSize ], offset )
184+ if err != nil {
185+ return fmt .Errorf ("calculate hash failed at offset %d: %w" , offset , err )
186+ }
187+
188+ multiWriter .Write (buf [:n ])
189+ offset += int64 (n )
190+
191+ // 更新进度(hash计算占用40%的进度)
192+ up (40 * float64 (offset ) / float64 (size ))
193+ }
194+
195+ if md5 != nil {
196+ md5Str = hex .EncodeToString (md5 .Sum (nil ))
197+ }
198+ if sha1 != nil {
199+ sha1Str = hex .EncodeToString (sha1 .Sum (nil ))
200+ }
201+ } else {
202+ // 不可重复读取的流(如网络流),需要缓存并计算hash
203+ var md5 hash.Hash
204+ var sha1 hash.Hash
205+ writers := []io.Writer {}
206+
207+ if needMD5 {
208+ md5 = utils .MD5 .NewFunc ()
209+ writers = append (writers , md5 )
210+ }
211+ if needSHA1 {
212+ sha1 = utils .SHA1 .NewFunc ()
213+ writers = append (writers , sha1 )
214+ }
215+
216+ _ , err := stream .CacheFullAndWriter (& up , io .MultiWriter (writers ... ))
217+ if err != nil {
218+ return err
219+ }
220+
221+ if md5 != nil {
222+ md5Str = hex .EncodeToString (md5 .Sum (nil ))
223+ }
224+ if sha1 != nil {
225+ sha1Str = hex .EncodeToString (sha1 .Sum (nil ))
226+ }
171227 }
172228 }
173229 // pre
@@ -210,24 +266,43 @@ func (d *QuarkOpen) Put(ctx context.Context, dstDir model.Obj, stream model.File
210266 if err != nil {
211267 return err
212268 }
269+
270+ // 上传重试逻辑,包含URL刷新
271+ var etag string
213272 err = retry .Do (func () error {
214273 rd .Seek (0 , io .SeekStart )
215- etag , err := d .upPart (ctx , upUrlInfo , i , driver .NewLimitedUploadStream (ctx , rd ))
216- if err != nil {
217- return err
274+ var uploadErr error
275+ etag , uploadErr = d .upPart (ctx , upUrlInfo , i , driver .NewLimitedUploadStream (ctx , rd ))
276+
277+ // 检查是否为URL过期错误
278+ if uploadErr != nil && strings .Contains (uploadErr .Error (), "expire" ) {
279+ log .Warnf ("[quark_open] Upload URL expired for part %d, refreshing..." , i )
280+ // 刷新上传URL
281+ newUpUrlInfo , refreshErr := d .upUrl (ctx , pre , partInfo )
282+ if refreshErr != nil {
283+ return fmt .Errorf ("failed to refresh upload url: %w" , refreshErr )
284+ }
285+ upUrlInfo = newUpUrlInfo
286+ log .Infof ("[quark_open] Upload URL refreshed successfully" )
287+
288+ // 使用新URL重试上传
289+ rd .Seek (0 , io .SeekStart )
290+ etag , uploadErr = d .upPart (ctx , upUrlInfo , i , driver .NewLimitedUploadStream (ctx , rd ))
218291 }
219- etags = append ( etags , etag )
220- return nil
292+
293+ return uploadErr
221294 },
222295 retry .Context (ctx ),
223296 retry .Attempts (3 ),
224297 retry .DelayType (retry .BackOffDelay ),
225298 retry .Delay (time .Second ))
299+
226300 ss .FreeSectionReader (rd )
227301 if err != nil {
228302 return fmt .Errorf ("failed to upload part %d: %w" , i , err )
229303 }
230304
305+ etags = append (etags , etag )
231306 up (95 * float64 (offset + size ) / float64 (total ))
232307 }
233308
0 commit comments