Skip to content

Commit f4056db

Browse files
committed
feat(fs): implement slice upload
1 parent 115e3da commit f4056db

File tree

1 file changed

+35
-11
lines changed

1 file changed

+35
-11
lines changed

internal/fs/fs.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,27 @@ func Preup(c context.Context, s driver.Driver, actualPath string, req *reqres.Pr
302302
type sliceup struct {
303303
*tables.SliceUpload
304304
tmpFile *os.File
305-
*sync.Mutex
305+
sync.Mutex
306306
}
307307

308308
// 分片上传缓存
309309
var sliceupMap = sync.Map{}
310310

311+
type sliceWriter struct {
312+
file *os.File
313+
offset int64
314+
}
315+
316+
// Write implements io.Writer interface
317+
// 虽然每个分片都定义了一个sliceWriter
318+
// 但是Write方法会在同一个分片复制过程中多次调用,
319+
// 所以要更新自身的offset
320+
func (sw *sliceWriter) Write(p []byte) (int, error) {
321+
n, err := sw.file.WriteAt(p, sw.offset)
322+
sw.offset += int64(n)
323+
return n, err
324+
}
325+
311326
// UploadSlice 上传切片,第一个分片必须先上传
312327
func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadSliceReq, file multipart.File) error {
313328
var msu *sliceup
@@ -335,12 +350,14 @@ func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadS
335350
}
336351
}()
337352

353+
// 检查分片是否已上传过
354+
if tables.IsSliceUploaded(msu.SliceUploadStatus, int(req.SliceNum)) {
355+
log.Warnf("slice already uploaded,req:%+v", req)
356+
return nil
357+
}
358+
338359
if req.SliceHash != "" {
339360
sliceHash := []string{} // 分片hash
340-
if tables.IsSliceUploaded(msu.SliceUploadStatus, int(req.SliceNum)) {
341-
log.Warnf("slice already uploaded,req:%+v", req)
342-
return nil
343-
}
344361

345362
//验证分片hash值
346363
if req.SliceNum == 0 { //第一个分片,slicehash是所有的分片hash
@@ -376,15 +393,18 @@ func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadS
376393
}
377394

378395
default: //其他网盘先缓存到本地
396+
msu.Lock()
379397
if msu.TmpFile == "" {
380398
tf, err := os.CreateTemp(conf.Conf.TempDir, "file-*")
381399
if err != nil {
400+
msu.Unlock()
382401
log.Error("CreateTemp error", req, err)
383402
return err
384403
}
385404
abspath := tf.Name() //这里返回的是绝对路径
386405
err = os.Truncate(abspath, int64(msu.Size))
387406
if err != nil {
407+
msu.Unlock()
388408
log.Error("Truncate error", req, err)
389409
return err
390410
}
@@ -394,19 +414,22 @@ func UploadSlice(ctx context.Context, storage driver.Driver, req *reqres.UploadS
394414
if msu.tmpFile == nil {
395415
msu.tmpFile, err = os.OpenFile(msu.TmpFile, os.O_RDWR, 0644)
396416
if err != nil {
417+
msu.Unlock()
397418
log.Error("OpenFile error", req, msu.TmpFile, err)
398419
return err
399420
}
400421
}
422+
msu.Unlock()
401423

402-
content, err := io.ReadAll(file) //这里一次性读取全部,如果并发较多,可能会占用较多内存
403-
if err != nil {
404-
log.Error("ReadAll error", req, err)
405-
return err
424+
// 流式复制,减少内存占用
425+
sw := &sliceWriter{
426+
file: msu.tmpFile,
427+
offset: int64(req.SliceNum) * int64(msu.SliceSize),
406428
}
407-
_, err = msu.tmpFile.WriteAt(content, int64(req.SliceNum)*int64(msu.SliceSize))
429+
_, err := io.Copy(sw, file)
430+
408431
if err != nil {
409-
log.Error("WriteAt error", req, err)
432+
log.Error("Copy error", req, err)
410433
return err
411434
}
412435
}
@@ -524,6 +547,7 @@ func SliceUpComplete(ctx context.Context, storage driver.Driver, uploadID uint)
524547
log.Error("Put error", msu.SliceUpload, err)
525548
return nil, err
526549
}
550+
os.Remove(msu.TmpFile)
527551
return &reqres.UploadSliceCompleteResp{
528552
Complete: 1,
529553
UploadID: msu.ID,

0 commit comments

Comments
 (0)