Skip to content

Commit 95869e3

Browse files
author
cyk
committed
feat(stream): add readFullWithRangeRead function as a RangeRead retry fallback for improved data reading
1 parent afcee86 commit 95869e3

File tree

1 file changed

+69
-23
lines changed

1 file changed

+69
-23
lines changed

internal/stream/util.go

Lines changed: 69 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"net/http"
1111
"os"
12+
"time"
1213

1314
"github.com/OpenListTeam/OpenList/v4/internal/conf"
1415
"github.com/OpenListTeam/OpenList/v4/internal/errs"
@@ -174,6 +175,45 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
174175
return tmpF, hex.EncodeToString(h.Sum(nil)), nil
175176
}
176177

178+
// readFullWithRangeRead 使用 RangeRead 从文件流中读取数据到 buf
179+
// file: 文件流
180+
// buf: 目标缓冲区
181+
// off: 读取的起始偏移量
182+
// 返回值: 实际读取的字节数和错误
183+
// 支持自动重试(最多3次),每次重试之间有递增延迟(3秒、6秒、9秒)
184+
func readFullWithRangeRead(file model.FileStreamer, buf []byte, off int64) (int, error) {
185+
length := int64(len(buf))
186+
var lastErr error
187+
188+
// 重试最多3次
189+
for retry := 0; retry < 3; retry++ {
190+
reader, err := file.RangeRead(http_range.Range{Start: off, Length: length})
191+
if err != nil {
192+
lastErr = fmt.Errorf("RangeRead failed at offset %d: %w", off, err)
193+
log.Debugf("RangeRead retry %d failed: %v", retry+1, lastErr)
194+
// 递增延迟:3秒、6秒、9秒,等待代理恢复
195+
time.Sleep(time.Duration(retry+1) * 3 * time.Second)
196+
continue
197+
}
198+
199+
n, err := io.ReadFull(reader, buf)
200+
if closer, ok := reader.(io.Closer); ok {
201+
closer.Close()
202+
}
203+
204+
if err == nil {
205+
return n, nil
206+
}
207+
208+
lastErr = fmt.Errorf("failed to read all data via RangeRead at offset %d: (expect=%d, actual=%d) %w", off, length, n, err)
209+
log.Debugf("RangeRead retry %d read failed: %v", retry+1, lastErr)
210+
// 递增延迟:3秒、6秒、9秒,等待网络恢复
211+
time.Sleep(time.Duration(retry+1) * 3 * time.Second)
212+
}
213+
214+
return 0, lastErr
215+
}
216+
177217
// StreamHashFile 流式计算文件哈希值,避免将整个文件加载到内存
178218
// file: 文件流
179219
// hashType: 哈希算法类型
@@ -197,36 +237,28 @@ func StreamHashFile(file model.FileStreamer, hashType *utils.HashType, progressW
197237
hashFunc := hashType.NewFunc()
198238
size := file.GetSize()
199239
chunkSize := int64(10 * 1024 * 1024) // 10MB per chunk
240+
buf := make([]byte, chunkSize)
200241
var offset int64 = 0
201-
const maxRetries = 3
242+
202243
for offset < size {
203244
readSize := chunkSize
204245
if size-offset < chunkSize {
205246
readSize = size - offset
206247
}
207248

208-
var lastErr error
209-
for retry := 0; retry < maxRetries; retry++ {
210-
reader, err := file.RangeRead(http_range.Range{Start: offset, Length: readSize})
249+
// 首先尝试顺序流读取(不消耗额外资源,适用于所有流类型)
250+
n, err := io.ReadFull(file, buf[:readSize])
251+
if err != nil {
252+
// 顺序流读取失败,尝试使用 RangeRead 重试(适用于 SeekableStream)
253+
log.Warnf("StreamHashFile: sequential read failed at offset %d, retrying with RangeRead: %v", offset, err)
254+
n, err = readFullWithRangeRead(file, buf[:readSize], offset)
211255
if err != nil {
212-
lastErr = fmt.Errorf("range read for hash calculation failed: %w", err)
213-
continue
214-
}
215-
_, err = io.Copy(hashFunc, reader)
216-
if closer, ok := reader.(io.Closer); ok {
217-
closer.Close()
256+
return "", fmt.Errorf("calculate hash failed at offset %d: sequential read and RangeRead both failed: %w", offset, err)
218257
}
219-
if err == nil {
220-
lastErr = nil
221-
break
222-
}
223-
lastErr = fmt.Errorf("calculate hash failed at offset %d: %w", offset, err)
224-
}
225-
if lastErr != nil {
226-
return "", lastErr
227258
}
228259

229-
offset += readSize
260+
hashFunc.Write(buf[:n])
261+
offset += int64(n)
230262

231263
if up != nil && progressWeight > 0 {
232264
progress := progressWeight * float64(offset) / float64(size)
@@ -381,12 +413,26 @@ func (ss *directSectionReader) GetSectionReader(off, length int64) (io.ReadSeeke
381413
}
382414
tempBuf := ss.bufPool.Get()
383415
buf := tempBuf[:length]
416+
417+
// 首先尝试顺序流读取(不消耗额外资源,适用于所有流类型)
418+
// 对于 FileStream,RangeRead 会消耗底层 oriReader,所以必须先尝试顺序流读取
384419
n, err := io.ReadFull(ss.file, buf)
385-
ss.fileOffset += int64(n)
386-
if int64(n) != length {
387-
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
420+
if err == nil {
421+
ss.fileOffset = off + int64(n)
422+
return &bufferSectionReader{bytes.NewReader(buf), tempBuf}, nil
388423
}
389-
return &bufferSectionReader{bytes.NewReader(buf), buf}, nil
424+
425+
// 顺序流读取失败,尝试使用 RangeRead 重试(适用于 SeekableStream)
426+
log.Debugf("Sequential read failed at offset %d, retrying with RangeRead: %v", off, err)
427+
n, err = readFullWithRangeRead(ss.file, buf, off)
428+
if err != nil {
429+
ss.bufPool.Put(tempBuf)
430+
return nil, fmt.Errorf("both sequential read and RangeRead failed at offset %d: (expect=%d, actual=%d) %w", off, length, n, err)
431+
}
432+
433+
// 更新 fileOffset
434+
ss.fileOffset = off + int64(n)
435+
return &bufferSectionReader{bytes.NewReader(buf), tempBuf}, nil
390436
}
391437
func (ss *directSectionReader) FreeSectionReader(rs io.ReadSeeker) {
392438
if sr, ok := rs.(*bufferSectionReader); ok {

0 commit comments

Comments
 (0)