Skip to content
75 changes: 40 additions & 35 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenListTeam/OpenList/v4/internal/conf"
"io"
"math"
"os"
"sync"

"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/errs"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/rclone/rclone/lib/mmap"
"go4.org/readerutil"
)

Expand Down Expand Up @@ -179,47 +178,53 @@ func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
// 即使被写入的数据量与Buffer.Cap一致,Buffer也会扩大

func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
if maxCacheSize > int64(conf.MaxBufferLimit) {
tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
if err != nil {
return nil, err
}
f.Add(tmpF)
f.tmpFile = tmpF
f.Reader = tmpF
return tmpF, nil
}
limit := int64(conf.MaxBufferLimit)
// TODO: 这里不会改,我写成了buf := make([]byte, 64<<10)的形式
//var buf []byte
//if conf.MmapThreshold > 0 && limit >= int64(conf.MmapThreshold) {
// m, err := mmap.Alloc(int(limit))
// if err == nil {
// f.Add(utils.CloseFunc(func() error {
// return mmap.Free(m)
// }))
// buf = m
// }
//}

if f.peekBuff == nil {
f.peekBuff = &buffer.Reader{}
f.oriReader = f.Reader
}
bufSize := maxCacheSize - int64(f.peekBuff.Len())
var buf []byte
if conf.MmapThreshold > 0 && bufSize >= int64(conf.MmapThreshold) {
m, err := mmap.Alloc(int(bufSize))
if err == nil {
f.Add(utils.CloseFunc(func() error {
return mmap.Free(m)
}))
buf = m
var readBytes int
// precache first `limit` byte
for int64(readBytes) < limit {
buf := make([]byte, 64<<10)
want := limit - int64(readBytes)
if want > int64(len(buf)) {
want = int64(len(buf))
}
n, err := f.oriReader.Read(buf[:want])
if n > 0 {
f.peekBuff.Append(buf[:n])
readBytes += n
}
if err == io.EOF {
f.Reader = f.peekBuff
f.oriReader = nil
// should update real file size here to solve `GetSize == 0` issue
f.size = int64(readBytes)
return f.peekBuff, nil
}
}
if buf == nil {
buf = make([]byte, bufSize)
}
n, err := io.ReadFull(f.oriReader, buf)
if bufSize != int64(n) {
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
}
f.peekBuff.Append(buf)
if int64(f.peekBuff.Len()) >= f.GetSize() {
f.Reader = f.peekBuff
f.oriReader = nil
} else {
f.Reader = io.MultiReader(f.peekBuff, f.oriReader)
// if file is larger than MaxBufferLimit, fallback to disk
tmpF, err := utils.CreateTempFile(io.MultiReader(f.peekBuff, f.oriReader), f.GetSize())
if err != nil {
return nil, err
}
return f.peekBuff, nil
f.Add(tmpF)
f.tmpFile = tmpF
f.Reader = tmpF
return tmpF, nil
}

func (f *FileStream) SetTmpFile(file model.File) {
Expand Down