Skip to content

Commit d7628c5

Browse files
committed
fix(teldrive): single upload wrong chunk allocate
1 parent e192149 commit d7628c5

File tree

2 files changed

+16
-202
lines changed

2 files changed

+16
-202
lines changed

drivers/teldrive/driver.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,11 +183,7 @@ func (d *Teldrive) Put(ctx context.Context, dstDir model.Obj, file model.FileStr
183183
return d.touch(file.GetName(), dstDir.GetPath())
184184
}
185185

186-
if totalParts <= 1 {
187-
return d.doSingleUpload(ctx, dstDir, file, up, totalParts, chunkSize, fileId)
188-
}
189-
190-
return d.doMultiUpload(ctx, dstDir, file, up, maxRetried, totalParts, chunkSize, fileId)
186+
return d.doSingleUpload(ctx, dstDir, file, up, maxRetried, totalParts, chunkSize, fileId)
191187
}
192188

193189
func (d *Teldrive) GetArchiveMeta(ctx context.Context, obj model.Obj, args model.ArchiveArgs) (model.ArchiveMeta, error) {

drivers/teldrive/upload.go

Lines changed: 15 additions & 197 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"fmt"
55
"io"
66
"net/http"
7-
"sort"
87
"strconv"
9-
"sync"
108
"time"
119

1210
"github.com/OpenListTeam/OpenList/v4/drivers/base"
@@ -16,10 +14,7 @@ import (
1614
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1715
"github.com/avast/retry-go"
1816
"github.com/go-resty/resty/v2"
19-
"github.com/pkg/errors"
2017
"golang.org/x/net/context"
21-
"golang.org/x/sync/errgroup"
22-
"golang.org/x/sync/semaphore"
2318
)
2419

2520
// create empty file
@@ -101,12 +96,10 @@ func (d *Teldrive) getFilePart(fileId string) ([]FilePart, error) {
10196
return uploadedParts, nil
10297
}
10398

104-
func (d *Teldrive) singleUploadRequest(fileId string, callback base.ReqCallback, resp interface{}) error {
99+
func (d *Teldrive) singleUploadRequest(ctx context.Context, fileId string, callback base.ReqCallback, resp interface{}) error {
105100
url := d.Address + "/api/uploads/" + fileId
106101
client := resty.New().SetTimeout(0)
107102

108-
ctx := context.Background()
109-
110103
req := client.R().
111104
SetContext(ctx)
112105
req.SetHeader("Cookie", d.Cookie)
@@ -135,16 +128,17 @@ func (d *Teldrive) singleUploadRequest(fileId string, callback base.ReqCallback,
135128
}
136129

137130
func (d *Teldrive) doSingleUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress,
138-
totalParts int, chunkSize int64, fileId string) error {
131+
maxRetried, totalParts int, chunkSize int64, fileId string) error {
139132

140133
totalSize := file.GetSize()
141134
var fileParts []FilePart
142135
var uploaded int64 = 0
143-
ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up)
136+
chunkSize = min(totalSize, chunkSize)
137+
ss, err := stream.NewStreamSectionReader(file, int(chunkSize), &up)
144138
if err != nil {
145139
return err
146140
}
147-
141+
chunkCnt := 0
148142
for uploaded < totalSize {
149143
if utils.IsCanceled(ctx) {
150144
return ctx.Err()
@@ -155,19 +149,18 @@ func (d *Teldrive) doSingleUpload(ctx context.Context, dstDir model.Obj, file mo
155149
return err
156150
}
157151
filePart := &FilePart{}
152+
chunkCnt += 1
158153
if err := retry.Do(func() error {
159-
160154
if _, err := rd.Seek(0, io.SeekStart); err != nil {
161155
return err
162156
}
163-
164-
if err := d.singleUploadRequest(fileId, func(req *resty.Request) {
157+
if err := d.singleUploadRequest(ctx, fileId, func(req *resty.Request) {
165158
uploadParams := map[string]string{
166159
"partName": func() string {
167-
digits := len(strconv.Itoa(totalParts))
168-
return file.GetName() + fmt.Sprintf(".%0*d", digits, 1)
160+
digits := len(strconv.Itoa(chunkCnt))
161+
return file.GetName() + fmt.Sprintf(".%0*d", digits, chunkCnt)
169162
}(),
170-
"partNo": strconv.Itoa(1),
163+
"partNo": strconv.Itoa(chunkCnt),
171164
"fileName": file.GetName(),
172165
}
173166
req.SetQueryParams(uploadParams)
@@ -176,199 +169,24 @@ func (d *Teldrive) doSingleUpload(ctx context.Context, dstDir model.Obj, file mo
176169
}, filePart); err != nil {
177170
return err
178171
}
179-
180172
return nil
181173
},
182174
retry.Context(ctx),
183-
retry.Attempts(3),
175+
retry.Attempts(uint(maxRetried)),
184176
retry.DelayType(retry.BackOffDelay),
185177
retry.Delay(time.Second)); err != nil {
186178
return err
187179
}
188-
189180
if filePart.Name != "" {
190181
fileParts = append(fileParts, *filePart)
191182
uploaded += curChunkSize
192-
up(float64(uploaded) / float64(totalSize))
183+
up(float64(uploaded) / float64(totalSize) * 100)
193184
ss.FreeSectionReader(rd)
185+
} else {
186+
// For common situation this code won't reach
187+
return fmt.Errorf("[Teldrive] upload chunk %d failed: filePart Somehow missing", chunkCnt)
194188
}
195-
196189
}
197190

198191
return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize)
199192
}
200-
201-
func (d *Teldrive) doMultiUpload(ctx context.Context, dstDir model.Obj, file model.FileStreamer, up model.UpdateProgress,
202-
maxRetried, totalParts int, chunkSize int64, fileId string) error {
203-
204-
concurrent := d.UploadConcurrency
205-
g, ctx := errgroup.WithContext(ctx)
206-
sem := semaphore.NewWeighted(int64(concurrent))
207-
chunkChan := make(chan chunkTask, concurrent*2)
208-
resultChan := make(chan FilePart, concurrent)
209-
totalSize := file.GetSize()
210-
211-
ss, err := stream.NewStreamSectionReader(file, int(totalSize), &up)
212-
if err != nil {
213-
return err
214-
}
215-
ssLock := sync.Mutex{}
216-
g.Go(func() error {
217-
defer close(chunkChan)
218-
219-
chunkIdx := 0
220-
for chunkIdx < totalParts {
221-
select {
222-
case <-ctx.Done():
223-
return ctx.Err()
224-
default:
225-
}
226-
227-
offset := int64(chunkIdx) * chunkSize
228-
curChunkSize := min(totalSize-offset, chunkSize)
229-
230-
ssLock.Lock()
231-
reader, err := ss.GetSectionReader(offset, curChunkSize)
232-
ssLock.Unlock()
233-
234-
if err != nil {
235-
return err
236-
}
237-
task := chunkTask{
238-
chunkIdx: chunkIdx + 1,
239-
chunkSize: curChunkSize,
240-
fileName: file.GetName(),
241-
reader: reader,
242-
ss: ss,
243-
}
244-
// freeSectionReader will be called in d.uploadSingleChunk
245-
select {
246-
case chunkChan <- task:
247-
chunkIdx++
248-
case <-ctx.Done():
249-
return ctx.Err()
250-
}
251-
}
252-
return nil
253-
})
254-
for i := 0; i < int(concurrent); i++ {
255-
g.Go(func() error {
256-
for task := range chunkChan {
257-
if err := sem.Acquire(ctx, 1); err != nil {
258-
return err
259-
}
260-
261-
filePart, err := d.uploadSingleChunk(ctx, fileId, task, totalParts, maxRetried)
262-
sem.Release(1)
263-
264-
if err != nil {
265-
return fmt.Errorf("upload chunk %d failed: %w", task.chunkIdx, err)
266-
}
267-
268-
select {
269-
case resultChan <- *filePart:
270-
case <-ctx.Done():
271-
return ctx.Err()
272-
}
273-
}
274-
return nil
275-
})
276-
}
277-
var fileParts []FilePart
278-
var collectErr error
279-
collectDone := make(chan struct{})
280-
281-
go func() {
282-
defer close(collectDone)
283-
fileParts = make([]FilePart, 0, totalParts)
284-
285-
done := make(chan error, 1)
286-
go func() {
287-
done <- g.Wait()
288-
close(resultChan)
289-
}()
290-
291-
for {
292-
select {
293-
case filePart, ok := <-resultChan:
294-
if !ok {
295-
collectErr = <-done
296-
return
297-
}
298-
fileParts = append(fileParts, filePart)
299-
case err := <-done:
300-
collectErr = err
301-
return
302-
}
303-
}
304-
}()
305-
306-
<-collectDone
307-
308-
if collectErr != nil {
309-
return fmt.Errorf("multi-upload failed: %w", collectErr)
310-
}
311-
sort.Slice(fileParts, func(i, j int) bool {
312-
return fileParts[i].PartNo < fileParts[j].PartNo
313-
})
314-
315-
return d.createFileOnUploadSuccess(file.GetName(), fileId, dstDir.GetPath(), fileParts, totalSize)
316-
}
317-
318-
func (d *Teldrive) uploadSingleChunk(ctx context.Context, fileId string, task chunkTask, totalParts, maxRetried int) (*FilePart, error) {
319-
filePart := &FilePart{}
320-
retryCount := 0
321-
defer task.ss.FreeSectionReader(task.reader)
322-
323-
for {
324-
select {
325-
case <-ctx.Done():
326-
return nil, ctx.Err()
327-
default:
328-
}
329-
330-
if existingPart, err := d.checkFilePartExist(fileId, task.chunkIdx); err == nil && existingPart.Name != "" {
331-
return &existingPart, nil
332-
}
333-
334-
err := d.singleUploadRequest(fileId, func(req *resty.Request) {
335-
uploadParams := map[string]string{
336-
"partName": func() string {
337-
digits := len(strconv.Itoa(totalParts))
338-
return task.fileName + fmt.Sprintf(".%0*d", digits, task.chunkIdx)
339-
}(),
340-
"partNo": strconv.Itoa(task.chunkIdx),
341-
"fileName": task.fileName,
342-
}
343-
req.SetQueryParams(uploadParams)
344-
req.SetBody(driver.NewLimitedUploadStream(ctx, task.reader))
345-
req.SetHeader("Content-Length", strconv.Itoa(int(task.chunkSize)))
346-
}, filePart)
347-
348-
if err == nil {
349-
return filePart, nil
350-
}
351-
352-
if retryCount >= maxRetried {
353-
return nil, fmt.Errorf("upload failed after %d retries: %w", maxRetried, err)
354-
}
355-
356-
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
357-
continue
358-
}
359-
360-
retryCount++
361-
utils.Log.Errorf("[Teldrive] upload error: %v, retrying %d times", err, retryCount)
362-
363-
backoffDuration := time.Duration(retryCount*retryCount) * time.Second
364-
if backoffDuration > 30*time.Second {
365-
backoffDuration = 30 * time.Second
366-
}
367-
368-
select {
369-
case <-time.After(backoffDuration):
370-
case <-ctx.Done():
371-
return nil, ctx.Err()
372-
}
373-
}
374-
}

0 commit comments

Comments
 (0)