Skip to content

Commit 319c986

Browse files
committed
chore: Handle sftp reconnections
Signed-off-by: Javier Aliaga <[email protected]>
1 parent c30af7d commit 319c986

File tree

6 files changed

+195
-12
lines changed

6 files changed

+195
-12
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# SFTP with Toxiproxy Setup
2+
3+
This setup includes an SFTP server and Toxiproxy for simulating network conditions during testing.
4+
5+
## Services
6+
7+
- **SFTP**: A simple SFTP server with a test user
8+
- **Toxiproxy**: A TCP proxy that allows simulating network conditions like latency, bandwidth restrictions, and connection failures
9+
10+
## Getting Started
11+
12+
1. Start the services:
13+
14+
```bash
15+
docker-compose up -d
16+
```
17+
18+
2. Connect to SFTP via the Toxiproxy port:
19+
20+
```bash
21+
sftp -P 2222 foo@localhost
22+
# Password: pass
23+
```
24+
25+
3. Control Toxiproxy via its API (port 8474):
26+
27+
```bash
28+
# Add 1000ms latency to SFTP connections
29+
curl -X POST -H "Content-Type: application/json" \
30+
http://localhost:8474/proxies/sftp/toxics \
31+
-d '{"type":"latency", "attributes":{"latency":1000, "jitter":0}}'
32+
33+
# Simulate connection timeout
34+
curl -X POST -H "Content-Type: application/json" \
35+
http://localhost:8474/proxies/sftp/toxics \
36+
-d '{"type":"timeout", "attributes":{"timeout":1000}}'
37+
38+
# Remove all toxics
39+
curl -X DELETE http://localhost:8474/proxies/sftp/toxics
40+
```
41+
42+
See the [Toxiproxy documentation](https://github.com/Shopify/toxiproxy) for more information on available toxics and configuration options.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
version: '3.8'
2+
3+
services:
4+
toxiproxy:
5+
image: ghcr.io/shopify/toxiproxy:2.5.0
6+
ports:
7+
- "8474:8474" # Toxiproxy API
8+
- "2222:2222" # Proxied SFTP port
9+
environment:
10+
- HOSTNAME=0.0.0.0
11+
command: ["-host", "0.0.0.0"]
12+
volumes:
13+
- ./toxiproxy.json:/config/toxiproxy.json
14+
depends_on:
15+
- sftp
16+
networks:
17+
- sftp-network
18+
19+
sftp:
20+
build:
21+
context: .
22+
dockerfile: Dockerfile
23+
environment:
24+
- SFTP_USERS=foo:pass:1001:1001:upload
25+
volumes:
26+
- ./upload:/home/testuser/upload
27+
expose:
28+
- "2222:22"
29+
networks:
30+
- sftp-network
31+
32+
networks:
33+
sftp-network:
34+
driver: bridge

bindings/sftp/sftp.go

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"reflect"
10+
"sync"
1011

1112
sftpClient "github.com/pkg/sftp"
1213
"golang.org/x/crypto/ssh"
@@ -25,9 +26,30 @@ const (
2526

2627
// Sftp is a binding for file operations on sftp server.
2728
type Sftp struct {
28-
metadata *sftpMetadata
29-
logger logger.Logger
30-
sftpClient *sftpClient.Client
29+
metadata *sftpMetadata
30+
logger logger.Logger
31+
sftpClient *sftpClient.Client
32+
sshClient *ssh.Client
33+
clientConfig *ssh.ClientConfig
34+
lock sync.RWMutex
35+
}
36+
37+
func (sftp *Sftp) Client() (*sftpClient.Client, error) {
38+
sftp.lock.RLock()
39+
current := sftp.sftpClient
40+
sftp.lock.RUnlock()
41+
42+
if current != nil {
43+
if _, err := current.Getwd(); err == nil {
44+
return current, nil
45+
}
46+
}
47+
48+
err := sftp.handleReconnection()
49+
if err != nil {
50+
return nil, err
51+
}
52+
return sftp.sftpClient, nil
3153
}
3254

3355
// sftpMetadata defines the sftp metadata.
@@ -122,9 +144,11 @@ func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error {
122144

123145
newSftpClient, err := sftpClient.NewClient(sshClient)
124146
if err != nil {
147+
_ = sshClient.Close()
125148
return fmt.Errorf("sftp binding error: error create sftp client: %w", err)
126149
}
127150

151+
sftp.clientConfig = config
128152
sftp.metadata = m
129153
sftp.sftpClient = newSftpClient
130154

@@ -163,12 +187,17 @@ func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindi
163187

164188
dir, fileName := sftpClient.Split(path)
165189

166-
err = sftp.sftpClient.MkdirAll(dir)
190+
c, err := sftp.Client()
191+
if err != nil {
192+
return nil, fmt.Errorf("sftp binding error: error getting sftp client: %w", err)
193+
}
194+
195+
err = c.MkdirAll(dir)
167196
if err != nil {
168197
return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err)
169198
}
170199

171-
file, err := sftp.sftpClient.Create(path)
200+
file, err := c.Create(path)
172201
if err != nil {
173202
return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err)
174203
}
@@ -211,7 +240,12 @@ func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*binding
211240
return nil, fmt.Errorf("sftp binding error: %w", err)
212241
}
213242

214-
files, err := sftp.sftpClient.ReadDir(path)
243+
c, err := sftp.Client()
244+
if err != nil {
245+
return nil, fmt.Errorf("sftp binding error: error getting sftp client: %w", err)
246+
}
247+
248+
files, err := c.ReadDir(path)
215249
if err != nil {
216250
return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err)
217251
}
@@ -246,7 +280,12 @@ func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings
246280
return nil, fmt.Errorf("sftp binding error: %w", err)
247281
}
248282

249-
file, err := sftp.sftpClient.Open(path)
283+
c, err := sftp.Client()
284+
if err != nil {
285+
return nil, fmt.Errorf("sftp binding error: error getting sftp client: %w", err)
286+
}
287+
288+
file, err := c.Open(path)
250289
if err != nil {
251290
return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err)
252291
}
@@ -272,7 +311,11 @@ func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindi
272311
return nil, fmt.Errorf("sftp binding error: %w", err)
273312
}
274313

275-
err = sftp.sftpClient.Remove(path)
314+
c, err := sftp.Client()
315+
if err != nil {
316+
return nil, fmt.Errorf("sftp binding error: error getting sftp client: %w", err)
317+
}
318+
err = c.Remove(path)
276319
if err != nil {
277320
return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err)
278321
}
@@ -296,6 +339,8 @@ func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bin
296339
}
297340

298341
func (sftp *Sftp) Close() error {
342+
sftp.lock.Lock()
343+
defer sftp.lock.Unlock()
299344
return sftp.sftpClient.Close()
300345
}
301346

@@ -330,3 +375,41 @@ func (sftp *Sftp) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
330375
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
331376
return
332377
}
378+
379+
func (sftp *Sftp) handleReconnection() error {
380+
sftp.lock.Lock()
381+
defer sftp.lock.Unlock()
382+
383+
// Re-check after acquiring the write lock
384+
if sftp.sftpClient != nil {
385+
if _, err := sftp.sftpClient.Getwd(); err == nil {
386+
return nil
387+
}
388+
_ = sftp.sftpClient.Close()
389+
sftp.sftpClient = nil
390+
}
391+
if sftp.sshClient != nil {
392+
_ = sftp.sshClient.Close()
393+
sftp.sshClient = nil
394+
}
395+
396+
if sftp.metadata == nil || sftp.clientConfig == nil {
397+
return fmt.Errorf("sftp binding error: client not initialized")
398+
}
399+
400+
sshClient, err := ssh.Dial("tcp", sftp.metadata.Address, sftp.clientConfig)
401+
if err != nil {
402+
return fmt.Errorf("sftp binding error: error create ssh client: %w", err)
403+
}
404+
405+
newSftpClient, err := sftpClient.NewClient(sshClient)
406+
if err != nil {
407+
_ = sshClient.Close()
408+
return fmt.Errorf("sftp binding error: error create sftp client: %w", err)
409+
}
410+
411+
sftp.sshClient = sshClient
412+
sftp.sftpClient = newSftpClient
413+
414+
return nil
415+
}

bindings/sftp/sftp_integration_test.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"testing"
77

8+
toxiproxy "github.com/Shopify/toxiproxy/client"
89
"github.com/stretchr/testify/assert"
910
"github.com/stretchr/testify/require"
1011

@@ -14,11 +15,13 @@ import (
1415
var connectionStringEnvKey = "DAPR_TEST_SFTP_CONNSTRING"
1516

1617
// Run docker from the file location as the upload folder is relative to the test
17-
// docker run -v ./upload:/home/foo/upload -p 2222:22 -d atmoz/sftp foo:pass:1001
18+
// cd integration
19+
// docker-compose up -d
20+
// export DAPR_TEST_SFTP_CONNSTRING=sftp:22
1821
func TestIntegrationCases(t *testing.T) {
1922
connectionString := os.Getenv(connectionStringEnvKey)
2023
if connectionString == "" {
21-
t.Skipf(`sftp binding integration tests skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s="localhost:2222")'`, connectionStringEnvKey)
24+
t.Skipf("sftp binding integration skipped. To enable this test, define the connection string using environment variable '%[1]s' (example 'export %[1]s=\"localhost:2222\")'", connectionStringEnvKey)
2225
}
2326

2427
t.Run("List operation", testListOperation)
@@ -27,21 +30,40 @@ func TestIntegrationCases(t *testing.T) {
2730

2831
func testListOperation(t *testing.T) {
2932
c := Sftp{}
33+
34+
client := toxiproxy.NewClient("localhost:8474")
35+
p, err := client.CreateProxy("sftp", "0.0.0.0:2222", os.Getenv(connectionStringEnvKey))
36+
require.NoError(t, err)
37+
defer p.Delete()
38+
3039
m := bindings.Metadata{}
40+
3141
m.Properties = map[string]string{
3242
"rootPath": "/upload",
33-
"address": os.Getenv(connectionStringEnvKey),
43+
"address": "0.0.0.0:2222",
3444
"username": "foo",
3545
"password": "pass",
3646
"insecureIgnoreHostKey": "true",
3747
}
38-
err := c.Init(t.Context(), m)
48+
49+
err = c.Init(t.Context(), m)
3950
require.NoError(t, err)
4051

4152
r, err := c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation})
4253
require.NoError(t, err)
4354
assert.NotNil(t, r.Data)
4455

56+
tx, err := p.AddToxic("reset", "reset_peer", "downstream", 1, toxiproxy.Attributes{})
57+
require.NoError(t, err)
58+
defer p.RemoveToxic(tx.Name)
59+
60+
_, err = c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation})
61+
require.Error(t, err)
62+
63+
p.RemoveToxic(tx.Name)
64+
r, err = c.Invoke(t.Context(), &bindings.InvokeRequest{Operation: bindings.ListOperation})
65+
require.NoError(t, err)
66+
4567
var d []listResponse
4668
err = json.Unmarshal(r.Data, &d)
4769
require.NoError(t, err)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/Azure/go-amqp v1.0.5
2626
github.com/DATA-DOG/go-sqlmock v1.5.0
2727
github.com/IBM/sarama v1.45.2
28+
github.com/Shopify/toxiproxy v2.1.4+incompatible
2829
github.com/aerospike/aerospike-client-go/v6 v6.12.0
2930
github.com/alibaba/sentinel-golang v1.0.4
3031
github.com/alibabacloud-go/darabonba-openapi v0.2.1

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
156156
github.com/RoaringBitmap/roaring v1.1.0 h1:b10lZrZXaY6Q6EKIRrmOF519FIyQQ5anPgGr3niw2yY=
157157
github.com/RoaringBitmap/roaring v1.1.0/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
158158
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
159+
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
159160
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
160161
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
161162
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=

0 commit comments

Comments
 (0)