Skip to content

Commit ab9422d

Browse files
sftp binding component (#3505)
Signed-off-by: Mustafa Arslan <[email protected]> Signed-off-by: Bernd Verst <[email protected]> Co-authored-by: Bernd Verst <[email protected]>
1 parent c6bac52 commit ab9422d

File tree

4 files changed

+433
-0
lines changed

4 files changed

+433
-0
lines changed

bindings/sftp/sftp.go

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
package sftp
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"reflect"
10+
11+
sftpClient "github.com/pkg/sftp"
12+
"golang.org/x/crypto/ssh"
13+
"golang.org/x/crypto/ssh/knownhosts"
14+
15+
"github.com/dapr/components-contrib/bindings"
16+
"github.com/dapr/components-contrib/metadata"
17+
"github.com/dapr/kit/logger"
18+
kitmd "github.com/dapr/kit/metadata"
19+
)
20+
21+
const (
22+
metadataRootPath = "rootPath"
23+
metadataFileName = "fileName"
24+
)
25+
26+
// Sftp is a binding for file operations on sftp server.
27+
type Sftp struct {
28+
metadata *sftpMetadata
29+
logger logger.Logger
30+
sftpClient *sftpClient.Client
31+
}
32+
33+
// sftpMetadata defines the sftp metadata.
34+
type sftpMetadata struct {
35+
RootPath string `json:"rootPath"`
36+
Address string `json:"address"`
37+
Username string `json:"username"`
38+
Password string `json:"password"`
39+
PrivateKey []byte `json:"privateKey"`
40+
PrivateKeyPassphrase []byte `json:"privateKeyPassphrase"`
41+
HostPublicKey []byte `json:"hostPublicKey"`
42+
KnownHostsFile string `json:"knownHostsFile"`
43+
InsecureIgnoreHostKey bool `json:"insecureIgnoreHostKey"`
44+
}
45+
46+
type createResponse struct {
47+
FileName string `json:"fileName"`
48+
}
49+
50+
type listResponse struct {
51+
FileName string `json:"fileName"`
52+
IsDirectory bool `json:"isDirectory"`
53+
}
54+
55+
func NewSftp(logger logger.Logger) bindings.OutputBinding {
56+
return &Sftp{logger: logger}
57+
}
58+
59+
func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error {
60+
m, err := sftp.parseMetadata(metadata)
61+
if err != nil {
62+
return fmt.Errorf("failed to parse metadata: %w", err)
63+
}
64+
65+
var auth []ssh.AuthMethod
66+
var hostKeyCallback ssh.HostKeyCallback
67+
68+
if m.InsecureIgnoreHostKey {
69+
//nolint:gosec
70+
hostKeyCallback = ssh.InsecureIgnoreHostKey()
71+
} else if len(m.KnownHostsFile) > 0 {
72+
hostKeyCallback, err = knownhosts.New(m.KnownHostsFile)
73+
if err != nil {
74+
return fmt.Errorf("sftp binding error: read known host file error: %w", err)
75+
}
76+
} else if len(m.HostPublicKey) > 0 {
77+
var hostPublicKey ssh.PublicKey
78+
hostPublicKey, _, _, _, err = ssh.ParseAuthorizedKey(m.HostPublicKey)
79+
if err != nil {
80+
return fmt.Errorf("sftp binding error: parse host public key error: %w", err)
81+
}
82+
83+
hostKeyCallback = ssh.FixedHostKey(hostPublicKey)
84+
}
85+
86+
if hostKeyCallback == nil {
87+
return errors.New("sftp binding error: no host validation method provided")
88+
}
89+
90+
if len(m.PrivateKey) > 0 {
91+
var signer ssh.Signer
92+
93+
if len(m.PrivateKeyPassphrase) > 0 {
94+
signer, err = ssh.ParsePrivateKeyWithPassphrase(m.PrivateKey, m.PrivateKeyPassphrase)
95+
if err != nil {
96+
return fmt.Errorf("sftp binding error: parse private key error: %w", err)
97+
}
98+
} else {
99+
signer, err = ssh.ParsePrivateKey(m.PrivateKey)
100+
if err != nil {
101+
return fmt.Errorf("sftp binding error: parse private key error: %w", err)
102+
}
103+
}
104+
105+
auth = append(auth, ssh.PublicKeys(signer))
106+
}
107+
108+
if len(m.Password) > 0 {
109+
auth = append(auth, ssh.Password(m.Password))
110+
}
111+
112+
config := &ssh.ClientConfig{
113+
User: m.Username,
114+
Auth: auth,
115+
HostKeyCallback: hostKeyCallback,
116+
}
117+
118+
sshClient, err := ssh.Dial("tcp", m.Address, config)
119+
if err != nil {
120+
return fmt.Errorf("sftp binding error: error create ssh client: %w", err)
121+
}
122+
123+
newSftpClient, err := sftpClient.NewClient(sshClient)
124+
if err != nil {
125+
return fmt.Errorf("sftp binding error: error create sftp client: %w", err)
126+
}
127+
128+
sftp.metadata = m
129+
sftp.sftpClient = newSftpClient
130+
131+
return nil
132+
}
133+
134+
func (sftp *Sftp) parseMetadata(meta bindings.Metadata) (*sftpMetadata, error) {
135+
var m sftpMetadata
136+
err := kitmd.DecodeMetadata(meta.Properties, &m)
137+
if err != nil {
138+
return nil, err
139+
}
140+
141+
return &m, nil
142+
}
143+
144+
func (sftp *Sftp) Operations() []bindings.OperationKind {
145+
return []bindings.OperationKind{
146+
bindings.CreateOperation,
147+
bindings.GetOperation,
148+
bindings.DeleteOperation,
149+
bindings.ListOperation,
150+
}
151+
}
152+
153+
func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
154+
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
155+
if err != nil {
156+
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
157+
}
158+
159+
path, err := metadata.getPath(req.Metadata)
160+
if err != nil {
161+
return nil, fmt.Errorf("sftp binding error: %w", err)
162+
}
163+
164+
dir, fileName := sftpClient.Split(path)
165+
166+
err = sftp.sftpClient.MkdirAll(dir)
167+
if err != nil {
168+
return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err)
169+
}
170+
171+
file, err := sftp.sftpClient.Create(path)
172+
if err != nil {
173+
return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err)
174+
}
175+
176+
_, err = file.Write(req.Data)
177+
if err != nil {
178+
return nil, fmt.Errorf("sftp binding error: error write file: %w", err)
179+
}
180+
181+
jsonResponse, err := json.Marshal(createResponse{
182+
FileName: fileName,
183+
})
184+
if err != nil {
185+
return nil, fmt.Errorf("sftp binding error: error marshalling create response: %w", err)
186+
}
187+
188+
return &bindings.InvokeResponse{
189+
Data: jsonResponse,
190+
Metadata: map[string]string{
191+
metadataFileName: fileName,
192+
},
193+
}, nil
194+
}
195+
196+
func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
197+
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
198+
if err != nil {
199+
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
200+
}
201+
202+
path, err := metadata.getPath(req.Metadata)
203+
if err != nil {
204+
return nil, fmt.Errorf("sftp binding error: %w", err)
205+
}
206+
207+
files, err := sftp.sftpClient.ReadDir(path)
208+
if err != nil {
209+
return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err)
210+
}
211+
212+
resp := make([]listResponse, len(files))
213+
214+
for i, file := range files {
215+
resp[i] = listResponse{
216+
FileName: file.Name(),
217+
IsDirectory: file.IsDir(),
218+
}
219+
}
220+
221+
jsonResponse, err := json.Marshal(resp)
222+
if err != nil {
223+
return nil, fmt.Errorf("sftp binding error: cannot marshal list to json: %w", err)
224+
}
225+
226+
return &bindings.InvokeResponse{
227+
Data: jsonResponse,
228+
}, nil
229+
}
230+
231+
func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
232+
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
233+
if err != nil {
234+
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
235+
}
236+
237+
path, err := metadata.getPath(req.Metadata)
238+
if err != nil {
239+
return nil, fmt.Errorf("sftp binding error: %w", err)
240+
}
241+
242+
file, err := sftp.sftpClient.Open(path)
243+
if err != nil {
244+
return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err)
245+
}
246+
247+
b, err := io.ReadAll(file)
248+
if err != nil {
249+
return nil, fmt.Errorf("sftp binding error: error read file %s: %w", path, err)
250+
}
251+
252+
return &bindings.InvokeResponse{
253+
Data: b,
254+
}, nil
255+
}
256+
257+
func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
258+
metadata, err := sftp.metadata.mergeWithRequestMetadata(req)
259+
if err != nil {
260+
return nil, fmt.Errorf("sftp binding error: error merging metadata: %w", err)
261+
}
262+
263+
path, err := metadata.getPath(req.Metadata)
264+
if err != nil {
265+
return nil, fmt.Errorf("sftp binding error: %w", err)
266+
}
267+
268+
err = sftp.sftpClient.Remove(path)
269+
if err != nil {
270+
return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err)
271+
}
272+
273+
return nil, nil
274+
}
275+
276+
func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
277+
switch req.Operation {
278+
case bindings.CreateOperation:
279+
return sftp.create(ctx, req)
280+
case bindings.GetOperation:
281+
return sftp.get(ctx, req)
282+
case bindings.DeleteOperation:
283+
return sftp.delete(ctx, req)
284+
case bindings.ListOperation:
285+
return sftp.list(ctx, req)
286+
default:
287+
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
288+
}
289+
}
290+
291+
func (sftp *Sftp) Close() error {
292+
return sftp.sftpClient.Close()
293+
}
294+
295+
func (metadata sftpMetadata) getPath(requestMetadata map[string]string) (path string, err error) {
296+
if val, ok := kitmd.GetMetadataProperty(requestMetadata, metadataFileName); ok && val != "" {
297+
path = sftpClient.Join(metadata.RootPath, val)
298+
} else {
299+
path = metadata.RootPath
300+
}
301+
302+
if path == "" {
303+
err = errors.New("required metadata rootPath or fileName missing")
304+
}
305+
306+
return
307+
}
308+
309+
// Helper to merge config and request metadata.
310+
func (metadata sftpMetadata) mergeWithRequestMetadata(req *bindings.InvokeRequest) (sftpMetadata, error) {
311+
merged := metadata
312+
313+
if val, ok := kitmd.GetMetadataProperty(req.Metadata, metadataRootPath); ok && val != "" {
314+
merged.RootPath = val
315+
}
316+
317+
return merged, nil
318+
}
319+
320+
// GetComponentMetadata returns the metadata of the component.
321+
func (sftp *Sftp) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
322+
metadataStruct := sftpMetadata{}
323+
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
324+
return
325+
}

0 commit comments

Comments
 (0)