Skip to content

Commit 46b498b

Browse files
darthShadowncw
authored andcommitted
stats: fix the speed not getting updated after a pause in the processing
This shifts the behavior of the average loop to be a persistent loop that gets resumed/paused when transfers & checks are started/completed. Previously, the averageLoop was stopped on completion of transfers & checks but failed to start again due to the protection of the sync.Once Signed-off-by: Anagh Kumar Baranwal <[email protected]>
1 parent b76cd74 commit 46b498b

File tree

1 file changed

+75
-38
lines changed

1 file changed

+75
-38
lines changed

fs/accounting/stats.go

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -65,28 +65,29 @@ type StatsInfo struct {
6565
}
6666

6767
type averageValues struct {
68-
mu sync.Mutex
69-
lpBytes int64
70-
lpTime time.Time
71-
speed float64
72-
stop chan bool
73-
stopped sync.WaitGroup
74-
startOnce sync.Once
75-
stopOnce sync.Once
68+
mu sync.Mutex
69+
lpBytes int64
70+
lpTime time.Time
71+
speed float64
72+
stop chan bool
73+
stopped sync.WaitGroup
74+
started bool
7675
}
7776

7877
// NewStats creates an initialised StatsInfo
7978
func NewStats(ctx context.Context) *StatsInfo {
8079
ci := fs.GetConfig(ctx)
81-
return &StatsInfo{
80+
s := &StatsInfo{
8281
ctx: ctx,
8382
ci: ci,
8483
checking: newTransferMap(ci.Checkers, "checking"),
8584
transferring: newTransferMap(ci.Transfers, "transferring"),
8685
inProgress: newInProgress(ctx),
8786
startTime: time.Now(),
88-
average: averageValues{stop: make(chan bool)},
87+
average: averageValues{},
8988
}
89+
s.startAverageLoop()
90+
return s
9091
}
9192

9293
// RemoteStats returns stats for rc
@@ -328,61 +329,96 @@ func (s *StatsInfo) averageLoop() {
328329
ticker := time.NewTicker(averagePeriodLength)
329330
defer ticker.Stop()
330331

331-
startTime := time.Now()
332332
a := &s.average
333333
defer a.stopped.Done()
334+
335+
shouldRun := false
336+
334337
for {
335338
select {
336339
case now := <-ticker.C:
337340
a.mu.Lock()
338-
var elapsed float64
339-
if a.lpTime.IsZero() {
340-
elapsed = now.Sub(startTime).Seconds()
341-
} else {
342-
elapsed = now.Sub(a.lpTime).Seconds()
341+
342+
if !shouldRun {
343+
a.mu.Unlock()
344+
continue
343345
}
346+
344347
avg := 0.0
348+
elapsed := now.Sub(a.lpTime).Seconds()
345349
if elapsed > 0 {
346350
avg = float64(a.lpBytes) / elapsed
347351
}
352+
348353
if period < averagePeriod {
349354
period++
350355
}
356+
351357
a.speed = (avg + a.speed*(period-1)) / period
352358
a.lpBytes = 0
353359
a.lpTime = now
360+
361+
a.mu.Unlock()
362+
363+
case stop, ok := <-a.stop:
364+
if !ok {
365+
return // Channel closed, exit the loop
366+
}
367+
368+
a.mu.Lock()
369+
370+
// If we are resuming, store the current time
371+
if !shouldRun && !stop {
372+
a.lpTime = time.Now()
373+
}
374+
shouldRun = !stop
375+
354376
a.mu.Unlock()
355-
case <-a.stop:
356-
return
357377
}
358378
}
359379
}
360380

361-
// Start the average loop
362-
func (s *StatsInfo) startAverageLoop() {
381+
// Resume the average loop
382+
func (s *StatsInfo) resumeAverageLoop() {
383+
s.mu.Lock()
384+
defer s.mu.Unlock()
385+
s.average.stop <- false
386+
}
387+
388+
// Pause the average loop
389+
func (s *StatsInfo) pauseAverageLoop() {
363390
s.mu.Lock()
364391
defer s.mu.Unlock()
365-
s.average.startOnce.Do(func() {
392+
s.average.stop <- true
393+
}
394+
395+
// Start the average loop
396+
//
397+
// Call with the mutex held
398+
func (s *StatsInfo) _startAverageLoop() {
399+
if !s.average.started {
400+
s.average.stop = make(chan bool)
401+
s.average.started = true
366402
s.average.stopped.Add(1)
367403
go s.averageLoop()
368-
})
404+
}
405+
}
406+
407+
// Start the average loop
408+
func (s *StatsInfo) startAverageLoop() {
409+
s.mu.Lock()
410+
defer s.mu.Unlock()
411+
s._startAverageLoop()
369412
}
370413

371414
// Stop the average loop
372415
//
373416
// Call with the mutex held
374417
func (s *StatsInfo) _stopAverageLoop() {
375-
s.average.stopOnce.Do(func() {
418+
if s.average.started {
376419
close(s.average.stop)
377420
s.average.stopped.Wait()
378-
})
379-
}
380-
381-
// Stop the average loop
382-
func (s *StatsInfo) stopAverageLoop() {
383-
s.mu.Lock()
384-
defer s.mu.Unlock()
385-
s._stopAverageLoop()
421+
}
386422
}
387423

388424
// String convert the StatsInfo to a string for printing
@@ -564,9 +600,9 @@ func (s *StatsInfo) GetBytesWithPending() int64 {
564600
pending := int64(0)
565601
for _, tr := range s.startedTransfers {
566602
if tr.acc != nil {
567-
bytes, size := tr.acc.progress()
568-
if bytes < size {
569-
pending += size - bytes
603+
bytesRead, size := tr.acc.progress()
604+
if bytesRead < size {
605+
pending += size - bytesRead
570606
}
571607
}
572608
}
@@ -699,7 +735,8 @@ func (s *StatsInfo) ResetCounters() {
699735
s.oldDuration = 0
700736

701737
s._stopAverageLoop()
702-
s.average = averageValues{stop: make(chan bool)}
738+
s.average = averageValues{}
739+
s._startAverageLoop()
703740
}
704741

705742
// ResetErrors sets the errors count to 0 and resets lastError, fatalError and retryError
@@ -788,15 +825,15 @@ func (s *StatsInfo) NewTransfer(obj fs.DirEntry, dstFs fs.Fs) *Transfer {
788825
}
789826
tr := newTransfer(s, obj, srcFs, dstFs)
790827
s.transferring.add(tr)
791-
s.startAverageLoop()
828+
s.resumeAverageLoop()
792829
return tr
793830
}
794831

795832
// NewTransferRemoteSize adds a transfer to the stats based on remote and size.
796833
func (s *StatsInfo) NewTransferRemoteSize(remote string, size int64, srcFs, dstFs fs.Fs) *Transfer {
797834
tr := newTransferRemoteSize(s, remote, size, false, "", srcFs, dstFs)
798835
s.transferring.add(tr)
799-
s.startAverageLoop()
836+
s.resumeAverageLoop()
800837
return tr
801838
}
802839

@@ -811,7 +848,7 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
811848
s.mu.Unlock()
812849
}
813850
if s.transferring.empty() && s.checking.empty() {
814-
time.AfterFunc(averageStopAfter, s.stopAverageLoop)
851+
s.pauseAverageLoop()
815852
}
816853
}
817854

0 commit comments

Comments
 (0)