Skip to content

Commit cbbb5ad

Browse files
fix(stream): http chucked upload issue (#1152)
* fix(stream): http chucked upload issue * fix(stream): use MmapThreshold * fix(stream): improve caching mechanism and handle size=0 case * fix bug * fix(buffer): optimize ReadAt method for improved performance * fix(upload): handle Content-Length and File-Size headers for better size management * fix(189pc): 移除重复限速 * fix(upload): handle negative file size during streaming uploads * fix(upload): update header key from File-Size to X-File-Size for size retrieval --------- Co-authored-by: j2rong4cn <[email protected]>
1 parent c1d03c5 commit cbbb5ad

File tree

8 files changed

+207
-45
lines changed

8 files changed

+207
-45
lines changed

drivers/189pc/utils.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -785,8 +785,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
785785

786786
// step.4 上传切片
787787
uploadUrl := uploadUrls[0]
788-
_, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false,
789-
driver.NewLimitedUploadStream(ctx, rateLimitedRd), isFamily)
788+
_, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, rateLimitedRd, isFamily)
790789
if err != nil {
791790
return err
792791
}

internal/op/fs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,11 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file mod
630630
up = func(p float64) {}
631631
}
632632

633+
// 如果小于0,则通过缓存获取完整大小,可能发生于流式上传
634+
if file.GetSize() < 0 {
635+
log.Warnf("file size < 0, try to get full size from cache")
636+
file.CacheFullAndWriter(nil, nil)
637+
}
633638
switch s := storage.(type) {
634639
case driver.PutResult:
635640
var newObj model.Obj

internal/stream/stream.go

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,60 @@ func (f *FileStream) CacheFullAndWriter(up *model.UpdateProgress, writer io.Writ
137137
if writer != nil {
138138
reader = io.TeeReader(reader, writer)
139139
}
140+
141+
if f.GetSize() < 0 {
142+
if f.peekBuff == nil {
143+
f.peekBuff = &buffer.Reader{}
144+
}
145+
// 检查是否有数据
146+
buf := []byte{0}
147+
n, err := io.ReadFull(reader, buf)
148+
if n > 0 {
149+
f.peekBuff.Append(buf[:n])
150+
}
151+
if err == io.ErrUnexpectedEOF {
152+
f.size = f.peekBuff.Size()
153+
f.Reader = f.peekBuff
154+
return f.peekBuff, nil
155+
} else if err != nil {
156+
return nil, err
157+
}
158+
if conf.MaxBufferLimit-n > conf.MmapThreshold && conf.MmapThreshold > 0 {
159+
m, err := mmap.Alloc(conf.MaxBufferLimit - n)
160+
if err == nil {
161+
f.Add(utils.CloseFunc(func() error {
162+
return mmap.Free(m)
163+
}))
164+
n, err = io.ReadFull(reader, m)
165+
if n > 0 {
166+
f.peekBuff.Append(m[:n])
167+
}
168+
if err == io.ErrUnexpectedEOF {
169+
f.size = f.peekBuff.Size()
170+
f.Reader = f.peekBuff
171+
return f.peekBuff, nil
172+
} else if err != nil {
173+
return nil, err
174+
}
175+
}
176+
}
177+
178+
tmpF, err := utils.CreateTempFile(reader, 0)
179+
if err != nil {
180+
return nil, err
181+
}
182+
f.Add(utils.CloseFunc(func() error {
183+
return errors.Join(tmpF.Close(), os.RemoveAll(tmpF.Name()))
184+
}))
185+
peekF, err := buffer.NewPeekFile(f.peekBuff, tmpF)
186+
if err != nil {
187+
return nil, err
188+
}
189+
f.size = peekF.Size()
190+
f.Reader = peekF
191+
return peekF, nil
192+
}
193+
140194
f.Reader = reader
141195
return f.cache(f.GetSize())
142196
}
@@ -162,7 +216,7 @@ func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
162216
}
163217

164218
size := httpRange.Start + httpRange.Length
165-
if f.peekBuff != nil && size <= int64(f.peekBuff.Len()) {
219+
if f.peekBuff != nil && size <= int64(f.peekBuff.Size()) {
166220
return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
167221
}
168222

@@ -194,7 +248,7 @@ func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
194248
f.peekBuff = &buffer.Reader{}
195249
f.oriReader = f.Reader
196250
}
197-
bufSize := maxCacheSize - int64(f.peekBuff.Len())
251+
bufSize := maxCacheSize - int64(f.peekBuff.Size())
198252
var buf []byte
199253
if conf.MmapThreshold > 0 && bufSize >= int64(conf.MmapThreshold) {
200254
m, err := mmap.Alloc(int(bufSize))
@@ -213,7 +267,7 @@ func (f *FileStream) cache(maxCacheSize int64) (model.File, error) {
213267
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", bufSize, n, err)
214268
}
215269
f.peekBuff.Append(buf)
216-
if int64(f.peekBuff.Len()) >= f.GetSize() {
270+
if int64(f.peekBuff.Size()) >= f.GetSize() {
217271
f.Reader = f.peekBuff
218272
f.oriReader = nil
219273
} else {

pkg/buffer/bytes.go

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,83 +8,86 @@ import (
88
// 用于存储不复用的[]byte
99
type Reader struct {
1010
bufs [][]byte
11-
length int
12-
offset int
11+
size int64
12+
offset int64
1313
}
1414

15-
func (r *Reader) Len() int {
16-
return r.length
15+
func (r *Reader) Size() int64 {
16+
return r.size
1717
}
1818

1919
func (r *Reader) Append(buf []byte) {
20-
r.length += len(buf)
20+
r.size += int64(len(buf))
2121
r.bufs = append(r.bufs, buf)
2222
}
2323

2424
func (r *Reader) Read(p []byte) (int, error) {
25-
n, err := r.ReadAt(p, int64(r.offset))
25+
n, err := r.ReadAt(p, r.offset)
2626
if n > 0 {
27-
r.offset += n
27+
r.offset += int64(n)
2828
}
2929
return n, err
3030
}
3131

3232
func (r *Reader) ReadAt(p []byte, off int64) (int, error) {
33-
if off < 0 || off >= int64(r.length) {
33+
if off < 0 || off >= r.size {
3434
return 0, io.EOF
3535
}
3636

37-
n, length := 0, int64(0)
37+
n := 0
3838
readFrom := false
3939
for _, buf := range r.bufs {
40-
newLength := length + int64(len(buf))
4140
if readFrom {
42-
w := copy(p[n:], buf)
43-
n += w
44-
} else if off < newLength {
41+
nn := copy(p[n:], buf)
42+
n += nn
43+
if n == len(p) {
44+
return n, nil
45+
}
46+
} else if newOff := off - int64(len(buf)); newOff >= 0 {
47+
off = newOff
48+
} else {
49+
nn := copy(p, buf[off:])
50+
if nn == len(p) {
51+
return nn, nil
52+
}
53+
n += nn
4554
readFrom = true
46-
w := copy(p[n:], buf[int(off-length):])
47-
n += w
4855
}
49-
if n == len(p) {
50-
return n, nil
51-
}
52-
length = newLength
5356
}
5457

5558
return n, io.EOF
5659
}
5760

5861
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
59-
var abs int
6062
switch whence {
6163
case io.SeekStart:
62-
abs = int(offset)
6364
case io.SeekCurrent:
64-
abs = r.offset + int(offset)
65+
offset = r.offset + offset
6566
case io.SeekEnd:
66-
abs = r.length + int(offset)
67+
offset = r.size + offset
6768
default:
6869
return 0, errors.New("Seek: invalid whence")
6970
}
7071

71-
if abs < 0 || abs > r.length {
72+
if offset < 0 || offset > r.size {
7273
return 0, errors.New("Seek: invalid offset")
7374
}
7475

75-
r.offset = abs
76-
return int64(abs), nil
76+
r.offset = offset
77+
return offset, nil
7778
}
7879

7980
func (r *Reader) Reset() {
8081
clear(r.bufs)
8182
r.bufs = nil
82-
r.length = 0
83+
r.size = 0
8384
r.offset = 0
8485
}
8586

8687
func NewReader(buf ...[]byte) *Reader {
87-
b := &Reader{}
88+
b := &Reader{
89+
bufs: make([][]byte, 0, len(buf)),
90+
}
8891
for _, b1 := range buf {
8992
b.Append(b1)
9093
}

pkg/buffer/bytes_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ func TestReader_ReadAt(t *testing.T) {
1313
}
1414
bs := &Reader{}
1515
bs.Append([]byte("github.com"))
16-
bs.Append([]byte("/"))
17-
bs.Append([]byte("OpenList"))
16+
bs.Append([]byte("/OpenList"))
1817
bs.Append([]byte("Team/"))
1918
bs.Append([]byte("OpenList"))
2019
tests := []struct {
@@ -71,7 +70,7 @@ func TestReader_ReadAt(t *testing.T) {
7170
off: 24,
7271
},
7372
want: func(a args, n int, err error) error {
74-
if n != bs.Len()-int(a.off) {
73+
if n != int(bs.Size()-a.off) {
7574
return errors.New("read length not match")
7675
}
7776
if string(a.p[:n]) != "OpenList" {

pkg/buffer/file.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package buffer
2+
3+
import (
4+
"errors"
5+
"io"
6+
"os"
7+
)
8+
9+
type PeekFile struct {
10+
peek *Reader
11+
file *os.File
12+
offset int64
13+
size int64
14+
}
15+
16+
func (p *PeekFile) Read(b []byte) (n int, err error) {
17+
n, err = p.ReadAt(b, p.offset)
18+
if n > 0 {
19+
p.offset += int64(n)
20+
}
21+
return n, err
22+
}
23+
24+
func (p *PeekFile) ReadAt(b []byte, off int64) (n int, err error) {
25+
if off < p.peek.Size() {
26+
n, err = p.peek.ReadAt(b, off)
27+
if err == nil || n == len(b) {
28+
return n, nil
29+
}
30+
// EOF
31+
}
32+
var nn int
33+
nn, err = p.file.ReadAt(b[n:], off+int64(n)-p.peek.Size())
34+
return n + nn, err
35+
}
36+
37+
func (p *PeekFile) Seek(offset int64, whence int) (int64, error) {
38+
switch whence {
39+
case io.SeekStart:
40+
case io.SeekCurrent:
41+
if offset == 0 {
42+
return p.offset, nil
43+
}
44+
offset = p.offset + offset
45+
case io.SeekEnd:
46+
offset = p.size + offset
47+
default:
48+
return 0, errors.New("Seek: invalid whence")
49+
}
50+
51+
if offset < 0 || offset > p.size {
52+
return 0, errors.New("Seek: invalid offset")
53+
}
54+
if offset <= p.peek.Size() {
55+
_, err := p.peek.Seek(offset, io.SeekStart)
56+
if err != nil {
57+
return 0, err
58+
}
59+
_, err = p.file.Seek(0, io.SeekStart)
60+
if err != nil {
61+
return 0, err
62+
}
63+
} else {
64+
_, err := p.peek.Seek(p.peek.Size(), io.SeekStart)
65+
if err != nil {
66+
return 0, err
67+
}
68+
_, err = p.file.Seek(offset-p.peek.Size(), io.SeekStart)
69+
if err != nil {
70+
return 0, err
71+
}
72+
}
73+
74+
p.offset = offset
75+
return offset, nil
76+
}
77+
78+
func (p *PeekFile) Size() int64 {
79+
return p.size
80+
}
81+
82+
func NewPeekFile(peek *Reader, file *os.File) (*PeekFile, error) {
83+
stat, err := file.Stat()
84+
if err == nil {
85+
return &PeekFile{peek: peek, file: file, size: stat.Size() + peek.Size()}, nil
86+
}
87+
return nil, err
88+
}

server/handles/fsup.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,17 @@ func FsStream(c *gin.Context) {
5656
}
5757
}
5858
dir, name := stdpath.Split(path)
59-
sizeStr := c.GetHeader("Content-Length")
60-
if sizeStr == "" {
61-
sizeStr = "0"
62-
}
63-
size, err := strconv.ParseInt(sizeStr, 10, 64)
64-
if err != nil {
65-
common.ErrorResp(c, err, 400)
66-
return
59+
// 如果请求头 Content-Length 和 X-File-Size 都没有,则 size=-1,表示未知大小的流式上传
60+
size := c.Request.ContentLength
61+
if size < 0 {
62+
sizeStr := c.GetHeader("X-File-Size")
63+
if sizeStr != "" {
64+
size, err = strconv.ParseInt(sizeStr, 10, 64)
65+
if err != nil {
66+
common.ErrorResp(c, err, 400)
67+
return
68+
}
69+
}
6770
}
6871
h := make(map[*utils.HashType]string)
6972
if md5 := c.GetHeader("X-File-Md5"); md5 != "" {

server/webdav/webdav.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/url"
1515
"os"
1616
"path"
17+
"strconv"
1718
"strings"
1819
"time"
1920

@@ -341,9 +342,19 @@ func (h *Handler) handlePut(w http.ResponseWriter, r *http.Request) (status int,
341342
if err != nil {
342343
return http.StatusForbidden, err
343344
}
345+
size := r.ContentLength
346+
if size < 0 {
347+
sizeStr := r.Header.Get("X-File-Size")
348+
if sizeStr != "" {
349+
size, err = strconv.ParseInt(sizeStr, 10, 64)
350+
if err != nil {
351+
return http.StatusBadRequest, err
352+
}
353+
}
354+
}
344355
obj := model.Object{
345356
Name: path.Base(reqPath),
346-
Size: r.ContentLength,
357+
Size: size,
347358
Modified: h.getModTime(r),
348359
Ctime: h.getCreateTime(r),
349360
}

0 commit comments

Comments
 (0)