Skip to content

Commit 25ad4cc

Browse files
author
cyk
committed
refactor(stream): 移除过时的链接刷新逻辑,添加自愈读取器以处理0字节读取
1 parent ddf9c2f commit 25ad4cc

File tree

2 files changed

+90
-51
lines changed

2 files changed

+90
-51
lines changed

internal/stream/stream.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,6 @@ func (ss *SeekableStream) generateReader() error {
346346
return nil
347347
}
348348

349-
// ForceRefreshLink 实现 LinkRefresher 接口,用于在读取失败时刷新链接
350-
func (ss *SeekableStream) ForceRefreshLink(ctx context.Context) bool {
351-
if rr, ok := ss.rangeReader.(*RefreshableRangeReader); ok {
352-
return rr.ForceRefresh(ctx)
353-
}
354-
return false
355-
}
356-
357349
func (ss *SeekableStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writer) (model.File, error) {
358350
if err := ss.generateReader(); err != nil {
359351
return nil, err

internal/stream/util.go

Lines changed: 90 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,6 @@ func (f RangeReaderFunc) RangeRead(ctx context.Context, httpRange http_range.Ran
3838
return f(ctx, httpRange)
3939
}
4040

41-
// LinkRefresher 接口用于在读取数据失败时强制刷新链接
42-
type LinkRefresher interface {
43-
// ForceRefreshLink 强制刷新下载链接
44-
// 返回 true 表示刷新成功,false 表示无法刷新
45-
ForceRefreshLink(ctx context.Context) bool
46-
}
47-
4841
// IsLinkExpiredError checks if the error indicates an expired download link
4942
func IsLinkExpiredError(err error) bool {
5043
if err == nil {
@@ -131,7 +124,16 @@ func (r *RefreshableRangeReader) RangeRead(ctx context.Context, httpRange http_r
131124
}
132125
}
133126

134-
return rc, nil
127+
// Wrap the ReadCloser with self-healing capability to detect 0-byte reads
128+
// This handles cases where cloud providers return 200 OK but empty body for expired links
129+
return &selfHealingReadCloser{
130+
ReadCloser: rc,
131+
refresher: r,
132+
ctx: ctx,
133+
httpRange: httpRange,
134+
firstRead: false,
135+
closed: false,
136+
}, nil
135137
}
136138

137139
func (r *RefreshableRangeReader) refreshAndRetry(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
@@ -149,19 +151,6 @@ func (r *RefreshableRangeReader) refreshAndRetry(ctx context.Context, httpRange
149151
return reader.RangeRead(ctx, httpRange)
150152
}
151153

152-
// ForceRefresh 强制刷新链接,用于读取数据失败(如读取 0 字节)的情况
153-
// 返回 true 表示刷新成功,false 表示无法刷新(没有 Refresher 或达到最大刷新次数)
154-
func (r *RefreshableRangeReader) ForceRefresh(ctx context.Context) bool {
155-
if r.link.Refresher == nil {
156-
return false
157-
}
158-
159-
r.mu.Lock()
160-
defer r.mu.Unlock()
161-
162-
return r.doRefreshLocked(ctx) == nil
163-
}
164-
165154
// doRefreshLocked 执行实际的刷新逻辑(需要持有锁)
166155
func (r *RefreshableRangeReader) doRefreshLocked(ctx context.Context) error {
167156
if r.refreshCount >= MAX_LINK_REFRESH_COUNT {
@@ -183,6 +172,84 @@ func (r *RefreshableRangeReader) doRefreshLocked(ctx context.Context) error {
183172
return nil
184173
}
185174

175+
// selfHealingReadCloser wraps an io.ReadCloser and automatically refreshes the link
176+
// if it detects 0-byte reads (common with expired links from some cloud providers)
177+
type selfHealingReadCloser struct {
178+
io.ReadCloser
179+
refresher *RefreshableRangeReader
180+
ctx context.Context
181+
httpRange http_range.Range
182+
firstRead bool
183+
closed bool
184+
mu sync.Mutex
185+
}
186+
187+
func (s *selfHealingReadCloser) Read(p []byte) (n int, err error) {
188+
s.mu.Lock()
189+
defer s.mu.Unlock()
190+
191+
if s.closed {
192+
return 0, errors.New("read from closed reader")
193+
}
194+
195+
n, err = s.ReadCloser.Read(p)
196+
197+
// Detect 0-byte read on first attempt (indicates link may be expired but returned 200 OK)
198+
if !s.firstRead && n == 0 && (err == io.EOF || err == io.ErrUnexpectedEOF) {
199+
s.firstRead = true
200+
log.Warnf("Detected 0-byte read on first attempt, attempting to refresh link...")
201+
202+
// Try to refresh the link
203+
s.refresher.mu.Lock()
204+
refreshErr := s.refresher.doRefreshLocked(s.ctx)
205+
s.refresher.mu.Unlock()
206+
207+
if refreshErr != nil {
208+
log.Errorf("Failed to refresh link after 0-byte read: %v", refreshErr)
209+
return n, err
210+
}
211+
212+
// Close old connection
213+
s.ReadCloser.Close()
214+
215+
// Get new reader and retry
216+
s.refresher.mu.Lock()
217+
reader, getErr := s.refresher.getInnerReader()
218+
s.refresher.mu.Unlock()
219+
220+
if getErr != nil {
221+
log.Errorf("Failed to get inner reader after refresh: %v", getErr)
222+
return n, err
223+
}
224+
225+
newRc, rangeErr := reader.RangeRead(s.ctx, s.httpRange)
226+
if rangeErr != nil {
227+
log.Errorf("Failed to create new range reader after refresh: %v", rangeErr)
228+
return n, err
229+
}
230+
231+
s.ReadCloser = newRc
232+
log.Infof("Successfully refreshed link and reconnected after 0-byte read")
233+
234+
// Retry read with new connection
235+
return s.ReadCloser.Read(p)
236+
}
237+
238+
s.firstRead = true
239+
return n, err
240+
}
241+
242+
func (s *selfHealingReadCloser) Close() error {
243+
s.mu.Lock()
244+
defer s.mu.Unlock()
245+
246+
if s.closed {
247+
return nil
248+
}
249+
s.closed = true
250+
return s.ReadCloser.Close()
251+
}
252+
186253
func GetRangeReaderFromLink(size int64, link *model.Link) (model.RangeReaderIF, error) {
187254
// If link has a Refresher, wrap with RefreshableRangeReader for automatic refresh on expiry
188255
if link.Refresher != nil {
@@ -341,7 +408,7 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
341408
// off: 读取的起始偏移量
342409
// 返回值: 实际读取的字节数和错误
343410
// 支持自动重试(最多5次),快速重试策略(1秒、2秒、3秒、4秒、5秒)
344-
// 支持链接刷新:当检测到 0 字节读取时,会自动刷新下载链接
411+
// 注意:链接刷新现在由 RefreshableRangeReader 内部的 selfHealingReadCloser 自动处理
345412
func ReadFullWithRangeRead(file model.FileStreamer, buf []byte, off int64) (int, error) {
346413
length := int64(len(buf))
347414
var lastErr error
@@ -369,28 +436,8 @@ func ReadFullWithRangeRead(file model.FileStreamer, buf []byte, off int64) (int,
369436
lastErr = fmt.Errorf("failed to read all data via RangeRead at offset %d: (expect=%d, actual=%d) %w", off, length, n, err)
370437
log.Debugf("RangeRead retry %d read failed: %v", retry+1, lastErr)
371438

372-
// 检测是否可能是链接过期(读取 0 字节或 EOF)
373-
if n == 0 && (err == io.EOF || err == io.ErrUnexpectedEOF) {
374-
// 尝试刷新链接
375-
if refresher, ok := file.(LinkRefresher); ok {
376-
// 获取 context - 从 FileStream 或 SeekableStream 中获取
377-
var ctx context.Context
378-
if fs, ok := file.(*FileStream); ok {
379-
ctx = fs.Ctx
380-
} else if ss, ok := file.(*SeekableStream); ok {
381-
ctx = ss.Ctx
382-
} else {
383-
ctx = context.Background()
384-
}
385-
386-
if refresher.ForceRefreshLink(ctx) {
387-
log.Infof("Link refreshed after 0-byte read, retrying immediately...")
388-
continue // 立即重试,不延迟
389-
}
390-
}
391-
}
392-
393439
// 快速重试:1秒、2秒、3秒、4秒、5秒(读取失败快速重试)
440+
// 注意:0字节读取导致的链接过期现在由 selfHealingReadCloser 自动处理
394441
time.Sleep(time.Duration(retry+1) * time.Second)
395442
}
396443

0 commit comments

Comments
 (0)