Skip to content

Commit 1055c8b

Browse files
srebhanmstrandboge
authored andcommitted
chore(agent): Refactor ticker code into own internal package (#18313)
(cherry picked from commit 2a32a7a)
1 parent 46df3d5 commit 1055c8b

File tree

7 files changed

+406
-486
lines changed

7 files changed

+406
-486
lines changed

agent/agent.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/influxdata/telegraf"
1616
"github.com/influxdata/telegraf/config"
1717
"github.com/influxdata/telegraf/internal"
18+
"github.com/influxdata/telegraf/internal/clock"
1819
"github.com/influxdata/telegraf/models"
1920
"github.com/influxdata/telegraf/plugins/common/snmp"
2021
"github.com/influxdata/telegraf/plugins/processors"
@@ -398,7 +399,7 @@ func (a *Agent) runInputs(
398399
unit *inputUnit,
399400
) {
400401
var wg sync.WaitGroup
401-
tickers := make([]Ticker, 0, len(unit.inputs))
402+
tickers := make([]clock.Ticker, 0, len(unit.inputs))
402403
for _, input := range unit.inputs {
403404
// Overwrite agent interval if this plugin has its own.
404405
interval := time.Duration(a.Config.Agent.Interval)
@@ -424,12 +425,7 @@ func (a *Agent) runInputs(
424425
offset = input.Config.CollectionOffset
425426
}
426427

427-
var ticker Ticker
428-
if a.Config.Agent.RoundInterval {
429-
ticker = NewAlignedTicker(startTime, interval, jitter, offset)
430-
} else {
431-
ticker = NewUnalignedTicker(interval, jitter, offset)
432-
}
428+
ticker := clock.NewTicker(startTime, interval, jitter, offset, a.Config.Agent.RoundInterval)
433429
tickers = append(tickers, ticker)
434430

435431
acc := NewAccumulator(input, unit.dst)
@@ -566,7 +562,7 @@ func (a *Agent) gatherLoop(
566562
ctx context.Context,
567563
acc telegraf.Accumulator,
568564
input *models.RunningInput,
569-
ticker Ticker,
565+
ticker clock.Ticker,
570566
interval time.Duration,
571567
) {
572568
for {
@@ -583,7 +579,7 @@ func (a *Agent) gatherLoop(
583579
}
584580

585581
// gatherOnce runs the input's Gather function once, logging a warning each interval it fails to complete before.
586-
func (*Agent) gatherOnce(acc telegraf.Accumulator, input *models.RunningInput, ticker Ticker, interval time.Duration) error {
582+
func (*Agent) gatherOnce(acc telegraf.Accumulator, input *models.RunningInput, ticker clock.Ticker, interval time.Duration) error {
587583
done := make(chan error)
588584
go func() {
589585
defer panicRecover(input)
@@ -854,7 +850,7 @@ func (a *Agent) runOutputs(
854850
go func(output *models.RunningOutput) {
855851
defer wg.Done()
856852

857-
ticker := NewRollingTicker(interval, jitter)
853+
ticker := clock.NewTimer(interval, jitter)
858854
defer ticker.Stop()
859855

860856
a.flushLoop(ctx, output, ticker)
@@ -884,7 +880,7 @@ func (a *Agent) runOutputs(
884880
func (a *Agent) flushLoop(
885881
ctx context.Context,
886882
output *models.RunningOutput,
887-
ticker Ticker,
883+
ticker clock.Ticker,
888884
) {
889885
logError := func(err error) {
890886
if err != nil {
@@ -921,7 +917,7 @@ func (a *Agent) flushLoop(
921917
}
922918

923919
// flushOnce runs the output's Write function once, logging a warning each interval it fails to complete before the flush interval elapses.
924-
func (*Agent) flushOnce(output *models.RunningOutput, ticker Ticker, writeFunc func() error) error {
920+
func (*Agent) flushOnce(output *models.RunningOutput, ticker clock.Ticker, writeFunc func() error) error {
925921
done := make(chan error)
926922
go func() {
927923
done <- writeFunc()
@@ -1208,7 +1204,7 @@ func panicRecover(input *models.RunningInput) {
12081204
}
12091205
}
12101206

1211-
func stopTickers(tickers []Ticker) {
1207+
func stopTickers(tickers []clock.Ticker) {
12121208
for _, ticker := range tickers {
12131209
ticker.Stop()
12141210
}

agent/tick.go

Lines changed: 0 additions & 281 deletions
This file was deleted.

0 commit comments

Comments
 (0)