@@ -22,6 +22,8 @@ import (
2222
2323 "github.com/cea-hpc/sshproxy/pkg/record"
2424 "github.com/cea-hpc/sshproxy/pkg/utils"
25+
26+ "go.etcd.io/etcd/clientv3"
2527)
2628
2729// Dup duplicates a []byte slice.
@@ -100,6 +102,7 @@ type Recorder struct {
100102 dumpfile string // path to filename where the raw records are dumped
101103 dumpLimitSize uint64 // number of bytes beyond which records are no longer dumped
102104 dumpLimitWindow time.Duration // time window in which dump size is accounted
105+ leaseID clientv3.LeaseID // etcd lease ID used for updating stats
103106 writer * record.Writer // *record.Writer where the raw records are dumped
104107}
105108
@@ -108,7 +111,7 @@ type Recorder struct {
108111// If dumpfile is not empty, the intercepted raw data will be written in this
109112// file. Logging of basic statistics will be done every logStatsInterval seconds. Bandwidth will be updated in etcd every etcdStatsInterval seconds.
110113// It will stop recording when the context is cancelled.
111- func NewRecorder (conninfo * ConnInfo , dumpfile , command string , etcdStatsInterval time.Duration , logStatsInterval time.Duration , dumpLimitSize uint64 , dumpLimitWindow time.Duration ) * Recorder {
114+ func NewRecorder (conninfo * ConnInfo , dumpfile , command string , etcdStatsInterval time.Duration , logStatsInterval time.Duration , dumpLimitSize uint64 , dumpLimitWindow time.Duration , leaseID clientv3. LeaseID ) * Recorder {
112115 ch := make (chan record.Record )
113116
114117 return & Recorder {
@@ -125,27 +128,19 @@ func NewRecorder(conninfo *ConnInfo, dumpfile, command string, etcdStatsInterval
125128 dumpfile : dumpfile ,
126129 dumpLimitSize : dumpLimitSize ,
127130 dumpLimitWindow : dumpLimitWindow ,
131+ leaseID : leaseID ,
128132 writer : nil ,
129133 }
130134}
131135
132136// updateStats writes the bandwidth to etcd
133- func (r * Recorder ) updateStats (ctx context. Context , cli * utils.Client , etcdPath string ) {
137+ func (r * Recorder ) updateStats (cli * utils.Client , etcdPath string ) {
134138 if cli != nil {
135139 if cli .IsAlive () {
136- keepAliveChan , err := cli .UpdateStats (ctx , etcdPath , r .bandwidth )
140+ err := cli .UpdateStats (etcdPath , r .bandwidth , r . leaseID )
137141 if err != nil {
138142 log .Errorf ("updating stats: %v" , err )
139143 }
140- go func () {
141- for {
142- select {
143- case <- keepAliveChan :
144- case <- ctx .Done ():
145- return
146- }
147- }
148- }()
149144 }
150145 }
151146}
@@ -245,7 +240,7 @@ func (r *Recorder) Run(ctx context.Context, cli *utils.Client, etcdPath string)
245240 for {
246241 select {
247242 case <- time .After (r .etcdStatsInterval ):
248- r .updateStats (ctx , cli , etcdPath )
243+ r .updateStats (cli , etcdPath )
249244 case <- ctx .Done ():
250245 return
251246 }
0 commit comments