Skip to content

Commit dc9c872

Browse files
jgiannuzzincw
authored andcommitted
smb: improve connection pooling efficiency
* Lower pacer minSleep to establish new connections faster * Use Echo requests to check whether connections are working (required an upgrade of go-smb2) * Only remount shares when needed * Use context for connection establishment * When returning a connection to the pool, only check the ones that encountered errors * Close connections in parallel
1 parent 057fdb3 commit dc9c872

File tree

4 files changed

+63
-58
lines changed

4 files changed

+63
-58
lines changed

backend/smb/connpool.go

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,28 @@ package smb
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net"
8+
"os"
79
"time"
810

911
smb2 "github.com/cloudsoda/go-smb2"
1012
"github.com/rclone/rclone/fs"
1113
"github.com/rclone/rclone/fs/accounting"
1214
"github.com/rclone/rclone/fs/config/obscure"
1315
"github.com/rclone/rclone/fs/fshttp"
16+
"golang.org/x/sync/errgroup"
1417
)
1518

1619
// dial starts a client connection to the given SMB server. It is a
1720
// convenience function that connects to the given network address,
1821
// initiates the SMB handshake, and then sets up a Client.
22+
//
23+
// The context is only used for establishing the connection, not after.
1924
func (f *Fs) dial(ctx context.Context, network, addr string) (*conn, error) {
2025
dialer := fshttp.NewDialer(ctx)
21-
tconn, err := dialer.Dial(network, addr)
26+
tconn, err := dialer.DialContext(ctx, network, addr)
2227
if err != nil {
2328
return nil, err
2429
}
@@ -89,15 +94,7 @@ func (c *conn) close() (err error) {
8994

9095
// True if it's closed
9196
func (c *conn) closed() bool {
92-
var nopErr error
93-
if c.smbShare != nil {
94-
// stat the current directory
95-
_, nopErr = c.smbShare.Stat(".")
96-
} else {
97-
// list the shares
98-
_, nopErr = c.smbSession.ListSharenames()
99-
}
100-
return nopErr != nil
97+
return c.smbSession.Echo() != nil
10198
}
10299

103100
// Show that we are using a SMB session
@@ -118,23 +115,20 @@ func (f *Fs) getSessions() int32 {
118115
}
119116

120117
// Open a new connection to the SMB server.
118+
//
119+
// The context is only used for establishing the connection, not after.
121120
func (f *Fs) newConnection(ctx context.Context, share string) (c *conn, err error) {
122-
// As we are pooling these connections we need to decouple
123-
// them from the current context
124-
bgCtx := context.Background()
125-
126-
c, err = f.dial(bgCtx, "tcp", f.opt.Host+":"+f.opt.Port)
121+
c, err = f.dial(ctx, "tcp", f.opt.Host+":"+f.opt.Port)
127122
if err != nil {
128123
return nil, fmt.Errorf("couldn't connect SMB: %w", err)
129124
}
130125
if share != "" {
131126
// mount the specified share as well if user requested
132-
c.smbShare, err = c.smbSession.Mount(share)
127+
err = c.mountShare(share)
133128
if err != nil {
134129
_ = c.smbSession.Logoff()
135130
return nil, fmt.Errorf("couldn't initialize SMB: %w", err)
136131
}
137-
c.smbShare = c.smbShare.WithContext(bgCtx)
138132
}
139133
return c, nil
140134
}
@@ -192,23 +186,30 @@ func (f *Fs) getConnection(ctx context.Context, share string) (c *conn, err erro
192186
// Return a SMB connection to the pool
193187
//
194188
// It nils the pointed to connection out so it can't be reused
195-
func (f *Fs) putConnection(pc **conn) {
196-
c := *pc
197-
*pc = nil
198-
199-
var nopErr error
200-
if c.smbShare != nil {
201-
// stat the current directory
202-
_, nopErr = c.smbShare.Stat(".")
203-
} else {
204-
// list the shares
205-
_, nopErr = c.smbSession.ListSharenames()
189+
//
190+
// if err is not nil then it checks the connection is alive using an
191+
// ECHO request
192+
func (f *Fs) putConnection(pc **conn, err error) {
193+
if pc == nil {
194+
return
206195
}
207-
if nopErr != nil {
208-
fs.Debugf(f, "Connection failed, closing: %v", nopErr)
209-
_ = c.close()
196+
c := *pc
197+
if c == nil {
210198
return
211199
}
200+
*pc = nil
201+
if err != nil {
202+
// If not a regular SMB error then check the connection
203+
if !(errors.Is(err, os.ErrNotExist) || errors.Is(err, os.ErrExist) || errors.Is(err, os.ErrPermission)) {
204+
echoErr := c.smbSession.Echo()
205+
if echoErr != nil {
206+
fs.Debugf(f, "Connection failed, closing: %v", echoErr)
207+
_ = c.close()
208+
return
209+
}
210+
fs.Debugf(f, "Connection OK after error: %v", err)
211+
}
212+
}
212213

213214
f.poolMu.Lock()
214215
f.pool = append(f.pool, c)
@@ -235,15 +236,18 @@ func (f *Fs) drainPool(ctx context.Context) (err error) {
235236
if len(f.pool) != 0 {
236237
fs.Debugf(f, "Closing %d unused connections", len(f.pool))
237238
}
239+
240+
g, _ := errgroup.WithContext(ctx)
238241
for i, c := range f.pool {
239-
if !c.closed() {
240-
cErr := c.close()
241-
if cErr != nil {
242-
err = cErr
242+
g.Go(func() (err error) {
243+
if !c.closed() {
244+
err = c.close()
243245
}
244-
}
245-
f.pool[i] = nil
246+
f.pool[i] = nil
247+
return err
248+
})
246249
}
250+
err = g.Wait()
247251
f.pool = nil
248252
return err
249253
}

backend/smb/smb.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
)
2626

2727
const (
28-
minSleep = 100 * time.Millisecond
28+
minSleep = 10 * time.Millisecond
2929
maxSleep = 2 * time.Second
3030
decayConstant = 2 // bigger for slower decay, exponential
3131
)
@@ -207,7 +207,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
207207
return nil, err
208208
}
209209
stat, err := cn.smbShare.Stat(f.toSambaPath(dir))
210-
f.putConnection(&cn)
210+
f.putConnection(&cn, err)
211211
if err != nil {
212212
// ignore stat error here
213213
return f, nil
@@ -268,7 +268,7 @@ func (f *Fs) findObjectSeparate(ctx context.Context, share, path string) (fs.Obj
268268
return nil, err
269269
}
270270
stat, err := cn.smbShare.Stat(f.toSambaPath(path))
271-
f.putConnection(&cn)
271+
f.putConnection(&cn, err)
272272
if err != nil {
273273
return nil, translateError(err, false)
274274
}
@@ -290,7 +290,7 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
290290
return err
291291
}
292292
err = cn.smbShare.MkdirAll(f.toSambaPath(path), 0o755)
293-
f.putConnection(&cn)
293+
f.putConnection(&cn, err)
294294
return err
295295
}
296296

@@ -305,7 +305,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
305305
return err
306306
}
307307
err = cn.smbShare.Remove(f.toSambaPath(path))
308-
f.putConnection(&cn)
308+
f.putConnection(&cn, err)
309309
return err
310310
}
311311

@@ -375,7 +375,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
375375
return nil, err
376376
}
377377
err = cn.smbShare.Rename(f.toSambaPath(srcPath), f.toSambaPath(dstPath))
378-
f.putConnection(&cn)
378+
f.putConnection(&cn, err)
379379
if err != nil {
380380
return nil, translateError(err, false)
381381
}
@@ -412,7 +412,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
412412
if err != nil {
413413
return err
414414
}
415-
defer f.putConnection(&cn)
415+
defer f.putConnection(&cn, err)
416416

417417
_, err = cn.smbShare.Stat(dstPath)
418418
if os.IsNotExist(err) {
@@ -430,7 +430,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
430430
if err != nil {
431431
return nil, err
432432
}
433-
defer f.putConnection(&cn)
433+
defer f.putConnection(&cn, err)
434434

435435
if share == "" {
436436
shares, err := cn.smbSession.ListSharenames()
@@ -474,7 +474,7 @@ func (f *Fs) About(ctx context.Context) (_ *fs.Usage, err error) {
474474
return nil, err
475475
}
476476
stat, err := cn.smbShare.Statfs(dir)
477-
f.putConnection(&cn)
477+
f.putConnection(&cn, err)
478478
if err != nil {
479479
return nil, err
480480
}
@@ -556,7 +556,7 @@ func (f *Fs) ensureDirectory(ctx context.Context, share, _path string) error {
556556
return err
557557
}
558558
err = cn.smbShare.MkdirAll(f.toSambaPath(dir), 0o755)
559-
f.putConnection(&cn)
559+
f.putConnection(&cn, err)
560560
return err
561561
}
562562

@@ -604,7 +604,7 @@ func (o *Object) SetModTime(ctx context.Context, t time.Time) (err error) {
604604
if err != nil {
605605
return err
606606
}
607-
defer o.fs.putConnection(&cn)
607+
defer o.fs.putConnection(&cn, err)
608608

609609
err = cn.smbShare.Chtimes(reqDir, t, t)
610610
if err != nil {
@@ -650,24 +650,25 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
650650
}
651651
fl, err := cn.smbShare.OpenFile(filename, os.O_RDONLY, 0)
652652
if err != nil {
653-
o.fs.putConnection(&cn)
653+
o.fs.putConnection(&cn, err)
654654
return nil, fmt.Errorf("failed to open: %w", err)
655655
}
656656
pos, err := fl.Seek(offset, io.SeekStart)
657657
if err != nil {
658-
o.fs.putConnection(&cn)
658+
o.fs.putConnection(&cn, err)
659659
return nil, fmt.Errorf("failed to seek: %w", err)
660660
}
661661
if pos != offset {
662-
o.fs.putConnection(&cn)
663-
return nil, fmt.Errorf("failed to seek: wrong position (expected=%d, reported=%d)", offset, pos)
662+
err = fmt.Errorf("failed to seek: wrong position (expected=%d, reported=%d)", offset, pos)
663+
o.fs.putConnection(&cn, err)
664+
return nil, err
664665
}
665666

666667
in = readers.NewLimitedReadCloser(fl, limit)
667668
in = &boundReadCloser{
668669
rc: in,
669670
close: func() error {
670-
o.fs.putConnection(&cn)
671+
o.fs.putConnection(&cn, nil)
671672
return nil
672673
},
673674
}
@@ -697,7 +698,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
697698
return err
698699
}
699700
defer func() {
700-
o.fs.putConnection(&cn)
701+
o.fs.putConnection(&cn, err)
701702
}()
702703

703704
fl, err := cn.smbShare.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
@@ -757,7 +758,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
757758
}
758759

759760
err = cn.smbShare.Remove(filename)
760-
o.fs.putConnection(&cn)
761+
o.fs.putConnection(&cn, err)
761762

762763
return err
763764
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ require (
2525
github.com/aws/smithy-go v1.22.1
2626
github.com/buengese/sgzip v0.1.1
2727
github.com/cloudinary/cloudinary-go/v2 v2.9.0
28-
github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6
28+
github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed
2929
github.com/colinmarc/hdfs/v2 v2.4.0
3030
github.com/coreos/go-semver v0.3.1
3131
github.com/coreos/go-systemd/v22 v22.5.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vc
181181
github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
182182
github.com/cloudinary/cloudinary-go/v2 v2.9.0 h1:8C76QklmuV4qmKAC7cUnu9D68X9kCkFMuLspPikECCo=
183183
github.com/cloudinary/cloudinary-go/v2 v2.9.0/go.mod h1:ireC4gqVetsjVhYlwjUJwKTbZuWjEIynbR9zQTlqsvo=
184-
github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6 h1:mLY/79N73URZ2J/oRKTxmfhCgxThzBmjQ6XOjX5tYjI=
185-
github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6/go.mod h1:0aLYPsmguHbok591y6hI5yAqU0drbUzrPEO10ZpgTTw=
184+
github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed h1:KrdJUJWhJ1UWhvaP6SBsvG356KjqfdDjcS/4xTswAU4=
185+
github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed/go.mod h1:0aLYPsmguHbok591y6hI5yAqU0drbUzrPEO10ZpgTTw=
186186
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
187187
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
188188
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=

0 commit comments

Comments
 (0)