Skip to content

Commit 70b44fa

Browse files
committed
fix(stream): improve caching mechanism and handle size=0 case
1 parent d6dd66c commit 70b44fa

File tree

4 files changed

+190
-57
lines changed

4 files changed

+190
-57
lines changed

internal/stream/stream.go

Lines changed: 84 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"github.com/OpenListTeam/OpenList/v4/internal/conf"
8-
"github.com/rclone/rclone/lib/mmap"
97
"io"
108
"math"
119
"os"
1210
"sync"
1311

12+
"github.com/OpenListTeam/OpenList/v4/internal/conf"
1413
"github.com/OpenListTeam/OpenList/v4/internal/errs"
1514
"github.com/OpenListTeam/OpenList/v4/internal/model"
1615
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
1716
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1817
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
18+
"github.com/rclone/rclone/lib/mmap"
1919
"go4.org/readerutil"
2020
)
2121

@@ -137,6 +137,61 @@ func (f *FileStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writ
137137
if writer != nil {
138138
reader = io.TeeReader(reader, writer)
139139
}
140+
141+
if f.GetSize() == 0 {
	// The reported size is 0, so the real length is discovered by reading the
	// stream: data is buffered in memory first and whatever does not fit is
	// spilled to a temporary file. f.size is then set to the length found.
	if f.peekBuff == nil {
		f.peekBuff = &buffer.Reader{}
	}

	// fill reads from reader until dst is full or the stream ends and keeps
	// the bytes it obtained in f.peekBuff. io.ReadFull reports io.EOF when
	// nothing at all was read and io.ErrUnexpectedEOF on a short read; both
	// mean the stream is exhausted, so they are reported through eof instead
	// of as an error. A completely filled dst is appended as well, so that its
	// bytes precede any data spilled to the temporary file afterwards. On any
	// other error nothing is appended.
	fill := func(dst []byte) (n int, eof bool, err error) {
		n, err = io.ReadFull(reader, dst)
		switch err {
		case nil:
		case io.EOF, io.ErrUnexpectedEOF:
			eof = true
		default:
			return n, false, err
		}
		if n > 0 {
			f.peekBuff.Append(dst[:n])
		}
		return n, eof, nil
	}

	// Probe the stream to find out whether it holds any data.
	n, eof, err := fill(make([]byte, 64*utils.KB))
	if err != nil {
		return nil, err
	}
	if !eof && conf.MmapThreshold > 0 && conf.MaxBufferLimit-n > conf.MmapThreshold {
		// A failed mmap allocation is not fatal: the remaining data simply
		// goes to the temporary file below.
		if m, allocErr := mmap.Alloc(conf.MaxBufferLimit - n); allocErr == nil {
			f.Add(utils.CloseFunc(func() error {
				return mmap.Free(m)
			}))
			if _, eof, err = fill(m); err != nil {
				return nil, err
			}
		}
	}
	if eof {
		// Everything fit in memory.
		f.size = f.peekBuff.Size()
		f.Reader = f.peekBuff
		return f.peekBuff, nil
	}

	// The stream continues past the in-memory buffers: store the remainder in
	// a temporary file and expose buffer + file as one reader.
	tmpF, err := utils.CreateTempFile(reader, 0)
	if err != nil {
		return nil, err
	}
	f.Add(utils.CloseFunc(func() error {
		return errors.Join(tmpF.Close(), os.RemoveAll(tmpF.Name()))
	}))
	peekF, err := buffer.NewPeekFile(f.peekBuff, tmpF)
	if err != nil {
		return nil, err
	}
	f.size = peekF.Size()
	f.Reader = peekF
	return peekF, nil
}
194+
140195
f.Reader = reader
141196
return f.cache(f.GetSize())
142197
}
@@ -162,7 +217,7 @@ func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
162217
}
163218

164219
size := httpRange.Start + httpRange.Length
165-
if f.peekBuff != nil && size <= int64(f.peekBuff.Len()) {
220+
if f.peekBuff != nil && size <= int64(f.peekBuff.Size()) {
166221
return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
167222
}
168223

@@ -179,57 +234,47 @@ func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
179234
// Even if the amount of data written equals Buffer.Cap, the Buffer will still grow
180235

181236
func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
182-
limit := int64(conf.MaxBufferLimit)
237+
if maxCacheSize > int64(conf.MaxBufferLimit) {
238+
tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
239+
if err != nil {
240+
return nil, err
241+
}
242+
f.Add(tmpF)
243+
f.tmpFile = tmpF
244+
f.Reader = tmpF
245+
return tmpF, nil
246+
}
183247

184248
if f.peekBuff == nil {
185249
f.peekBuff = &buffer.Reader{}
186250
f.oriReader = f.Reader
187251
}
252+
bufSize := maxCacheSize - int64(f.peekBuff.Size())
188253
var buf []byte
189-
bufSize := 64 << 10 // 64KB as default
190-
if conf.MmapThreshold > 0 && bufSize >= conf.MmapThreshold {
191-
m, err := mmap.Alloc(bufSize)
254+
if conf.MmapThreshold > 0 && bufSize >= int64(conf.MmapThreshold) {
255+
m, err := mmap.Alloc(int(bufSize))
192256
if err == nil {
193257
f.Add(utils.CloseFunc(func() error {
194258
return mmap.Free(m)
195259
}))
196260
buf = m
197261
}
198262
}
199-
200-
var readBytes int
201-
// precache first `limit` byte
202-
for int64(readBytes) < limit {
203-
if buf == nil {
204-
buf = make([]byte, bufSize)
205-
}
206-
207-
want := limit - int64(readBytes)
208-
if want > int64(len(buf)) {
209-
want = int64(len(buf))
210-
}
211-
n, err := f.oriReader.Read(buf[:want])
212-
if n > 0 {
213-
f.peekBuff.Append(buf[:n])
214-
readBytes += n
215-
}
216-
if err == io.EOF {
217-
f.Reader = f.peekBuff
218-
f.oriReader = nil
219-
// should update real file size here to solve `GetSize == 0` issue
220-
f.size = int64(readBytes)
221-
return f.peekBuff, nil
222-
}
263+
if buf == nil {
264+
buf = make([]byte, bufSize)
223265
}
224-
// if file is larger than MaxBufferLimit, fallback to disk
225-
tmpF, err := utils.CreateTempFile(io.MultiReader(f.peekBuff, f.oriReader), f.GetSize())
226-
if err != nil {
227-
return nil, err
266+
n, err := io.ReadFull(f.oriReader, buf)
267+
if bufSize != int64(n) {
268+
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
269+
}
270+
f.peekBuff.Append(buf)
271+
if int64(f.peekBuff.Size()) >= f.GetSize() {
272+
f.Reader = f.peekBuff
273+
f.oriReader = nil
274+
} else {
275+
f.Reader = io.MultiReader(f.peekBuff, f.oriReader)
228276
}
229-
f.Add(tmpF)
230-
f.tmpFile = tmpF
231-
f.Reader = tmpF
232-
return tmpF, nil
277+
return f.peekBuff, nil
233278
}
234279

235280
func (f *FileStream) SetTmpFile(file model.File) {

pkg/buffer/bytes.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,29 @@ import (
88
// Used to store []byte slices that are not reused
99
type Reader struct {
1010
bufs [][]byte
11-
length int
12-
offset int
11+
size int64
12+
offset int64
1313
}
1414

15-
func (r *Reader) Len() int {
16-
return r.length
15+
func (r *Reader) Size() int64 {
16+
return r.size
1717
}
1818

1919
func (r *Reader) Append(buf []byte) {
20-
r.length += len(buf)
20+
r.size += int64(len(buf))
2121
r.bufs = append(r.bufs, buf)
2222
}
2323

2424
func (r *Reader) Read(p []byte) (int, error) {
25-
n, err := r.ReadAt(p, int64(r.offset))
25+
n, err := r.ReadAt(p, r.offset)
2626
if n > 0 {
27-
r.offset += n
27+
r.offset += int64(n)
2828
}
2929
return n, err
3030
}
3131

3232
func (r *Reader) ReadAt(p []byte, off int64) (int, error) {
33-
if off < 0 || off >= int64(r.length) {
33+
if off < 0 || off >= r.size {
3434
return 0, io.EOF
3535
}
3636

@@ -56,35 +56,35 @@ func (r *Reader) ReadAt(p []byte, off int64) (int, error) {
5656
}
5757

5858
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
59-
var abs int
6059
switch whence {
6160
case io.SeekStart:
62-
abs = int(offset)
6361
case io.SeekCurrent:
64-
abs = r.offset + int(offset)
62+
offset = r.offset + offset
6563
case io.SeekEnd:
66-
abs = r.length + int(offset)
64+
offset = r.size + offset
6765
default:
6866
return 0, errors.New("Seek: invalid whence")
6967
}
7068

71-
if abs < 0 || abs > r.length {
69+
if offset < 0 || offset > r.size {
7270
return 0, errors.New("Seek: invalid offset")
7371
}
7472

75-
r.offset = abs
76-
return int64(abs), nil
73+
r.offset = offset
74+
return offset, nil
7775
}
7876

7977
func (r *Reader) Reset() {
8078
clear(r.bufs)
8179
r.bufs = nil
82-
r.length = 0
80+
r.size = 0
8381
r.offset = 0
8482
}
8583

8684
func NewReader(buf ...[]byte) *Reader {
87-
b := &Reader{}
85+
b := &Reader{
86+
bufs: make([][]byte, 0, len(buf)),
87+
}
8888
for _, b1 := range buf {
8989
b.Append(b1)
9090
}

pkg/buffer/bytes_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestReader_ReadAt(t *testing.T) {
7171
off: 24,
7272
},
7373
want: func(a args, n int, err error) error {
74-
if n != bs.Len()-int(a.off) {
74+
if n != int(bs.Size()-a.off) {
7575
return errors.New("read length not match")
7676
}
7777
if string(a.p[:n]) != "OpenList" {

pkg/buffer/file.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package buffer
2+
3+
import (
4+
"errors"
5+
"io"
6+
"os"
7+
)
8+
9+
type PeekFile struct {
10+
peek *Reader
11+
file *os.File
12+
offset int64
13+
size int64
14+
}
15+
16+
func (p *PeekFile) Read(b []byte) (n int, err error) {
17+
n, err = p.ReadAt(b, p.offset)
18+
if n > 0 {
19+
p.offset += int64(n)
20+
}
21+
return n, err
22+
}
23+
24+
func (p *PeekFile) ReadAt(b []byte, off int64) (n int, err error) {
25+
if off < p.peek.Size() {
26+
n, err = p.peek.ReadAt(b, off)
27+
if err == nil || n == len(b) {
28+
return n, nil
29+
}
30+
// EOF
31+
}
32+
var nn int
33+
nn, err = p.file.ReadAt(b[n:], off+int64(n)-p.peek.Size())
34+
return n + nn, err
35+
}
36+
37+
func (p *PeekFile) Seek(offset int64, whence int) (int64, error) {
38+
switch whence {
39+
case io.SeekStart:
40+
case io.SeekCurrent:
41+
if offset == 0 {
42+
return p.offset, nil
43+
}
44+
offset = p.offset + offset
45+
case io.SeekEnd:
46+
offset = p.size + offset
47+
default:
48+
return 0, errors.New("Seek: invalid whence")
49+
}
50+
51+
if offset < 0 || offset > p.size {
52+
return 0, errors.New("Seek: invalid offset")
53+
}
54+
if offset <= p.peek.Size() {
55+
_, err := p.peek.Seek(offset, io.SeekStart)
56+
if err != nil {
57+
return 0, err
58+
}
59+
_, err = p.file.Seek(0, io.SeekStart)
60+
if err != nil {
61+
return 0, err
62+
}
63+
} else {
64+
_, err := p.peek.Seek(p.peek.Size(), io.SeekStart)
65+
if err != nil {
66+
return 0, err
67+
}
68+
_, err = p.file.Seek(offset-p.peek.Size(), io.SeekStart)
69+
if err != nil {
70+
return 0, err
71+
}
72+
}
73+
74+
p.offset = offset
75+
return offset, nil
76+
}
77+
78+
func (p *PeekFile) Size() int64 {
79+
return p.size
80+
}
81+
82+
func NewPeekFile(peek *Reader, file *os.File) (*PeekFile, error) {
83+
stat, err := file.Stat()
84+
if err == nil {
85+
return &PeekFile{peek: peek, file: file, size: stat.Size() + peek.Size()}, nil
86+
}
87+
return nil, err
88+
}

0 commit comments

Comments
 (0)