Skip to content

Commit 2edc446

Browse files
authored
feat(stream): support using temporary files as large buffer (#1399)
feat(stream): refactor StreamSectionReader to support using temporary files as large buffer
1 parent c3c7983 commit 2edc446

File tree

7 files changed

+138
-50
lines changed

7 files changed

+138
-50
lines changed

drivers/123/upload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
124124
if cur == chunkCount {
125125
curSize = lastChunkSize
126126
}
127-
var reader *stream.SectionReader
127+
var reader io.ReadSeeker
128128
var rateLimitedRd io.Reader
129129
threadG.GoWithLifecycle(errgroup.Lifecycle{
130130
Before: func(ctx context.Context) error {

drivers/123_open/upload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
6767
partNumber := partIndex + 1 // 分片号从1开始
6868
offset := partIndex * chunkSize
6969
size := min(chunkSize, size-offset)
70-
var reader *stream.SectionReader
70+
var reader io.ReadSeeker
7171
var rateLimitedRd io.Reader
7272
sliceMD5 := ""
7373
// 表单

drivers/189pc/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
752752
partSize = lastPartSize
753753
}
754754
partInfo := ""
755-
var reader *stream.SectionReader
755+
var reader io.ReadSeeker
756756
var rateLimitedRd io.Reader
757757
threadG.GoWithLifecycle(errgroup.Lifecycle{
758758
Before: func(ctx context.Context) error {

drivers/doubao/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ func (d *Doubao) UploadByMultipart(ctx context.Context, config *UploadConfig, fi
577577
if partIndex == totalParts-1 {
578578
size = fileSize - offset
579579
}
580-
var reader *stream.SectionReader
580+
var reader io.ReadSeeker
581581
var rateLimitedRd io.Reader
582582
crc32Value := ""
583583
threadG.GoWithLifecycle(errgroup.Lifecycle{

drivers/mediafire/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ func (d *Mediafire) uploadUnits(ctx context.Context, file model.FileStreamer, ch
467467
size = fileSize - start
468468
}
469469

470-
var reader *stream.SectionReader
470+
var reader io.ReadSeeker
471471
var rateLimitedRd io.Reader
472472
var unitHash string
473473

drivers/teldrive/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package teldrive
22

33
import (
44
"context"
5+
"io"
56
"time"
67

78
"github.com/OpenListTeam/OpenList/v4/internal/model"
@@ -50,8 +51,8 @@ type chunkTask struct {
5051
chunkIdx int
5152
fileName string
5253
chunkSize int64
53-
reader *stream.SectionReader
54-
ss *stream.StreamSectionReader
54+
reader io.ReadSeeker
55+
ss stream.StreamSectionReaderIF
5556
}
5657

5758
type CopyManager struct {

internal/stream/util.go

Lines changed: 130 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"net/http"
11+
"os"
1112

1213
"github.com/OpenListTeam/OpenList/v4/internal/conf"
1314
"github.com/OpenListTeam/OpenList/v4/internal/model"
@@ -151,32 +152,58 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
151152
return tmpF, hex.EncodeToString(h.Sum(nil)), nil
152153
}
153154

154-
type StreamSectionReader struct {
155-
file model.FileStreamer
156-
off int64
157-
bufPool *pool.Pool[[]byte]
155+
type StreamSectionReaderIF interface {
156+
// 线程不安全
157+
GetSectionReader(off, length int64) (io.ReadSeeker, error)
158+
FreeSectionReader(sr io.ReadSeeker)
159+
// 线程不安全
160+
DiscardSection(off int64, length int64) error
158161
}
159162

160-
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) {
161-
ss := &StreamSectionReader{file: file}
163+
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (StreamSectionReaderIF, error) {
162164
if file.GetFile() != nil {
163-
return ss, nil
165+
return &cachedSectionReader{file.GetFile()}, nil
164166
}
165167

166168
maxBufferSize = min(maxBufferSize, int(file.GetSize()))
167169
if maxBufferSize > conf.MaxBufferLimit {
168-
_, err := file.CacheFullAndWriter(up, nil)
170+
f, err := os.CreateTemp(conf.Conf.TempDir, "file-*")
169171
if err != nil {
170172
return nil, err
171173
}
174+
175+
if f.Truncate((file.GetSize()+int64(maxBufferSize-1))/int64(maxBufferSize)*int64(maxBufferSize)) != nil {
176+
// fallback to full cache
177+
_, _ = f.Close(), os.Remove(f.Name())
178+
cache, err := file.CacheFullAndWriter(up, nil)
179+
if err != nil {
180+
return nil, err
181+
}
182+
return &cachedSectionReader{cache}, nil
183+
}
184+
185+
ss := &fileSectionReader{Reader: file, temp: f}
186+
ss.bufPool = &pool.Pool[*offsetWriterWithBase]{
187+
New: func() *offsetWriterWithBase {
188+
base := ss.fileOff
189+
ss.fileOff += int64(maxBufferSize)
190+
return &offsetWriterWithBase{io.NewOffsetWriter(ss.temp, base), base}
191+
},
192+
}
193+
file.Add(utils.CloseFunc(func() error {
194+
ss.bufPool.Reset()
195+
return errors.Join(ss.temp.Close(), os.Remove(ss.temp.Name()))
196+
}))
172197
return ss, nil
173198
}
199+
200+
ss := &directSectionReader{file: file}
174201
if conf.MmapThreshold > 0 && maxBufferSize >= conf.MmapThreshold {
175202
ss.bufPool = &pool.Pool[[]byte]{
176203
New: func() []byte {
177204
buf, err := mmap.Alloc(maxBufferSize)
178205
if err == nil {
179-
file.Add(utils.CloseFunc(func() error {
206+
ss.file.Add(utils.CloseFunc(func() error {
180207
return mmap.Free(buf)
181208
}))
182209
} else {
@@ -200,53 +227,113 @@ func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *mode
200227
return ss, nil
201228
}
202229

230+
type cachedSectionReader struct {
231+
cache io.ReaderAt
232+
}
233+
234+
func (*cachedSectionReader) DiscardSection(off int64, length int64) error {
235+
return nil
236+
}
237+
func (s *cachedSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
238+
return io.NewSectionReader(s.cache, off, length), nil
239+
}
240+
func (*cachedSectionReader) FreeSectionReader(sr io.ReadSeeker) {}
241+
242+
type fileSectionReader struct {
243+
io.Reader
244+
off int64
245+
temp *os.File
246+
fileOff int64
247+
bufPool *pool.Pool[*offsetWriterWithBase]
248+
}
249+
250+
type offsetWriterWithBase struct {
251+
*io.OffsetWriter
252+
base int64
253+
}
254+
203255
// 线程不安全
204-
func (ss *StreamSectionReader) DiscardSection(off int64, length int64) error {
205-
if ss.file.GetFile() == nil {
206-
if off != ss.off {
207-
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
208-
}
209-
_, err := utils.CopyWithBufferN(io.Discard, ss.file, length)
210-
if err != nil {
211-
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
212-
}
256+
func (ss *fileSectionReader) DiscardSection(off int64, length int64) error {
257+
if off != ss.off {
258+
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
259+
}
260+
_, err := utils.CopyWithBufferN(io.Discard, ss.Reader, length)
261+
if err != nil {
262+
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
213263
}
214264
ss.off += length
215265
return nil
216266
}
217267

218-
// 线程不安全
219-
func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionReader, error) {
220-
var cache io.ReaderAt = ss.file.GetFile()
221-
var buf []byte
222-
if cache == nil {
223-
if off != ss.off {
224-
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
225-
}
226-
tempBuf := ss.bufPool.Get()
227-
buf = tempBuf[:length]
228-
n, err := io.ReadFull(ss.file, buf)
229-
if int64(n) != length {
230-
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
231-
}
232-
ss.off += int64(n)
233-
off = 0
234-
cache = bytes.NewReader(buf)
268+
type fileBufferSectionReader struct {
269+
io.ReadSeeker
270+
fileBuf *offsetWriterWithBase
271+
}
272+
273+
func (ss *fileSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
274+
if off != ss.off {
275+
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
276+
}
277+
fileBuf := ss.bufPool.Get()
278+
_, _ = fileBuf.Seek(0, io.SeekStart)
279+
n, err := utils.CopyWithBufferN(fileBuf, ss.Reader, length)
280+
if err != nil {
281+
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
235282
}
236-
return &SectionReader{io.NewSectionReader(cache, off, length), buf}, nil
283+
ss.off += length
284+
return &fileBufferSectionReader{io.NewSectionReader(ss.temp, fileBuf.base, length), fileBuf}, nil
237285
}
238286

239-
func (ss *StreamSectionReader) FreeSectionReader(sr *SectionReader) {
240-
if sr != nil {
241-
if sr.buf != nil {
242-
ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
243-
sr.buf = nil
244-
}
287+
func (ss *fileSectionReader) FreeSectionReader(rs io.ReadSeeker) {
288+
if sr, ok := rs.(*fileBufferSectionReader); ok {
289+
ss.bufPool.Put(sr.fileBuf)
290+
sr.fileBuf = nil
245291
sr.ReadSeeker = nil
246292
}
247293
}
248294

249-
type SectionReader struct {
295+
type directSectionReader struct {
296+
file model.FileStreamer
297+
off int64
298+
bufPool *pool.Pool[[]byte]
299+
}
300+
301+
// 线程不安全
302+
func (ss *directSectionReader) DiscardSection(off int64, length int64) error {
303+
if off != ss.off {
304+
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
305+
}
306+
_, err := utils.CopyWithBufferN(io.Discard, ss.file, length)
307+
if err != nil {
308+
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
309+
}
310+
ss.off += length
311+
return nil
312+
}
313+
314+
type bufferSectionReader struct {
250315
io.ReadSeeker
251316
buf []byte
252317
}
318+
319+
// 线程不安全
320+
func (ss *directSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
321+
if off != ss.off {
322+
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
323+
}
324+
tempBuf := ss.bufPool.Get()
325+
buf := tempBuf[:length]
326+
n, err := io.ReadFull(ss.file, buf)
327+
if int64(n) != length {
328+
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
329+
}
330+
ss.off += int64(n)
331+
return &bufferSectionReader{bytes.NewReader(buf), buf}, nil
332+
}
333+
func (ss *directSectionReader) FreeSectionReader(rs io.ReadSeeker) {
334+
if sr, ok := rs.(*bufferSectionReader); ok {
335+
ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
336+
sr.buf = nil
337+
sr.ReadSeeker = nil
338+
}
339+
}

0 commit comments

Comments (0)