11// Copyright 2015-2020 CEA/DAM/DIF
2- // Contributor: Arnaud Guignard <[email protected] > 2+ // Author: Arnaud Guignard <[email protected] > 3+ // Contributor: Cyril Servant <[email protected] > 34//
45// This software is governed by the CeCILL-B license under French law and
56// abiding by the rules of distribution of free software. You can use,
@@ -98,6 +99,7 @@ type Recorder struct {
9899 command string // initial user command
99100 dumpfile string // path to filename where the raw records are dumped
100101 dumpLimitSize uint64 // number of bytes beyond which records are no longer dumped
102+ dumpLimitWindow time.Duration // time window in which dump size is accounted
101103 writer * record.Writer // *record.Writer where the raw records are dumped
102104}
103105
@@ -106,7 +108,7 @@ type Recorder struct {
106108// If dumpfile is not empty, the intercepted raw data will be written in this
107109// file. Logging of basic statistics will be done every logStatsInterval seconds. Bandwidth will be updated in etcd every etcdStatsInterval seconds.
108110// It will stop recording when the context is cancelled.
109- func NewRecorder (conninfo * ConnInfo , dumpfile , command string , etcdStatsInterval time.Duration , logStatsInterval time.Duration , dumpLimitSize uint64 ) * Recorder {
111+ func NewRecorder (conninfo * ConnInfo , dumpfile , command string , etcdStatsInterval time.Duration , logStatsInterval time.Duration , dumpLimitSize uint64 , dumpLimitWindow time. Duration ) * Recorder {
110112 ch := make (chan record.Record )
111113
112114 return & Recorder {
@@ -122,6 +124,7 @@ func NewRecorder(conninfo *ConnInfo, dumpfile, command string, etcdStatsInterval
122124 command : command ,
123125 dumpfile : dumpfile ,
124126 dumpLimitSize : dumpLimitSize ,
127+ dumpLimitWindow : dumpLimitWindow ,
125128 writer : nil ,
126129 }
127130}
@@ -231,6 +234,12 @@ func (r *Recorder) Run(ctx context.Context, cli *utils.Client, etcdPath string)
231234 }
232235 }()
233236 }
237+ bw := uint64 (0 )
238+ bwBuf := uint64 (0 )
239+ var bwTimeout <- chan time.Time
240+ if r .dumpLimitWindow != 0 {
241+ bwTimeout = time .After (r .dumpLimitWindow )
242+ }
234243 if r .etcdStatsInterval != 0 {
235244 go func () {
236245 for {
@@ -252,16 +261,22 @@ func (r *Recorder) Run(ctx context.Context, cli *utils.Client, etcdPath string)
252261 r .bandwidth [i ] = buf [i ] / uint64 (r .etcdStatsInterval .Seconds ())
253262 buf [i ] = 0
254263 }
264+ case <- bwTimeout :
265+ bwTimeout = time .After (r .dumpLimitWindow )
266+ bw = bwBuf
267+ bwBuf = 0
255268 case rec := <- r .ch :
256269 buf [rec .Fd ] += uint64 (rec .Size )
257270 r .totals [rec .Fd ] += uint64 (rec .Size )
258271 if r .writer != nil {
259- r .dump (rec )
260- if r .dumpLimitSize != 0 && r .totals [rec .Fd ] > r .dumpLimitSize {
272+ if r .dumpLimitSize == 0 || (bw < r .dumpLimitSize && bwBuf < r .dumpLimitSize ) {
273+ r .dump (rec )
274+ } else if r .dumpLimitWindow == 0 {
261275 fd .Close ()
262276 fd = nil
263277 r .writer = nil
264278 }
279+ bwBuf += uint64 (rec .Size )
265280 }
266281 case <- ctx .Done ():
267282 return
@@ -270,15 +285,21 @@ func (r *Recorder) Run(ctx context.Context, cli *utils.Client, etcdPath string)
270285 } else {
271286 for {
272287 select {
288+ case <- bwTimeout :
289+ bwTimeout = time .After (r .dumpLimitWindow )
290+ bw = bwBuf
291+ bwBuf = 0
273292 case rec := <- r .ch :
274293 r .totals [rec .Fd ] += uint64 (rec .Size )
275294 if r .writer != nil {
276- r .dump (rec )
277- if r .dumpLimitSize != 0 && r .totals [rec .Fd ] > r .dumpLimitSize {
295+ if r .dumpLimitSize == 0 || (bw < r .dumpLimitSize && bwBuf < r .dumpLimitSize ) {
296+ r .dump (rec )
297+ } else if r .dumpLimitWindow == 0 {
278298 fd .Close ()
279299 fd = nil
280300 r .writer = nil
281301 }
302+ bwBuf += uint64 (rec .Size )
282303 }
283304 case <- ctx .Done ():
284305 return
0 commit comments