Skip to content

Commit 05bfe8a

Browse files
committed
move scheduler to source package
1 parent 1ab7f12 commit 05bfe8a

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

internal/component/loki/source/file/file.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/grafana/alloy/internal/component"
1818
"github.com/grafana/alloy/internal/component/common/loki"
1919
"github.com/grafana/alloy/internal/component/discovery"
20+
"github.com/grafana/alloy/internal/component/loki/source"
2021
"github.com/grafana/alloy/internal/component/loki/source/internal/positions"
2122
"github.com/grafana/alloy/internal/featuregate"
2223
"github.com/grafana/alloy/internal/runtime/logging/level"
@@ -144,7 +145,7 @@ type Component struct {
144145
// new arguments.
145146
resolver resolver
146147
// scheduler owns the lifecycle of sources.
147-
scheduler *Scheduler[positions.Entry]
148+
scheduler *source.Scheduler[positions.Entry]
148149

149150
// watcher is a background trigger that periodically invokes
150151
// scheduling when file matching is enabled.
@@ -186,7 +187,7 @@ func New(o component.Options, args Arguments) (*Component, error) {
186187
handler: loki.NewLogsReceiver(),
187188
receivers: args.ForwardTo,
188189
posFile: positionsFile,
189-
scheduler: NewScheduler[positions.Entry](),
190+
scheduler: source.NewScheduler[positions.Entry](),
190191
watcher: time.NewTicker(args.FileMatch.SyncPeriod),
191192
}
192193

@@ -368,7 +369,7 @@ func (c *Component) scheduleSources() {
368369
c.scheduler.ScheduleSource(source)
369370
}
370371

371-
var toDelete []Source[positions.Entry]
372+
var toDelete []source.Source[positions.Entry]
372373

373374
// Avoid mutating the scheduler state during iteration. Collect sources to
374375
// remove and stop them in a separate loop.
@@ -426,7 +427,7 @@ type sourceOptions struct {
426427
}
427428

428429
// newSource will return a decompressor source if enabled, otherwise a tailer source.
429-
func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], error) {
430+
func (c *Component) newSource(opts sourceOptions) (source.Source[positions.Entry], error) {
430431
if opts.decompressionConfig.Enabled {
431432
decompressor, err := newDecompressor(
432433
c.metrics,
@@ -452,7 +453,7 @@ func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], erro
452453
if err != nil {
453454
return nil, fmt.Errorf("failed to create tailer %w", err)
454455
}
455-
return NewSourceWithRetry(tailer, backoff.Config{
456+
return source.NewSourceWithRetry(tailer, backoff.Config{
456457
MinBackoff: 1 * time.Second,
457458
MaxBackoff: 10 * time.Second,
458459
}), nil

internal/component/loki/source/file/scheduler.go renamed to internal/component/loki/source/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package file
1+
package source
22

33
import (
44
"context"

internal/component/loki/source/file/scheduler_test.go renamed to internal/component/loki/source/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package file
1+
package source
22

33
import (
44
"context"

0 commit comments

Comments
 (0)