Skip to content

Commit fb4e922

Browse files
committed
version 1.3.3: reconnect to etcd after a connection lost. Avoid heavy
CPU usage in case of loss of etcd connection.
1 parent 379fdf4 commit fb4e922

File tree

5 files changed

+65
-28
lines changed

5 files changed

+65
-28
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
SSHPROXY_VERSION ?= 1.3.2
1+
SSHPROXY_VERSION ?= 1.3.3
22
SSHPROXY_GIT_URL ?= github.com/cea-hpc/sshproxy
33

44
prefix ?= /usr

cmd/sshproxy/recorder.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323

2424
"github.com/cea-hpc/sshproxy/pkg/record"
2525
"github.com/cea-hpc/sshproxy/pkg/utils"
26-
27-
"go.etcd.io/etcd/clientv3"
2826
)
2927

3028
// Dup duplicates a []byte slice.
@@ -103,7 +101,6 @@ type Recorder struct {
103101
dumpfile string // path to filename where the raw records are dumped
104102
dumpLimitSize uint64 // number of bytes beyond which records are no longer dumped
105103
dumpLimitWindow time.Duration // time window in which dump size is accounted
106-
leaseID clientv3.LeaseID // etcd lease ID used for updating stats
107104
lock sync.RWMutex // mutex to avoid concurrent reads and writes in bandwidth and totals maps
108105
writer *record.Writer // *record.Writer where the raw records are dumped
109106
}
@@ -113,7 +110,7 @@ type Recorder struct {
113110
// If dumpfile is not empty, the intercepted raw data will be written in this
114111
// file. Logging of basic statistics will be done every logStatsInterval seconds. Bandwidth will be updated in etcd every etcdStatsInterval seconds.
115112
// It will stop recording when the context is cancelled.
116-
func NewRecorder(conninfo *ConnInfo, dumpfile, command string, etcdStatsInterval time.Duration, logStatsInterval time.Duration, dumpLimitSize uint64, dumpLimitWindow time.Duration, leaseID clientv3.LeaseID) *Recorder {
113+
func NewRecorder(conninfo *ConnInfo, dumpfile, command string, etcdStatsInterval time.Duration, logStatsInterval time.Duration, dumpLimitSize uint64, dumpLimitWindow time.Duration) *Recorder {
117114
ch := make(chan record.Record)
118115

119116
return &Recorder{
@@ -130,23 +127,21 @@ func NewRecorder(conninfo *ConnInfo, dumpfile, command string, etcdStatsInterval
130127
dumpfile: dumpfile,
131128
dumpLimitSize: dumpLimitSize,
132129
dumpLimitWindow: dumpLimitWindow,
133-
leaseID: leaseID,
134130
lock: sync.RWMutex{},
135131
writer: nil,
136132
}
137133
}
138134

139135
// updateStats writes the bandwidth to etcd
140136
func (r *Recorder) updateStats(cli *utils.Client, etcdPath string) {
141-
if cli != nil {
142-
if cli.IsAlive() {
143-
r.lock.RLock()
144-
stats := r.bandwidth
145-
r.lock.RUnlock()
146-
err := cli.UpdateStats(etcdPath, stats, r.leaseID)
147-
if err != nil {
148-
log.Errorf("updating stats: %v", err)
149-
}
137+
if cli != nil && cli.IsAlive() {
138+
r.lock.RLock()
139+
stats := r.bandwidth
140+
r.lock.RUnlock()
141+
err := cli.UpdateStats(etcdPath, stats)
142+
if err != nil {
143+
log.Errorf("updating stats: %v", err)
144+
cli.Disable()
150145
}
151146
}
152147
}

cmd/sshproxy/sshproxy.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,12 @@ func mainExitCode() int {
352352
}()
353353

354354
var etcdPath string
355-
var leaseID clientv3.LeaseID
355+
var tmpKeepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
356356
// Register destination in etcd and keep it alive while running.
357357
if cli != nil && cli.IsAlive() {
358358
key := fmt.Sprintf("%s@%s", username, service)
359-
keepAliveChan, eP, lID, err := cli.SetDestination(ctx, key, sshInfos.Dst(), hostport)
359+
keepAliveChan, eP, err := cli.SetDestination(ctx, key, sshInfos.Dst(), hostport)
360360
etcdPath = eP
361-
leaseID = lID
362361
if err != nil {
363362
log.Warningf("setting destination in etcd: %v", err)
364363
}
@@ -368,6 +367,18 @@ func mainExitCode() int {
368367
for {
369368
select {
370369
case <-keepAliveChan:
370+
if !cli.IsAlive() {
371+
tmpKeepAliveChan, err = cli.NewLease(ctx)
372+
if err != nil {
373+
log.Warningf("getting a new lease in etcd: %v", err)
374+
} else {
375+
keepAliveChan = tmpKeepAliveChan
376+
cli.Enable()
377+
}
378+
} else {
379+
// Avoid looping too fast in case of a loss of etcd
380+
time.Sleep(time.Second)
381+
}
371382
case <-ctx.Done():
372383
return
373384
}
@@ -447,7 +458,7 @@ func mainExitCode() int {
447458

448459
var recorder *Recorder
449460
if config.Dump != "" {
450-
recorder = NewRecorder(conninfo, config.Dump, originalCmd, config.EtcdStatsInterval.Duration(), config.LogStatsInterval.Duration(), config.DumpLimitSize, config.DumpLimitWindow.Duration(), leaseID)
461+
recorder = NewRecorder(conninfo, config.Dump, originalCmd, config.EtcdStatsInterval.Duration(), config.LogStatsInterval.Duration(), config.DumpLimitSize, config.DumpLimitWindow.Duration())
451462

452463
wg.Add(1)
453464
go func() {

misc/sshproxy.spec

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
%global debug_package %{nil}
44

55
Name: sshproxy
6-
Version: 1.3.2
6+
Version: 1.3.3
77
Release: 1%{?dist}
88
Summary: SSH proxy
99
License: CeCILL-B
@@ -51,6 +51,9 @@ install -p -m 0644 config/sshproxy.yaml %{buildroot}%{_sysconfdir}/sshproxy
5151
%{_mandir}/man8/sshproxy-replay.8*
5252

5353
%changelog
54+
* Fri Oct 02 2020 Cyril Servant <[email protected]> - 1.3.3-1
55+
- sshproxy 1.3.3
56+
5457
* Mon Sep 28 2020 Cyril Servant <[email protected]> - 1.3.2-1
5558
- sshproxy 1.3.2
5659

pkg/utils/etcd.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ type Client struct {
107107
log *logging.Logger
108108
requestTimeout time.Duration
109109
keyTTL int64
110+
active bool
111+
leaseID clientv3.LeaseID
110112
}
111113

112114
// Host represents the state of a host.
@@ -172,6 +174,7 @@ func NewEtcdClient(config *Config, log *logging.Logger) (*Client, error) {
172174
log: log,
173175
requestTimeout: 2 * time.Second,
174176
keyTTL: keyTTL,
177+
active: true,
175178
}, nil
176179
}
177180

@@ -205,35 +208,50 @@ func (c *Client) GetDestination(key string) (string, error) {
205208
}
206209

207210
// SetDestination set current destination in etcd.
208-
func (c *Client) SetDestination(rootctx context.Context, key, sshdHostport string, dst string) (<-chan *clientv3.LeaseKeepAliveResponse, string, clientv3.LeaseID, error) {
211+
func (c *Client) SetDestination(rootctx context.Context, key, sshdHostport string, dst string) (<-chan *clientv3.LeaseKeepAliveResponse, string, error) {
209212
path := fmt.Sprintf("%s/%s/%s/%s", toConnectionKey(key), dst, sshdHostport, time.Now().Format(time.RFC3339Nano))
210213
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
211214
resp, err := c.cli.Grant(ctx, c.keyTTL)
212215
cancel()
213216
if err != nil {
214-
return nil, "", 0, err
217+
return nil, "", err
215218
}
216219

217220
bytes, err := json.Marshal(&Bandwidth{
218221
In: 0,
219222
Out: 0,
220223
})
221224
if err != nil {
222-
return nil, "", 0, err
225+
return nil, "", err
223226
}
224227
ctx, cancel = context.WithTimeout(context.Background(), c.requestTimeout)
225228
_, err = c.cli.Put(ctx, path, string(bytes), clientv3.WithLease(resp.ID))
226229
cancel()
227230
if err != nil {
228-
return nil, "", 0, err
231+
return nil, "", err
229232
}
230233

231234
k, e := c.cli.KeepAlive(rootctx, resp.ID)
232-
return k, path, resp.ID, e
235+
c.leaseID = resp.ID
236+
return k, path, e
237+
}
238+
239+
// NewLease creates a new lease in etcd.
240+
func (c *Client) NewLease(rootctx context.Context) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
241+
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
242+
resp, err := c.cli.Grant(ctx, c.keyTTL)
243+
cancel()
244+
if err != nil {
245+
return nil, err
246+
}
247+
248+
k, e := c.cli.KeepAlive(rootctx, resp.ID)
249+
c.leaseID = resp.ID
250+
return k, e
233251
}
234252

235253
// UpdateStats updates the stats (bandwidth in and out in kB/s) of a connection.
236-
func (c *Client) UpdateStats(etcdPath string, stats map[int]uint64, leaseID clientv3.LeaseID) error {
254+
func (c *Client) UpdateStats(etcdPath string, stats map[int]uint64) error {
237255
bytes, err := json.Marshal(&Bandwidth{
238256
In: int(stats[0] / 1024),
239257
Out: int((stats[1] + stats[2]) / 1024),
@@ -243,7 +261,7 @@ func (c *Client) UpdateStats(etcdPath string, stats map[int]uint64, leaseID clie
243261
}
244262

245263
ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
246-
_, err = c.cli.Put(ctx, etcdPath, string(bytes), clientv3.WithLease(leaseID))
264+
_, err = c.cli.Put(ctx, etcdPath, string(bytes), clientv3.WithLease(c.leaseID))
247265
cancel()
248266
if err != nil {
249267
return err
@@ -567,5 +585,15 @@ func (c *Client) GetAllGroups(allFlag bool) (map[string]*FlatGroup, error) {
567585

568586
// IsAlive checks if etcd client is still usable.
569587
func (c *Client) IsAlive() bool {
570-
return c.cli != nil
588+
return c.cli != nil && c.active == true
589+
}
590+
591+
// Enable enables the etcd client.
592+
func (c *Client) Enable() {
593+
c.active = true
594+
}
595+
596+
// Disable disables the etcd client.
597+
func (c *Client) Disable() {
598+
c.active = false
571599
}

0 commit comments

Comments
 (0)