Skip to content

Commit ef7f8cc

Browse files
committed
feat: add support for ftp
1 parent de24e8d commit ef7f8cc

File tree

11 files changed

+521
-67
lines changed

11 files changed

+521
-67
lines changed

cli/disk/ftp.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package disk
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"net/url"
7+
"strings"
8+
"sync"
9+
"time"
10+
11+
"github.com/jlaffaye/ftp"
12+
"github.com/pkg/errors"
13+
"github.com/rs/zerolog/log"
14+
)
15+
16+
var _ Disk = (*ftpDisk)(nil)
17+
18+
type ftpDisk struct {
19+
path string
20+
client *ftp.ServerConn
21+
stepLock sync.Mutex
22+
}
23+
24+
type ftpEntry struct {
25+
*ftp.Entry
26+
}
27+
28+
func (f ftpEntry) IsDir() bool {
29+
return f.Entry.Type == ftp.EntryTypeFolder
30+
}
31+
32+
func (f ftpEntry) Name() string {
33+
return f.Entry.Name
34+
}
35+
36+
func newFTP(path string) (Disk, error) {
37+
u, err := url.Parse(path)
38+
if err != nil {
39+
return nil, errors.Wrap(err, "failed to parse ftp url")
40+
}
41+
42+
c, err := ftp.Dial(u.Host, ftp.DialWithTimeout(time.Second*5))
43+
if err != nil {
44+
return nil, errors.Wrap(err, "failed to dial host "+u.Host)
45+
}
46+
47+
password, _ := u.User.Password()
48+
if err := c.Login(u.User.Username(), password); err != nil {
49+
return nil, errors.Wrap(err, "failed to login")
50+
}
51+
52+
log.Debug().Msg("logged into ftp")
53+
54+
return &ftpDisk{
55+
path: u.Path,
56+
client: c,
57+
}, nil
58+
}
59+
60+
func (l *ftpDisk) Exists(path string) error {
61+
l.stepLock.Lock()
62+
defer l.stepLock.Unlock()
63+
64+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("checking if file exists")
65+
_, err := l.client.FileSize(path)
66+
return errors.Wrap(err, "failed to check if file exists")
67+
}
68+
69+
func (l *ftpDisk) Read(path string) ([]byte, error) {
70+
l.stepLock.Lock()
71+
defer l.stepLock.Unlock()
72+
73+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("reading file")
74+
75+
f, err := l.client.Retr(path)
76+
if err != nil {
77+
return nil, errors.Wrap(err, "failed to retrieve path")
78+
}
79+
80+
defer f.Close()
81+
82+
data, err := io.ReadAll(f)
83+
return data, errors.Wrap(err, "failed to read file")
84+
}
85+
86+
func (l *ftpDisk) Write(path string, data []byte) error {
87+
l.stepLock.Lock()
88+
defer l.stepLock.Unlock()
89+
90+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("writing to file")
91+
return errors.Wrap(l.client.Stor(path, bytes.NewReader(data)), "failed to write file")
92+
}
93+
94+
func (l *ftpDisk) Remove(path string) error {
95+
l.stepLock.Lock()
96+
defer l.stepLock.Unlock()
97+
98+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("deleting path")
99+
return errors.Wrap(l.client.Delete(path), "failed to delete path")
100+
}
101+
102+
func (l *ftpDisk) MkDir(path string) error {
103+
l.stepLock.Lock()
104+
defer l.stepLock.Unlock()
105+
106+
log.Debug().Str("schema", "ftp").Msg("going to root directory")
107+
err := l.client.ChangeDir("/")
108+
if err != nil {
109+
return errors.Wrap(err, "failed to change directory")
110+
}
111+
112+
split := strings.Split(path[1:], "/")
113+
for _, s := range split {
114+
dir, err := l.ReadDirLock("", false)
115+
if err != nil {
116+
return err
117+
}
118+
119+
foundDir := false
120+
for _, entry := range dir {
121+
if entry.IsDir() && entry.Name() == s {
122+
foundDir = true
123+
break
124+
}
125+
}
126+
127+
if !foundDir {
128+
log.Debug().Str("dir", s).Str("schema", "ftp").Msg("making directory")
129+
if err := l.client.MakeDir(s); err != nil {
130+
return errors.Wrap(err, "failed to make directory")
131+
}
132+
}
133+
134+
log.Debug().Str("dir", s).Str("schema", "ftp").Msg("entering directory")
135+
if err := l.client.ChangeDir(s); err != nil {
136+
return errors.Wrap(err, "failed to enter directory")
137+
}
138+
}
139+
140+
return nil
141+
}
142+
143+
func (l *ftpDisk) ReadDir(path string) ([]Entry, error) {
144+
return l.ReadDirLock(path, true)
145+
}
146+
147+
func (l *ftpDisk) ReadDirLock(path string, lock bool) ([]Entry, error) {
148+
if lock {
149+
l.stepLock.Lock()
150+
defer l.stepLock.Unlock()
151+
}
152+
153+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("reading directory")
154+
155+
dir, err := l.client.List(path)
156+
if err != nil {
157+
return nil, errors.Wrap(err, "failed to list files in directory")
158+
}
159+
160+
entries := make([]Entry, len(dir))
161+
for i, entry := range dir {
162+
entries[i] = ftpEntry{
163+
Entry: entry,
164+
}
165+
}
166+
167+
return entries, nil
168+
}
169+
170+
func (l *ftpDisk) IsNotExist(err error) bool {
171+
return strings.Contains(err.Error(), "Could not get file") || strings.Contains(err.Error(), "Failed to open file")
172+
}
173+
174+
func (l *ftpDisk) IsExist(err error) bool {
175+
return strings.Contains(err.Error(), "Create directory operation failed")
176+
}
177+
178+
func (l *ftpDisk) Open(path string, flag int) (io.WriteCloser, error) {
179+
reader, writer := io.Pipe()
180+
181+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("opening for writing")
182+
183+
go func() {
184+
l.stepLock.Lock()
185+
defer l.stepLock.Unlock()
186+
187+
err := l.client.Stor(path, reader)
188+
if err != nil {
189+
log.Err(err).Msg("failed to store file")
190+
}
191+
log.Debug().Str("path", path).Str("schema", "ftp").Msg("write success")
192+
}()
193+
194+
return writer, nil
195+
}

cli/disk/local.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package disk
2+
3+
import (
4+
"io"
5+
"os"
6+
)
7+
8+
var _ Disk = (*localDisk)(nil)
9+
10+
type localDisk struct {
11+
path string
12+
}
13+
14+
type localEntry struct {
15+
os.DirEntry
16+
}
17+
18+
func newLocal(path string) (Disk, error) {
19+
return localDisk{path: path}, nil
20+
}
21+
22+
func (l localDisk) Exists(path string) error {
23+
_, err := os.Stat(path)
24+
return err //nolint
25+
}
26+
27+
func (l localDisk) Read(path string) ([]byte, error) {
28+
return os.ReadFile(path) //nolint
29+
}
30+
31+
func (l localDisk) Write(path string, data []byte) error {
32+
return os.WriteFile(path, data, 0777) //nolint
33+
}
34+
35+
func (l localDisk) Remove(path string) error {
36+
return os.RemoveAll(path) //nolint
37+
}
38+
39+
func (l localDisk) MkDir(path string) error {
40+
return os.MkdirAll(path, 0777) //nolint
41+
}
42+
43+
func (l localDisk) ReadDir(path string) ([]Entry, error) {
44+
dir, err := os.ReadDir(path)
45+
if err != nil {
46+
return nil, err //nolint
47+
}
48+
49+
entries := make([]Entry, len(dir))
50+
for i, entry := range dir {
51+
entries[i] = localEntry{
52+
DirEntry: entry,
53+
}
54+
}
55+
56+
return entries, nil
57+
}
58+
59+
func (l localDisk) IsNotExist(err error) bool {
60+
return os.IsNotExist(err)
61+
}
62+
63+
func (l localDisk) IsExist(err error) bool {
64+
return os.IsExist(err)
65+
}
66+
67+
func (l localDisk) Open(path string, flag int) (io.WriteCloser, error) {
68+
return os.OpenFile(path, flag, 0777) //nolint
69+
}

cli/disk/main.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package disk
2+
3+
import (
4+
"io"
5+
"net/url"
6+
7+
"github.com/pkg/errors"
8+
"github.com/rs/zerolog/log"
9+
)
10+
11+
type Disk interface {
12+
Exists(path string) error
13+
Read(path string) ([]byte, error)
14+
Write(path string, data []byte) error
15+
Remove(path string) error
16+
MkDir(path string) error
17+
ReadDir(path string) ([]Entry, error)
18+
IsNotExist(err error) bool
19+
IsExist(err error) bool
20+
Open(path string, flag int) (io.WriteCloser, error)
21+
}
22+
23+
type Entry interface {
24+
IsDir() bool
25+
Name() string
26+
}
27+
28+
func FromPath(path string) (Disk, error) {
29+
parsed, err := url.Parse(path)
30+
if err != nil {
31+
return nil, errors.Wrap(err, "failed to parse path")
32+
}
33+
34+
log.Info().Msg(path)
35+
log.Info().Msg(parsed.Scheme)
36+
switch parsed.Scheme {
37+
case "ftp":
38+
log.Info().Str("path", path).Msg("connecting to ftp")
39+
return newFTP(path)
40+
case "sftp":
41+
log.Info().Str("path", path).Msg("connecting to sftp")
42+
return newSFTP(path)
43+
}
44+
45+
log.Info().Str("path", path).Msg("using local disk")
46+
return newLocal(path)
47+
}

cli/disk/sftp.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package disk
2+
3+
import (
4+
"io"
5+
)
6+
7+
var _ Disk = (*sftpDisk)(nil)
8+
9+
type sftpDisk struct {
10+
path string
11+
}
12+
13+
func newSFTP(path string) (Disk, error) {
14+
return sftpDisk{path: path}, nil
15+
}
16+
17+
func (l sftpDisk) Exists(path string) error {
18+
//TODO implement me
19+
panic("implement me")
20+
}
21+
22+
func (l sftpDisk) Read(path string) ([]byte, error) {
23+
//TODO implement me
24+
panic("implement me")
25+
}
26+
27+
func (l sftpDisk) Write(path string, data []byte) error {
28+
//TODO implement me
29+
panic("implement me")
30+
}
31+
32+
func (l sftpDisk) Remove(path string) error {
33+
//TODO implement me
34+
panic("implement me")
35+
}
36+
37+
func (l sftpDisk) MkDir(path string) error {
38+
//TODO implement me
39+
panic("implement me")
40+
}
41+
42+
func (l sftpDisk) ReadDir(path string) ([]Entry, error) {
43+
//TODO implement me
44+
panic("implement me")
45+
}
46+
47+
func (l sftpDisk) IsNotExist(err error) bool {
48+
//TODO implement me
49+
panic("implement me")
50+
}
51+
52+
func (l sftpDisk) IsExist(err error) bool {
53+
//TODO implement me
54+
panic("implement me")
55+
}
56+
57+
func (l sftpDisk) Open(path string, flag int) (io.WriteCloser, error) {
58+
//TODO implement me
59+
panic("implement me")
60+
}

0 commit comments

Comments
 (0)