Skip to content

Commit d465da4

Browse files
authored
fix(ftp-server): cannot get obj in uploading state inconsistency window (#1293)
* fix(ftp-server): cannot get obj in uploading state inconsistency window * fix * fix: duplicate obj when upload completed but client access does not * fix * feat: support stat remove and move
1 parent 84ed487 commit d465da4

File tree

9 files changed

+360
-26
lines changed

9 files changed

+360
-26
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ require (
6060
github.com/spf13/cobra v1.9.1
6161
github.com/stretchr/testify v1.10.0
6262
github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5
63+
github.com/tchap/go-patricia/v2 v2.3.3
6364
github.com/u2takey/ffmpeg-go v0.5.0
6465
github.com/upyun/go-sdk/v3 v3.0.4
6566
github.com/winfsp/cgofuse v1.6.0
@@ -254,7 +255,7 @@ require (
254255
github.com/yusufpapurcu/wmi v1.2.4 // indirect
255256
go.etcd.io/bbolt v1.4.0 // indirect
256257
golang.org/x/arch v0.18.0 // indirect
257-
golang.org/x/sync v0.16.0 // indirect
258+
golang.org/x/sync v0.16.0
258259
golang.org/x/sys v0.34.0 // indirect
259260
golang.org/x/term v0.33.0 // indirect
260261
golang.org/x/text v0.27.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,8 @@ github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 h1:Sa+sR8aaAMFwxh
624624
github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5/go.mod h1:UdZiFUFu6e2WjjtjxivwXWcwc1N/8zgbkBR9QNucUOY=
625625
github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543 h1:6Y51mutOvRGRx6KqyMNo//xk8B8o6zW9/RVmy1VamOs=
626626
github.com/taruti/bytepool v0.0.0-20160310082835-5e3a9ea56543/go.mod h1:jpwqYA8KUVEvSUJHkCXsnBRJCSKP1BMa81QZ6kvRpow=
627+
github.com/tchap/go-patricia/v2 v2.3.3 h1:xfNEsODumaEcCcY3gI0hYPZ/PcpVv5ju6RMAhgwZDDc=
628+
github.com/tchap/go-patricia/v2 v2.3.3/go.mod h1:VZRHKAb53DLaG+nA9EaYYiaEx6YztwDlLElMsnSHD4k=
627629
github.com/tklauser/go-sysconf v0.3.15 h1:VE89k0criAymJ/Os65CSn1IXaol+1wrsFHEB8Ol49K4=
628630
github.com/tklauser/go-sysconf v0.3.15/go.mod h1:Dmjwr6tYFIseJw7a3dRLJfsHAMXZ3nEnL/aZY+0IuI4=
629631
github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfjso=

server/ftp.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type FtpMainDriver struct {
3232
}
3333

3434
func NewMainDriver() (*FtpMainDriver, error) {
35+
ftp.InitStage()
3536
transferType := ftpserver.TransferTypeASCII
3637
if conf.Conf.FTP.DefaultTransferBinary {
3738
transferType = ftpserver.TransferTypeBinary

server/ftp/afero.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package ftp
33
import (
44
"context"
55
"errors"
6+
"fmt"
7+
"io"
68
"os"
79
"strings"
810
"time"
@@ -97,6 +99,23 @@ func (a *AferoAdapter) GetHandle(name string, flags int, offset int64) (ftpserve
9799
if err != nil {
98100
return nil, err
99101
}
102+
if f, err := Borrow(a.ctx, path); !errors.Is(err, errs.ObjectNotFound) {
103+
if err != nil {
104+
return nil, err
105+
}
106+
if (flags & os.O_EXCL) != 0 {
107+
return nil, errors.New("file already exists")
108+
}
109+
if (flags & os.O_WRONLY) != 0 {
110+
return nil, errors.New("cannot write to uploading file")
111+
}
112+
_, err = f.Seek(offset, io.SeekStart)
113+
if err != nil {
114+
_ = f.Close()
115+
return nil, fmt.Errorf("failed seek borrow: %+v", err)
116+
}
117+
return f, nil
118+
}
100119
_, err = fs.Get(a.ctx, path, &fs.GetArgs{})
101120
exists := err == nil
102121
if (flags&os.O_CREATE) == 0 && !exists {

server/ftp/fsmanage.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package ftp
22

33
import (
44
"context"
5-
"fmt"
65
stdpath "path"
76

87
"github.com/OpenListTeam/OpenList/v4/internal/conf"
@@ -43,6 +42,9 @@ func Remove(ctx context.Context, path string) error {
4342
if err != nil {
4443
return err
4544
}
45+
if err = RemoveStage(reqPath); !errors.Is(err, errs.ObjectNotFound) {
46+
return err
47+
}
4648
return fs.Remove(ctx, reqPath)
4749
}
4850

@@ -62,23 +64,24 @@ func Rename(ctx context.Context, oldPath, newPath string) error {
6264
if !user.CanRename() || !user.CanFTPManage() {
6365
return errs.PermissionDenied
6466
}
67+
if err = MoveStage(srcPath, dstPath); !errors.Is(err, errs.ObjectNotFound) {
68+
return err
69+
}
6570
return fs.Rename(ctx, srcPath, dstBase)
6671
} else {
6772
if !user.CanFTPManage() || !user.CanMove() || (srcBase != dstBase && !user.CanRename()) {
6873
return errs.PermissionDenied
6974
}
70-
if _, err = fs.Move(ctx, srcPath, dstDir); err != nil {
71-
if srcBase != dstBase {
72-
return err
73-
}
74-
if _, err1 := fs.Copy(ctx, srcPath, dstDir); err1 != nil {
75-
return fmt.Errorf("failed move for %+v, and failed try copying for %+v", err, err1)
76-
}
77-
return nil
75+
if err = MoveStage(srcPath, dstPath); !errors.Is(err, errs.ObjectNotFound) {
76+
return err
7877
}
7978
if srcBase != dstBase {
80-
return fs.Rename(ctx, stdpath.Join(dstDir, srcBase), dstBase)
79+
err = fs.Rename(ctx, srcPath, dstBase, true)
80+
if err != nil {
81+
return err
82+
}
8183
}
82-
return nil
84+
_, err = fs.Move(ctx, stdpath.Join(srcDir, dstBase), dstDir)
85+
return err
8386
}
8487
}

server/ftp/fsread.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,19 @@ func OpenDownload(ctx context.Context, reqPath string, offset int64) (*FileDownl
6363
func (f *FileDownloadProxy) Read(p []byte) (n int, err error) {
6464
n, err = f.File.Read(p)
6565
if err != nil {
66-
return
66+
return n, err
6767
}
6868
err = stream.ClientDownloadLimit.WaitN(f.ctx, n)
69-
return
69+
return n, err
7070
}
7171

7272
func (f *FileDownloadProxy) ReadAt(p []byte, off int64) (n int, err error) {
7373
n, err = f.File.ReadAt(p, off)
7474
if err != nil {
75-
return
75+
return n, err
7676
}
7777
err = stream.ClientDownloadLimit.WaitN(f.ctx, n)
78-
return
78+
return n, err
7979
}
8080

8181
func (f *FileDownloadProxy) Write(p []byte) (n int, err error) {
@@ -95,7 +95,7 @@ func (o *OsFileInfoAdapter) Size() int64 {
9595
}
9696

9797
func (o *OsFileInfoAdapter) Mode() fs2.FileMode {
98-
var mode fs2.FileMode = 0755
98+
var mode fs2.FileMode = 0o755
9999
if o.IsDir() {
100100
mode |= fs2.ModeDir
101101
}
@@ -130,6 +130,9 @@ func Stat(ctx context.Context, path string) (os.FileInfo, error) {
130130
if !common.CanAccess(user, meta, reqPath, ctx.Value(conf.MetaPassKey).(string)) {
131131
return nil, errs.PermissionDenied
132132
}
133+
if ret, err := StatStage(reqPath); !errors.Is(err, errs.ObjectNotFound) {
134+
return ret, err
135+
}
133136
obj, err := fs.Get(ctx, reqPath, &fs.GetArgs{})
134137
if err != nil {
135138
return nil, err
@@ -157,6 +160,13 @@ func List(ctx context.Context, path string) ([]os.FileInfo, error) {
157160
if err != nil {
158161
return nil, err
159162
}
163+
uploading := ListStage(reqPath)
164+
for _, o := range objs {
165+
delete(uploading, o.GetName())
166+
}
167+
for _, u := range uploading {
168+
objs = append(objs, u)
169+
}
160170
ret := make([]os.FileInfo, len(objs))
161171
for i, obj := range objs {
162172
ret[i] = &OsFileInfoAdapter{obj: obj}

server/ftp/fsup.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ftp
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
78
"net/http"
89
"os"
@@ -62,10 +63,10 @@ func (f *FileUploadProxy) Read(p []byte) (n int, err error) {
6263
func (f *FileUploadProxy) Write(p []byte) (n int, err error) {
6364
n, err = f.buffer.Write(p)
6465
if err != nil {
65-
return
66+
return n, err
6667
}
6768
err = stream.ClientUploadLimit.WaitN(f.ctx, n)
68-
return
69+
return n, err
6970
}
7071

7172
func (f *FileUploadProxy) Seek(offset int64, whence int) (int64, error) {
@@ -89,6 +90,25 @@ func (f *FileUploadProxy) Close() error {
8990
if _, err := f.buffer.Seek(0, io.SeekStart); err != nil {
9091
return err
9192
}
93+
user := f.ctx.Value(conf.UserKey).(*model.User)
94+
sf, borrow, err := MakeStage(f.ctx, f.buffer, size, f.path, func(target string) {
95+
ctx := context.WithValue(context.Background(), conf.UserKey, user)
96+
dstDir, dstBase := stdpath.Split(target)
97+
if dir == dstDir {
98+
_ = fs.Rename(ctx, f.path, dstBase)
99+
} else {
100+
if name != dstBase {
101+
e := fs.Rename(ctx, f.path, dstBase, true)
102+
if e != nil {
103+
return
104+
}
105+
}
106+
_, _ = fs.Move(ctx, stdpath.Join(dir, dstBase), dstDir)
107+
}
108+
})
109+
if err != nil {
110+
return fmt.Errorf("failed make stage for [%s]: %+v", f.path, err)
111+
}
92112
if f.trunc {
93113
_ = fs.Remove(f.ctx, f.path)
94114
}
@@ -100,10 +120,18 @@ func (f *FileUploadProxy) Close() error {
100120
},
101121
Mimetype: contentType,
102122
WebPutAsTask: true,
123+
Reader: f.buffer,
103124
}
104-
s.SetTmpFile(f.buffer)
105-
_, err = fs.PutAsTask(f.ctx, dir, s)
106-
return err
125+
s.Add(borrow)
126+
task, err := fs.PutAsTask(f.ctx, dir, s)
127+
if err != nil {
128+
_ = s.Close()
129+
return err
130+
}
131+
sf.SetRemoveCallback(func() {
132+
fs.UploadTaskManager.Cancel(task.GetID())
133+
})
134+
return nil
107135
}
108136

109137
type FileUploadWithLengthProxy struct {
@@ -182,10 +210,10 @@ func (f *FileUploadWithLengthProxy) write(p []byte) (n int, err error) {
182210
func (f *FileUploadWithLengthProxy) Write(p []byte) (n int, err error) {
183211
n, err = f.write(p)
184212
if err != nil {
185-
return
213+
return n, err
186214
}
187215
err = stream.ClientUploadLimit.WaitN(f.ctx, n)
188-
return
216+
return n, err
189217
}
190218

191219
func (f *FileUploadWithLengthProxy) Seek(offset int64, whence int) (int64, error) {
@@ -214,6 +242,6 @@ func (f *FileUploadWithLengthProxy) Close() error {
214242
WebPutAsTask: false,
215243
Reader: bytes.NewReader(data),
216244
}
217-
return fs.PutDirectly(f.ctx, dir, s, true)
245+
return fs.PutDirectly(f.ctx, dir, s)
218246
}
219247
}

0 commit comments

Comments
 (0)