From f330d67d7ad228eaf4c4ae623d8b4ad5cb885f3b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 4 Dec 2025 17:12:57 +0100 Subject: [PATCH 01/32] move scheduler to source package --- internal/component/loki/source/file/file.go | 11 ++++++----- .../component/loki/source/{file => }/scheduler.go | 2 +- .../loki/source/{file => }/scheduler_test.go | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) rename internal/component/loki/source/{file => }/scheduler.go (99%) rename internal/component/loki/source/{file => }/scheduler_test.go (99%) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 0da6c03cc0..785ad3ddcf 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -144,7 +145,7 @@ type Component struct { // new arguments. resolver resolver // scheduler owns the lifecycle of sources. - scheduler *Scheduler[positions.Entry] + scheduler *source.Scheduler[positions.Entry] // watcher is a background trigger that periodically invokes // scheduling when file matching is enabled. @@ -186,7 +187,7 @@ func New(o component.Options, args Arguments) (*Component, error) { handler: loki.NewLogsReceiver(), receivers: args.ForwardTo, posFile: positionsFile, - scheduler: NewScheduler[positions.Entry](), + scheduler: source.NewScheduler[positions.Entry](), watcher: time.NewTicker(args.FileMatch.SyncPeriod), } @@ -368,7 +369,7 @@ func (c *Component) scheduleSources() { c.scheduler.ScheduleSource(source) } - var toDelete []Source[positions.Entry] + var toDelete []source.Source[positions.Entry] // Avoid mutating the scheduler state during iteration. Collect sources to // remove and stop them in a separate loop. @@ -426,7 +427,7 @@ type sourceOptions struct { } // newSource will return a decompressor source if enabled, otherwise a tailer source. 
-func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], error) { +func (c *Component) newSource(opts sourceOptions) (source.Source[positions.Entry], error) { if opts.decompressionConfig.Enabled { decompressor, err := newDecompressor( c.metrics, @@ -452,7 +453,7 @@ func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], erro if err != nil { return nil, fmt.Errorf("failed to create tailer %w", err) } - return NewSourceWithRetry(tailer, backoff.Config{ + return source.NewSourceWithRetry(tailer, backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 10 * time.Second, }), nil diff --git a/internal/component/loki/source/file/scheduler.go b/internal/component/loki/source/scheduler.go similarity index 99% rename from internal/component/loki/source/file/scheduler.go rename to internal/component/loki/source/scheduler.go index 610bde4b41..3be0ca668b 100644 --- a/internal/component/loki/source/file/scheduler.go +++ b/internal/component/loki/source/scheduler.go @@ -1,4 +1,4 @@ -package file +package source import ( "context" diff --git a/internal/component/loki/source/file/scheduler_test.go b/internal/component/loki/source/scheduler_test.go similarity index 99% rename from internal/component/loki/source/file/scheduler_test.go rename to internal/component/loki/source/scheduler_test.go index 0187c09dd1..e584444fbb 100644 --- a/internal/component/loki/source/file/scheduler_test.go +++ b/internal/component/loki/source/scheduler_test.go @@ -1,4 +1,4 @@ -package file +package source import ( "context" From a21c23a2a2cb344832bc1359679c75d3154d404d Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 11:32:00 +0100 Subject: [PATCH 02/32] Add ext interface and move info creation into sources --- .../component/loki/source/file/decompresser.go | 10 ++++++++-- internal/component/loki/source/file/file.go | 13 ++++--------- internal/component/loki/source/file/tailer.go | 14 ++++++++++---- internal/component/loki/source/file/tailer_test.go | 1 + internal/component/loki/source/scheduler.go | 12 ++++++------ 5 files changed, 29 insertions(+), 21 deletions(-) diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index 2bed272e5c..10f9070ce0 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -339,8 +339,14 @@ func (d *decompressor) Key() positions.Entry { return d.key } -func (d *decompressor) IsRunning() bool { - return d.running.Load() +func (d *decompressor) DebugInfo() any { + offset, _ := d.positions.Get(d.key.Path, d.key.Labels) + return sourceDebugInfo{ + Path: d.key.Path, + Labels: d.key.Labels, + IsRunning: d.running.Load(), + ReadOffset: offset, + } } // cleanupMetrics removes all metrics exported by this reader diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 785ad3ddcf..555f66edf7 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -386,10 +386,10 @@ func (c *Component) scheduleSources() { } type debugInfo struct { - TargetsInfo []targetInfo `alloy:"targets_info,block"` + TargetsInfo []any `alloy:"targets_info,block"` } -type targetInfo struct { +type sourceDebugInfo struct { Path string `alloy:"path,attr"` Labels string `alloy:"labels,attr"` IsRunning bool `alloy:"is_running,attr"` @@ -404,13 +404,8 @@ func (c *Component) DebugInfo() any { defer c.mut.RUnlock() 
var res debugInfo for s := range c.scheduler.Sources() { - offset, _ := c.posFile.Get(s.Key().Path, s.Key().Labels) - res.TargetsInfo = append(res.TargetsInfo, targetInfo{ - Path: s.Key().Path, - Labels: s.Key().Labels, - IsRunning: s.IsRunning(), - ReadOffset: offset, - }) + ds := s.(source.DebugSource[positions.Entry]) + res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) } return res } diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 886784ebcf..a0a89d1400 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -354,10 +354,6 @@ func (t *tailer) Key() positions.Entry { return t.key } -func (t *tailer) IsRunning() bool { - return t.running.Load() -} - // cleanupMetrics removes all metrics exported by this tailer func (t *tailer) cleanupMetrics() { // When we stop tailing the file, also un-export metrics related to the file @@ -366,3 +362,13 @@ func (t *tailer) cleanupMetrics() { t.metrics.readBytes.DeleteLabelValues(t.key.Path) t.metrics.totalBytes.DeleteLabelValues(t.key.Path) } + +func (t *tailer) DebugInfo() any { + offset, _ := t.positions.Get(t.key.Path, t.key.Labels) + return sourceDebugInfo{ + Path: t.key.Path, + Labels: t.key.Labels, + IsRunning: t.running.Load(), + ReadOffset: offset, + } +} diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index 03e8f4db2f..7f4e21d75a 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -356,6 +356,7 @@ func TestTailerCorruptedPositions(t *testing.T) { // tailer needs some time to start time.Sleep(50 * time.Millisecond) + _, err = logFile.Write([]byte("writing some text\n")) require.NoError(t, err) select { diff --git a/internal/component/loki/source/scheduler.go b/internal/component/loki/source/scheduler.go index 3be0ca668b..77c8285bf7 100644 --- a/internal/component/loki/source/scheduler.go +++ b/internal/component/loki/source/scheduler.go @@ -98,8 +98,12 @@ type Source[K comparable] interface { Run(ctx context.Context) // Key is used to uniquely identify the source. Key() K - // IsRunning reports if source is still running. - IsRunning() bool +} + +// DebugSource is an optional interface with debug information. +type DebugSource[k comparable] interface { + Source[k] + DebugInfo() any } func NewSourceWithRetry[K comparable](source Source[K], config backoff.Config) *SourceWithRetry[K] { @@ -126,10 +130,6 @@ func (s *SourceWithRetry[K]) Key() K { return s.source.Key() } -func (s *SourceWithRetry[K]) IsRunning() bool { - return s.source.IsRunning() -} - // scheduledSource is a source that is already scheduled. // to stop the scheduledSource cancel needs to be called. 
type scheduledSource[K comparable] struct { From 235d5ead00e8e9a1f55bd41ca9fc8d35afec614b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 13:58:22 +0100 Subject: [PATCH 03/32] loki.source.docker: refactor to use shared scheduler --- .../component/loki/source/docker/docker.go | 147 ++++---- .../loki/source/docker/docker_test.go | 152 +------- .../{internal/dockertarget => }/metrics.go | 19 +- .../component/loki/source/docker/runner.go | 247 ------------- .../dockertarget/target.go => tailer.go} | 339 +++++++++--------- .../target_test.go => tailer_test.go} | 181 ++++++++-- .../dockertarget => }/testdata/flog.log | Bin .../testdata/flog_after_restart.log | Bin 8 files changed, 421 insertions(+), 664 deletions(-) rename internal/component/loki/source/docker/{internal/dockertarget => }/metrics.go (64%) delete mode 100644 internal/component/loki/source/docker/runner.go rename internal/component/loki/source/docker/{internal/dockertarget/target.go => tailer.go} (55%) rename internal/component/loki/source/docker/{internal/dockertarget/target_test.go => tailer_test.go} (57%) rename internal/component/loki/source/docker/{internal/dockertarget => }/testdata/flog.log (100%) rename internal/component/loki/source/docker/{internal/dockertarget => }/testdata/flog_after_restart.log (100%) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index d9c22920cb..72850e9637 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -25,7 +25,7 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/discovery" - dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" + "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -47,9 +47,12 @@ func init() { var userAgent = useragent.Get() const ( + // See github.com/prometheus/prometheus/discovery/moby dockerLabel = model.MetaLabelPrefix + "docker_" dockerLabelContainerPrefix = dockerLabel + "container_" dockerLabelContainerID = dockerLabelContainerPrefix + "id" + dockerLabelLogStream = dockerLabelContainerPrefix + "log_stream" + dockerMaxChunkSize = 16384 ) // Arguments holds values which are used to configure the loki.source.docker @@ -102,16 +105,15 @@ var ( // Component implements the loki.source.file component. 
type Component struct { opts component.Options - metrics *dt.Metrics + metrics *metrics - mut sync.RWMutex - args Arguments - manager *manager - lastOptions *options - handler loki.LogsReceiver - posFile positions.Positions - rcs []*relabel.Config - defaultLabels model.LabelSet + mut sync.RWMutex + args Arguments + scheduler *source.Scheduler[string] + client client.APIClient + handler loki.LogsReceiver + posFile positions.Positions + rcs []*relabel.Config receiversMut sync.RWMutex receivers []loki.LogsReceiver @@ -134,11 +136,10 @@ func New(o component.Options, args Arguments) (*Component, error) { } c := &Component{ - opts: o, - metrics: dt.NewMetrics(o.Registerer), - + opts: o, + metrics: newMetrics(o.Registerer), handler: loki.NewLogsReceiver(), - manager: newManager(o.Logger, nil), + scheduler: source.NewScheduler[string](), receivers: args.ForwardTo, posFile: positionsFile, } @@ -159,11 +160,8 @@ func (c *Component) Run(ctx context.Context) error { c.mut.Lock() defer c.mut.Unlock() - // Guard for safety, but it's not possible for Run to be called without - // c.tailer being initialized. - if c.manager != nil { - c.manager.stop() - } + // FIXME: We need to drain here + c.scheduler.Stop() }() for { @@ -175,7 +173,11 @@ func (c *Component) Run(ctx context.Context) error { receivers := c.receivers c.receiversMut.RUnlock() for _, receiver := range receivers { - receiver.Chan() <- entry + select { + case <-ctx.Done(): + return nil + case receiver.Chan() <- entry: + } } } } @@ -198,33 +200,22 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() - managerOpts, err := c.getManagerOptions(newArgs) + opts, err := c.getClient(newArgs) if err != nil { return err } - if managerOpts != c.lastOptions { - // Options changed; pass it to the tailer. - // This will never fail because it only fails if the context gets canceled. - _ = c.manager.updateOptions(context.Background(), managerOpts) - c.lastOptions = managerOpts + if opts != c.client { + // Stop all tailers so all will be restarted + c.scheduler.Stop() } defaultLabels := make(model.LabelSet, len(newArgs.Labels)) for k, v := range newArgs.Labels { defaultLabels[model.LabelName(k)] = model.LabelValue(v) } - c.defaultLabels = defaultLabels - - if len(newArgs.RelabelRules) > 0 { - c.rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules) - } else { - c.rcs = []*relabel.Config{} - } - // Convert input targets into targets to give to tailer. 
- targets := make([]*dt.Target, 0, len(newArgs.Targets)) - seenTargets := make(map[string]struct{}, len(newArgs.Targets)) + c.rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules) promTargets := make([]promTarget, len(newArgs.Targets)) for i, target := range newArgs.Targets { @@ -238,53 +229,67 @@ func (c *Component) Update(args component.Arguments) error { return promTargets[i].fingerPrint < promTargets[j].fingerPrint }) + shouldRun := make(map[string]struct{}, len(newArgs.Targets)) for _, markedTarget := range promTargets { containerID, ok := markedTarget.labels[dockerLabelContainerID] if !ok { level.Debug(c.opts.Logger).Log("msg", "docker target did not include container ID label:"+dockerLabelContainerID) continue } - if _, seen := seenTargets[string(containerID)]; seen { + + if c.scheduler.Contains(string(containerID)) { continue } - seenTargets[string(containerID)] = struct{}{} - tgt, err := dt.NewTarget( + tailer, err := newTailer( c.metrics, log.With(c.opts.Logger, "target", fmt.Sprintf("docker/%s", containerID)), - c.manager.opts.handler, - c.manager.opts.positions, + c.handler, + c.posFile, string(containerID), - markedTarget.labels.Merge(c.defaultLabels), + markedTarget.labels.Merge(defaultLabels), c.rcs, - c.manager.opts.client, + opts, + 5*time.Second, ) + if err != nil { return err } - targets = append(targets, tgt) + + shouldRun[tailer.Key()] = struct{}{} + c.scheduler.ScheduleSource(tailer) } - // This will never fail because it only fails if the context gets canceled. - _ = c.manager.syncTargets(context.Background(), targets) + // Avoid mutating the scheduler state during iteration. Collect sources to + // remove and stop them in a separate loop. + var toDelete []source.Source[string] + for source := range c.scheduler.Sources() { + if _, ok := shouldRun[source.Key()]; ok { + continue + } + toDelete = append(toDelete, source) + } + + for _, s := range toDelete { + c.scheduler.StopSource(s) // stops without blocking + } c.args = newArgs return nil } -// getTailerOptions gets tailer options from arguments. If args hasn't changed -// from the last call to getTailerOptions, c.lastOptions is returned. -// c.lastOptions must be updated by the caller. -// -// getTailerOptions must only be called when c.mut is held. -func (c *Component) getManagerOptions(args Arguments) (*options, error) { - if reflect.DeepEqual(c.args.Host, args.Host) && c.lastOptions != nil { - return c.lastOptions, nil +// getClient created client from args. If args hasn't changed +// from the last call to getClient, c.client is returned. +// getClient must only be called when c.mut is held. +func (c *Component) getClient(args Arguments) (client.APIClient, error) { + if reflect.DeepEqual(c.args.Host, args.Host) && c.client != nil { + return c.client, nil } hostURL, err := url.Parse(args.Host) if err != nil { - return c.lastOptions, err + return c.client, err } opts := []client.Opt{ @@ -298,7 +303,7 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { if hostURL.Scheme == "http" || hostURL.Scheme == "https" { rt, err := config.NewRoundTripperFromConfig(*args.HTTPClientConfig.Convert(), "docker_sd") if err != nil { - return c.lastOptions, err + return c.client, err } opts = append(opts, client.WithHTTPClient(&http.Client{ @@ -315,41 +320,33 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { client, err := client.NewClientWithOpts(opts...) 
if err != nil { level.Error(c.opts.Logger).Log("msg", "could not create new Docker client", "err", err) - return c.lastOptions, fmt.Errorf("failed to build docker client: %w", err) + return c.client, fmt.Errorf("failed to build docker client: %w", err) } - return &options{ - client: client, - handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), - positions: c.posFile, - targetRestartInterval: 5 * time.Second, - }, nil + return client, nil } // DebugInfo returns information about the status of tailed targets. func (c *Component) DebugInfo() interface{} { + c.mut.RLock() + defer c.mut.RUnlock() + var res readerDebugInfo - for _, tgt := range c.manager.targets() { - details := tgt.Details() - res.TargetsInfo = append(res.TargetsInfo, targetInfo{ - Labels: tgt.LabelsStr(), - ID: details["id"], - LastError: details["error"], - IsRunning: details["running"], - ReadOffset: details["position"], - }) + for s := range c.scheduler.Sources() { + ds := s.(source.DebugSource[string]) + res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) } return res } type readerDebugInfo struct { - TargetsInfo []targetInfo `alloy:"targets_info,block"` + TargetsInfo []any `alloy:"targets_info,block"` } -type targetInfo struct { +type sourceInfo struct { ID string `alloy:"id,attr"` LastError string `alloy:"last_error,attr"` Labels string `alloy:"labels,attr"` - IsRunning string `alloy:"is_running,attr"` + IsRunning bool `alloy:"is_running,attr"` ReadOffset string `alloy:"read_offset,attr"` } diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index e120c4bed5..8d8dcee388 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -1,35 +1,19 @@ package docker import ( - "context" - "io" - "os" - "strings" "testing" "time" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/relabel" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/common/loki" - dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" - "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/runtime/componenttest" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" ) -const targetRestartInterval = 20 * time.Millisecond - -func Test(t *testing.T) { +func TestComponent(t *testing.T) { // Use host that works on all platforms (including Windows). var cfg = ` host = "tcp://127.0.0.1:9375" @@ -52,7 +36,7 @@ func Test(t *testing.T) { require.NoError(t, ctrl.WaitRunning(time.Minute)) } -func TestDuplicateTargets(t *testing.T) { +func TestComponentDuplicateTargets(t *testing.T) { // Use host that works on all platforms (including Windows). 
var cfg = ` host = "tcp://127.0.0.1:9376" @@ -85,8 +69,11 @@ func TestDuplicateTargets(t *testing.T) { }, args) require.NoError(t, err) - require.Len(t, cmp.manager.tasks, 1) - require.Equal(t, cmp.manager.tasks[0].target.LabelsStr(), "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}") + require.Equal(t, 1, cmp.scheduler.Len()) + for s := range cmp.scheduler.Sources() { + ss := s.(*tailer) + require.Equal(t, "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}", ss.labelsStr) + } var newCfg = ` host = "tcp://127.0.0.1:9376" @@ -99,127 +86,10 @@ func TestDuplicateTargets(t *testing.T) { err = syntax.Unmarshal([]byte(newCfg), &args) require.NoError(t, err) cmp.Update(args) - require.Len(t, cmp.manager.tasks, 1) // Although the order of the targets changed, the filtered target stays the same. - require.Equal(t, cmp.manager.tasks[0].target.LabelsStr(), "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}") -} - -func TestRestart(t *testing.T) { - finishedAt := "2024-05-02T13:11:55.879889Z" - var runningState atomic.Bool - runningState.Store(true) - client := clientMock{ - logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", - running: func() bool { return runningState.Load() }, - finishedAt: func() string { return finishedAt }, - } - expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" - - tailer, entryHandler := setupTailer(t, client) - go tailer.Run(t.Context()) - - // The container is already running, expect log lines. - assert.EventuallyWithT(t, func(c *assert.CollectT) { - logLines := entryHandler.Received() - if assert.NotEmpty(c, logLines) { - assert.Equal(c, expectedLogLine, logLines[0].Line) - } - }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") - - // Stops the container. - runningState.Store(false) - time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. - entryHandler.Clear() - time.Sleep(targetRestartInterval + 10*time.Millisecond) - assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. - - // Restart the container and expect log lines. 
- runningState.Store(true) - assert.EventuallyWithT(t, func(c *assert.CollectT) { - logLines := entryHandler.Received() - if assert.NotEmpty(c, logLines) { - assert.Equal(c, expectedLogLine, logLines[0].Line) - } - }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") -} - -func TestTargetNeverStarted(t *testing.T) { - runningState := false - finishedAt := "2024-05-02T13:11:55.879889Z" - client := clientMock{ - logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", - running: func() bool { return runningState }, - finishedAt: func() string { return finishedAt }, - } - expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" - - tailer, entryHandler := setupTailer(t, client) - - ctx, cancel := context.WithCancel(t.Context()) - go tailer.Run(ctx) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - logLines := entryHandler.Received() - if assert.NotEmpty(c, logLines) { - assert.Equal(c, expectedLogLine, logLines[0].Line) - } - }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") - - require.NotPanics(t, func() { cancel() }) -} - -func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *loki.CollectingHandler) { - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) - entryHandler = loki.NewCollectingHandler() - - ps, err := positions.New(logger, positions.Config{ - SyncPeriod: 10 * time.Second, - PositionsFile: t.TempDir() + "/positions.yml", - }) - require.NoError(t, err) - - tgt, err := dt.NewTarget( - dt.NewMetrics(prometheus.NewRegistry()), - logger, - entryHandler, - ps, - "flog", - model.LabelSet{"job": "docker"}, - []*relabel.Config{}, - client, - ) - require.NoError(t, err) - tailerTask := &tailerTask{ - options: &options{ - client: client, - targetRestartInterval: targetRestartInterval, - }, - target: tgt, + require.Equal(t, 1, cmp.scheduler.Len()) + for s := range cmp.scheduler.Sources() { + ss := s.(*tailer) + require.Equal(t, "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}", ss.labelsStr) } - return newTailer(logger, tailerTask), entryHandler -} - -type clientMock struct { - client.APIClient - logLine string - running func() bool - finishedAt func() string -} - -func (mock clientMock) ContainerInspect(ctx context.Context, c string) (container.InspectResponse, error) { - return container.InspectResponse{ - ContainerJSONBase: &container.ContainerJSONBase{ - ID: c, - State: &container.State{ - Running: mock.running(), - FinishedAt: mock.finishedAt(), - }, - }, - Config: &container.Config{Tty: true}, - }, nil -} - -func (mock clientMock) ContainerLogs(ctx context.Context, container string, options container.LogsOptions) (io.ReadCloser, error) { - return io.NopCloser(strings.NewReader(mock.logLine)), nil } diff --git a/internal/component/loki/source/docker/internal/dockertarget/metrics.go b/internal/component/loki/source/docker/metrics.go similarity index 64% rename from internal/component/loki/source/docker/internal/dockertarget/metrics.go rename to internal/component/loki/source/docker/metrics.go index 917a83aab0..ae5f4a1e7f 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/metrics.go +++ b/internal/component/loki/source/docker/metrics.go @@ -1,26 +1,23 @@ -package dockertarget - -// NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). 
-// The dockertarget package is used to configure and run the targets that can -// read logs from Docker containers and forward them to other loki components. +package docker import ( - "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/alloy/internal/util" ) -// Metrics holds a set of Docker target metrics. -type Metrics struct { +// metrics holds a set of Docker target metrics. +type metrics struct { reg prometheus.Registerer dockerEntries prometheus.Counter dockerErrors prometheus.Counter } -// NewMetrics creates a new set of Docker target metrics. If reg is non-nil, the +// newMetrics creates a new set of Docker target metrics. If reg is non-nil, the // metrics will be registered. -func NewMetrics(reg prometheus.Registerer) *Metrics { - var m Metrics +func newMetrics(reg prometheus.Registerer) *metrics { + var m metrics m.reg = reg m.dockerEntries = prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go deleted file mode 100644 index a09f4e39c5..0000000000 --- a/internal/component/loki/source/docker/runner.go +++ /dev/null @@ -1,247 +0,0 @@ -package docker - -// NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). - -import ( - "context" - "sync" - "time" - - "github.com/docker/docker/client" - "github.com/go-kit/log" - - "github.com/grafana/alloy/internal/component/common/loki" - dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" - "github.com/grafana/alloy/internal/component/loki/source/internal/positions" - "github.com/grafana/alloy/internal/runner" - "github.com/grafana/alloy/internal/runtime/logging/level" -) - -// A manager manages a set of running tailers. -type manager struct { - log log.Logger - - mut sync.Mutex - opts *options - tasks []*tailerTask - - runner *runner.Runner[*tailerTask] -} - -// newManager returns a new Manager which manages a set of running tailers. -// Options must not be modified after passing it to a Manager. -// -// If newManager is called with a nil set of options, no targets will be -// scheduled for running until UpdateOptions is called. -func newManager(l log.Logger, opts *options) *manager { - return &manager{ - log: l, - opts: opts, - runner: runner.New(func(t *tailerTask) runner.Worker { - return newTailer(l, t) - }), - } -} - -// options passed to all tailers. -type options struct { - // client to use to request logs from Docker. - client client.APIClient - - // handler to send discovered logs to. - handler loki.EntryHandler - - // positions interface so tailers can save/restore offsets in log files. - positions positions.Positions - - // targetRestartInterval to restart task that has stopped running. - targetRestartInterval time.Duration -} - -// tailerTask is the payload used to create tailers. It implements runner.Task. -type tailerTask struct { - options *options - target *dt.Target -} - -var _ runner.Task = (*tailerTask)(nil) - -func (tt *tailerTask) Hash() uint64 { return tt.target.Hash() } - -func (tt *tailerTask) Equals(other runner.Task) bool { - otherTask := other.(*tailerTask) - - // Quick path: pointers are exactly the same. - if tt == otherTask { - return true - } - - // Slow path: check individual fields which are part of the task. - return tt.options == otherTask.options && - tt.target.LabelsStr() == otherTask.target.LabelsStr() -} - -// A tailer tails the logs of a docker container. 
It is created by a [Manager]. -type tailer struct { - log log.Logger - opts *options - target *dt.Target -} - -// newTailer returns a new tailer which tails logs from the target specified by -// the task. -func newTailer(l log.Logger, task *tailerTask) *tailer { - return &tailer{ - log: log.WithPrefix(l, "target", task.target.Name()), - opts: task.options, - target: task.target, - } -} - -func (t *tailer) Run(ctx context.Context) { - ticker := time.NewTicker(t.opts.targetRestartInterval) - tickerC := ticker.C - - for { - select { - case <-tickerC: - res, err := t.opts.client.ContainerInspect(ctx, t.target.Name()) - if err != nil { - level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err) - continue - } - - finished, err := time.Parse(time.RFC3339Nano, res.State.FinishedAt) - if err != nil { - level.Error(t.log).Log("msg", "error parsing finished time for Docker container", "id", t.target.Name(), "error", err) - finished = time.Unix(0, 0) - } - - if res.State.Running || finished.Unix() >= t.target.Last() { - t.target.StartIfNotRunning() - } - case <-ctx.Done(): - t.target.Stop() - ticker.Stop() - return - } - } -} - -// syncTargets synchronizes the set of running tailers to the set specified by -// targets. -func (m *manager) syncTargets(ctx context.Context, targets []*dt.Target) error { - m.mut.Lock() - defer m.mut.Unlock() - - // Convert targets into tasks to give to the runner. - tasks := make([]*tailerTask, 0, len(targets)) - for _, target := range targets { - tasks = append(tasks, &tailerTask{ - options: m.opts, - target: target, - }) - } - - // Sync our tasks to the runner. If the Manager doesn't have any options, - // the runner will be cleared of tasks until UpdateOptions is called with a - // non-nil set of options. - switch m.opts { - default: - if err := m.runner.ApplyTasks(ctx, tasks); err != nil { - return err - } - case nil: - if err := m.runner.ApplyTasks(ctx, nil); err != nil { - return err - } - } - - // Delete positions for targets which have gone away. - newEntries := make(map[positions.Entry]struct{}, len(targets)) - for _, target := range targets { - newEntries[entryForTarget(target)] = struct{}{} - } - - for _, task := range m.tasks { - ent := entryForTarget(task.target) - - // The task from the last call to SyncTargets is no longer in newEntries; - // remove it from the positions file. We do this _after_ calling ApplyTasks - // to ensure that the old tailers have shut down, otherwise the tailer - // might write its position again during shutdown after we removed it. - if _, found := newEntries[ent]; !found { - level.Info(m.log).Log("msg", "removing entry from positions file", "path", ent.Path, "labels", ent.Labels) - m.opts.positions.Remove(ent.Path, ent.Labels) - } - } - - m.tasks = tasks - return nil -} - -func entryForTarget(t *dt.Target) positions.Entry { - // The positions entry is keyed by container_id; the path is fed into - // positions.CursorKey to treat it as a "cursor"; otherwise - // positions.Positions will try to read the path as a file and delete the - // entry when it can't find it. - return positions.Entry{ - Path: positions.CursorKey(t.Name()), - Labels: t.LabelsStr(), - } -} - -// updateOptions updates the Options shared with all Tailers. All Tailers will -// be updated with the new set of Options. Options should not be modified after -// passing to updateOptions. -// -// If newOptions is nil, all tasks will be cleared until updateOptions is -// called again with a non-nil set of options. 
-func (m *manager) updateOptions(ctx context.Context, newOptions *options) error { - m.mut.Lock() - defer m.mut.Unlock() - - // Iterate through the previous set of tasks and create a new task with the - // new set of options. - tasks := make([]*tailerTask, 0, len(m.tasks)) - for _, oldTask := range m.tasks { - tasks = append(tasks, &tailerTask{ - options: newOptions, - target: oldTask.target, - }) - } - - switch newOptions { - case nil: - if err := m.runner.ApplyTasks(ctx, nil); err != nil { - return err - } - default: - if err := m.runner.ApplyTasks(ctx, tasks); err != nil { - return err - } - } - - m.opts = newOptions - m.tasks = tasks - return nil -} - -// targets returns the set of targets which are actively being tailed. targets -// for tailers which have terminated are not included. The returned set of -// targets are deduplicated. -func (m *manager) targets() []*dt.Target { - tasks := m.runner.Tasks() - - targets := make([]*dt.Target, 0, len(tasks)) - for _, task := range tasks { - targets = append(targets, task.target) - } - return targets -} - -// stop stops the manager and all running Tailers. It blocks until all Tailers -// have exited. -func (m *manager) stop() { - m.runner.Stop() -} diff --git a/internal/component/loki/source/docker/internal/dockertarget/target.go b/internal/component/loki/source/docker/tailer.go similarity index 55% rename from internal/component/loki/source/docker/internal/dockertarget/target.go rename to internal/component/loki/source/docker/tailer.go index 97c00ac2c5..4e856f0b1f 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target.go +++ b/internal/component/loki/source/docker/tailer.go @@ -1,4 +1,4 @@ -package dockertarget +package docker // NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). // The dockertarget package is used to configure and run the targets that can @@ -31,24 +31,17 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" ) -const ( - // See github.com/prometheus/prometheus/discovery/moby - dockerLabel = model.MetaLabelPrefix + "docker_" - dockerLabelContainerPrefix = dockerLabel + "container_" - dockerLabelLogStream = dockerLabelContainerPrefix + "log_stream" - dockerMaxChunkSize = 16384 -) - -// Target enables reading Docker container logs. -type Target struct { - logger log.Logger - handler loki.EntryHandler - positions positions.Positions - containerName string - labels model.LabelSet - labelsStr string - relabelConfig []*relabel.Config - metrics *Metrics +// tailer for Docker container logs. +type tailer struct { + logger log.Logger + recv loki.LogsReceiver + positions positions.Positions + containerID string + labels model.LabelSet + labelsStr string + relabelConfig []*relabel.Config + metrics *metrics + restartInverval time.Duration client client.APIClient @@ -63,39 +56,141 @@ type Target struct { since *atomic.Int64 } -// NewTarget starts a new target to read logs from a given container ID. -func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, containerID string, labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient) (*Target, error) { +// newTailer starts a new tailer to read logs from a given container ID. 
+func newTailer( + metrics *metrics, logger log.Logger, recv loki.LogsReceiver, position positions.Positions, containerID string, + labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient, restartInterval time.Duration, +) (*tailer, error) { + labelsStr := labels.String() pos, err := position.Get(positions.CursorKey(containerID), labelsStr) if err != nil { return nil, err } - var since int64 - if pos != 0 { - since = pos + + return &tailer{ + logger: logger, + recv: recv, + since: atomic.NewInt64(pos), + last: atomic.NewInt64(0), + positions: position, + containerID: containerID, + labels: labels, + labelsStr: labelsStr, + relabelConfig: relabelConfig, + metrics: metrics, + client: client, + restartInverval: restartInterval, + }, nil +} + +func (s *tailer) Run(ctx context.Context) { + ticker := time.NewTicker(s.restartInverval) + defer ticker.Stop() + + // start on initial call to Run. + s.startIfNotRunning() + + for { + select { + case <-ticker.C: + res, err := s.client.ContainerInspect(ctx, s.containerID) + if err != nil { + level.Error(s.logger).Log("msg", "error inspecting Docker container", "id", s.containerID, "error", err) + continue + } + + finished, err := time.Parse(time.RFC3339Nano, res.State.FinishedAt) + if err != nil { + level.Error(s.logger).Log("msg", "error parsing finished time for Docker container", "id", s.containerID, "error", err) + finished = time.Unix(0, 0) + } + + if res.State.Running || finished.Unix() >= s.last.Load() { + s.startIfNotRunning() + } + case <-ctx.Done(): + s.stop() + return + } + } +} + +// startIfNotRunning starts processing container logs. The operation is idempotent , i.e. the processing cannot be started twice. +func (s *tailer) startIfNotRunning() { + s.mu.Lock() + defer s.mu.Unlock() + if !s.running { + level.Debug(s.logger).Log("msg", "starting process loop", "container", s.containerID) + + ctx := context.Background() + info, err := s.client.ContainerInspect(ctx, s.containerID) + if err != nil { + level.Error(s.logger).Log("msg", "could not inspect container info", "container", s.containerID, "err", err) + s.err = err + return + } + + reader, err := s.client.ContainerLogs(ctx, s.containerID, container.LogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: true, + Since: strconv.FormatInt(s.since.Load(), 10), + }) + if err != nil { + level.Error(s.logger).Log("msg", "could not fetch logs for container", "container", s.containerID, "err", err) + s.err = err + return + } + + ctx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.running = true + // proccessLoop will start 3 goroutines that we need to wait for if Stop is called. + s.wg.Add(3) + go s.processLoop(ctx, info.Config.Tty, reader) + } +} + +// stop shuts down the target. 
+func (s *tailer) stop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.running { + s.running = false + if s.cancel != nil { + s.cancel() + } + s.wg.Wait() + level.Debug(s.logger).Log("msg", "stopped Docker target", "container", s.containerID) } +} + +func (s *tailer) Key() string { + return s.containerID +} - t := &Target{ - logger: logger, - handler: handler, - since: atomic.NewInt64(since), - last: atomic.NewInt64(0), - positions: position, - containerName: containerID, - labels: labels, - labelsStr: labelsStr, - relabelConfig: relabelConfig, - metrics: metrics, - client: client, +func (s *tailer) DebugInfo() any { + s.mu.Lock() + defer s.mu.Unlock() + running := s.running + + var errMsg string + if s.err != nil { + errMsg = s.err.Error() } - // NOTE (@tpaschalis) The original Promtail implementation would call - // t.StartIfNotRunning() right here to start tailing. - // We manage targets from a task's Run method. - return t, nil + return sourceInfo{ + ID: s.containerID, + LastError: errMsg, + Labels: s.labelsStr, + IsRunning: running, + ReadOffset: s.positions.GetString(positions.CursorKey(s.containerID), s.labelsStr), + } } -func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser) { +func (s *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser) { defer reader.Close() // Start transferring @@ -103,10 +198,10 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser rstderr, wstderr := io.Pipe() go func() { defer func() { - t.wg.Done() + s.wg.Done() wstdout.Close() wstderr.Close() - t.Stop() + s.stop() }() var written int64 var err error @@ -114,26 +209,26 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser written, err = io.Copy(wstdout, reader) } else { // For non-TTY, wrap the pipe writers with our chunk writer to reassemble frames. - wcstdout := newChunkWriter(wstdout, t.logger) + wcstdout := newChunkWriter(wstdout, s.logger) defer wcstdout.Close() - wcstderr := newChunkWriter(wstderr, t.logger) + wcstderr := newChunkWriter(wstderr, s.logger) defer wcstderr.Close() written, err = stdcopy.StdCopy(wcstdout, wcstderr, reader) } if err != nil { - level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err) + level.Warn(s.logger).Log("msg", "could not transfer logs", "written", written, "container", s.containerID, "err", err) } else { - level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName) + level.Info(s.logger).Log("msg", "finished transferring logs", "written", written, "container", s.containerID) } }() // Start processing - go t.process(rstdout, t.getStreamLabels("stdout")) - go t.process(rstderr, t.getStreamLabels("stderr")) + go s.process(rstdout, s.getStreamLabels("stdout")) + go s.process(rstderr, s.getStreamLabels("stderr")) // Wait until done <-ctx.Done() - level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName) + level.Debug(s.logger).Log("msg", "done processing Docker logs", "container", s.containerID) } // extractTsFromBytes parses an RFC3339Nano timestamp from the byte slice. 
@@ -157,8 +252,24 @@ func extractTsFromBytes(line []byte) (time.Time, []byte, error) { return ts, line[spaceIdx+1:], nil } -func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { - defer t.wg.Done() +// https://devmarkpro.com/working-big-files-golang +func readLine(r *bufio.Reader) (string, error) { + var ( + isPrefix = true + err error + line, ln []byte + ) + + for isPrefix && err == nil { + line, isPrefix, err = r.ReadLine() + ln = append(ln, line...) + } + + return string(ln), err +} + +func (s *tailer) process(r io.Reader, logStreamLset model.LabelSet) { + defer s.wg.Done() scanner := bufio.NewScanner(r) const maxCapacity = dockerMaxChunkSize * 64 @@ -169,19 +280,19 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { ts, content, err := extractTsFromBytes(line) if err != nil { - level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err) - t.metrics.dockerErrors.Inc() + level.Error(s.logger).Log("msg", "could not extract timestamp, skipping line", "err", err) + s.metrics.dockerErrors.Inc() continue } - t.handler.Chan() <- loki.Entry{ + s.recv.Chan() <- loki.Entry{ Labels: logStreamLset, Entry: push.Entry{ Timestamp: ts, Line: string(content), }, } - t.metrics.dockerEntries.Inc() + s.metrics.dockerEntries.Inc() // NOTE(@tpaschalis) We don't save the positions entry with the // filtered labels, but with the default label set, as this is the one @@ -189,124 +300,24 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { // problematic if we have the same container with a different set of // labels (e.g. duplicated and relabeled), but this shouldn't be the // case anyway. - t.positions.Put(positions.CursorKey(t.containerName), t.labelsStr, ts.Unix()) - t.since.Store(ts.Unix()) - t.last.Store(time.Now().Unix()) + s.positions.Put(positions.CursorKey(s.containerID), s.labelsStr, ts.Unix()) + s.since.Store(ts.Unix()) + s.last.Store(time.Now().Unix()) } if err := scanner.Err(); err != nil { - level.Error(t.logger).Log("msg", "error reading docker log line", "err", err) - t.metrics.dockerErrors.Inc() - } -} - -// StartIfNotRunning starts processing container logs. The operation is idempotent , i.e. the processing cannot be started twice. -func (t *Target) StartIfNotRunning() { - t.mu.Lock() - defer t.mu.Unlock() - if !t.running { - level.Debug(t.logger).Log("msg", "starting process loop", "container", t.containerName) - - ctx := context.Background() - info, err := t.client.ContainerInspect(ctx, t.containerName) - if err != nil { - level.Error(t.logger).Log("msg", "could not inspect container info", "container", t.containerName, "err", err) - t.err = err - return - } - - reader, err := t.client.ContainerLogs(ctx, t.containerName, container.LogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Timestamps: true, - Since: strconv.FormatInt(t.since.Load(), 10), - }) - if err != nil { - level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err) - t.err = err - return - } - - ctx, cancel := context.WithCancel(ctx) - t.cancel = cancel - t.running = true - // proccessLoop will start 3 goroutines that we need to wait for if Stop is called. - t.wg.Add(3) - go t.processLoop(ctx, info.Config.Tty, reader) - } -} - -// Stop shuts down the target. 
-func (t *Target) Stop() { - t.mu.Lock() - defer t.mu.Unlock() - if t.running { - t.running = false - if t.cancel != nil { - t.cancel() - } - t.wg.Wait() - level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName) - } -} - -// Ready reports whether the target is running. -func (t *Target) Ready() bool { - t.mu.Lock() - defer t.mu.Unlock() - return t.running -} - -// LabelsStr returns the target's original labels string representation. -func (t *Target) LabelsStr() string { - return t.labelsStr -} - -// Name reports the container name. -func (t *Target) Name() string { - return t.containerName -} - -// Hash is used when comparing targets in tasks. -func (t *Target) Hash() uint64 { - return uint64(t.labels.Fingerprint()) -} - -// Path returns the target's container name. -func (t *Target) Path() string { - return t.containerName -} - -// Last returns the unix timestamp of the target's last processing loop. -func (t *Target) Last() int64 { return t.last.Load() } - -// Details returns target-specific details. -func (t *Target) Details() map[string]string { - t.mu.Lock() - running := t.running - - var errMsg string - if t.err != nil { - errMsg = t.err.Error() - } - t.mu.Unlock() - - return map[string]string{ - "id": t.containerName, - "error": errMsg, - "position": t.positions.GetString(positions.CursorKey(t.containerName), t.labelsStr), - "running": strconv.FormatBool(running), + level.Error(s.logger).Log("msg", "error reading docker log line", "err", err) + s.metrics.dockerErrors.Inc() } } -func (t *Target) getStreamLabels(logStream string) model.LabelSet { +func (s *tailer) getStreamLabels(logStream string) model.LabelSet { // Add all labels from the config, relabel and filter them. lb := labels.NewBuilder(labels.EmptyLabels()) - for k, v := range t.labels { + for k, v := range s.labels { lb.Set(string(k), string(v)) } lb.Set(dockerLabelLogStream, logStream) - processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...) + processed, _ := relabel.Process(lb.Labels(), s.relabelConfig...) filtered := make(model.LabelSet) processed.Range(func(lbl labels.Label) { diff --git a/internal/component/loki/source/docker/internal/dockertarget/target_test.go b/internal/component/loki/source/docker/tailer_test.go similarity index 57% rename from internal/component/loki/source/docker/internal/dockertarget/target_test.go rename to internal/component/loki/source/docker/tailer_test.go index eeacc2b829..11a31f6f79 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target_test.go +++ b/internal/component/loki/source/docker/tailer_test.go @@ -1,4 +1,4 @@ -package dockertarget +package docker // NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). 
// The dockertarget package is used to configure and run the targets that can @@ -6,7 +6,9 @@ package dockertarget import ( "bytes" + "context" "encoding/json" + "io" "net/http" "net/http/httptest" "os" @@ -23,17 +25,19 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" ) -func TestDockerTarget(t *testing.T) { +const restartInterval = 20 * time.Millisecond + +func TestTailer(t *testing.T) { server := newDockerServer(t) defer server.Close() - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) + logger := log.NewNopLogger() entryHandler := loki.NewCollectingHandler() client, err := client.NewClientWithOpts(client.WithHost(server.URL)) require.NoError(t, err) @@ -44,18 +48,24 @@ func TestDockerTarget(t *testing.T) { }) require.NoError(t, err) - tgt, err := NewTarget( - NewMetrics(prometheus.NewRegistry()), + tailer, err := newTailer( + newMetrics(prometheus.NewRegistry()), logger, - entryHandler, + entryHandler.Receiver(), ps, "flog", model.LabelSet{"job": "docker"}, []*relabel.Config{}, client, + restartInterval, ) require.NoError(t, err) - tgt.StartIfNotRunning() + + ctx, cancel := context.WithCancel(t.Context()) + wg := sync.WaitGroup{} + wg.Go(func() { + tailer.Run(ctx) + }) expectedLines := []string{ "5.3.69.55 - - [09/Dec/2021:09:15:02 +0000] \"HEAD /brand/users/clicks-and-mortar/front-end HTTP/2.0\" 503 27087", @@ -69,14 +79,16 @@ func TestDockerTarget(t *testing.T) { assertExpectedLog(c, entryHandler, expectedLines) }, 5*time.Second, 100*time.Millisecond, "Expected log lines were not found within the time limit.") - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.False(c, tgt.Ready()) - }, 5*time.Second, 20*time.Millisecond, "Expected target to finish processing within the time limit.") - entryHandler.Clear() // restart target to simulate container restart - tgt.Stop() - tgt.StartIfNotRunning() + cancel() + wg.Wait() + + ctx, cancel = context.WithCancel(t.Context()) + defer cancel() + wg.Go(func() { + tailer.Run(ctx) + }) expectedLinesAfterRestart := []string{ "243.115.12.215 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /morph/exploit/granular HTTP/1.0\" 500 26468", "221.41.123.237 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /user-centric/whiteboard HTTP/2.0\" 205 22487", @@ -89,7 +101,7 @@ func TestDockerTarget(t *testing.T) { }, 5*time.Second, 100*time.Millisecond, "Expected log lines after restart were not found within the time limit.") } -func TestStartStopStressTest(t *testing.T) { +func TestTailerStartStopStressTest(t *testing.T) { server := newDockerServer(t) defer server.Close() @@ -105,19 +117,20 @@ func TestStartStopStressTest(t *testing.T) { client, err := client.NewClientWithOpts(client.WithHost(server.URL)) require.NoError(t, err) - tgt, err := NewTarget( - NewMetrics(prometheus.NewRegistry()), + tgt, err := newTailer( + newMetrics(prometheus.NewRegistry()), logger, - entryHandler, + entryHandler.Receiver(), ps, "flog", model.LabelSet{"job": "docker"}, []*relabel.Config{}, client, + restartInterval, ) require.NoError(t, err) - tgt.StartIfNotRunning() + tgt.startIfNotRunning() // Stress test the concurrency of StartIfNotRunning and Stop wg := sync.WaitGroup{} @@ -125,19 +138,83 @@ func TestStartStopStressTest(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - tgt.StartIfNotRunning() + 
tgt.startIfNotRunning() }() wg.Add(1) go func() { defer wg.Done() - tgt.Stop() + tgt.stop() }() } wg.Wait() } -func TestDockerChunkWriter(t *testing.T) { +func TestTailerRestart(t *testing.T) { + finishedAt := "2024-05-02T13:11:55.879889Z" + runningState := atomic.NewBool(true) + + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState.Load() }, + finishedAt: func() string { return finishedAt }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + go tailer.Run(t.Context()) + + // The container is already running, expect log lines. + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") + + // Stops the container. + runningState.Store(false) + time.Sleep(restartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. + entryHandler.Clear() + time.Sleep(restartInterval + 10*time.Millisecond) + assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. + + // Restart the container and expect log lines. + runningState.Store(true) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") +} + +func TestTailerNeverStarted(t *testing.T) { + runningState := false + finishedAt := "2024-05-02T13:11:55.879889Z" + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState }, + finishedAt: func() string { return finishedAt }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + + ctx, cancel := context.WithCancel(t.Context()) + go tailer.Run(ctx) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") + + require.NotPanics(t, func() { cancel() }) +} + +func TestChunkWriter(t *testing.T) { logger := log.NewNopLogger() var buf bytes.Buffer writer := newChunkWriter(&buf, logger) @@ -194,10 +271,12 @@ func newDockerServer(t *testing.T) *httptest.Server { default: w.Header().Set("Content-Type", "application/json") info := container.InspectResponse{ - ContainerJSONBase: &container.ContainerJSONBase{}, - Mounts: []container.MountPoint{}, - Config: &container.Config{Tty: false}, - NetworkSettings: &container.NetworkSettings{}, + ContainerJSONBase: &container.ContainerJSONBase{ + State: &container.State{}, + }, + Mounts: []container.MountPoint{}, + Config: &container.Config{Tty: false}, + NetworkSettings: &container.NetworkSettings{}, } writeErr = json.NewEncoder(w).Encode(info) } @@ -240,3 +319,53 @@ func containsString(slice []string, str string) bool { } return 
false } + +func setupTailer(t *testing.T, client clientMock) (*tailer, *loki.CollectingHandler) { + logger := log.NewNopLogger() + entryHandler := loki.NewCollectingHandler() + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + tailer, err := newTailer( + newMetrics(prometheus.NewRegistry()), + logger, + entryHandler.Receiver(), + ps, + "flog", + model.LabelSet{"job": "docker"}, + []*relabel.Config{}, + client, + restartInterval, + ) + require.NoError(t, err) + + return tailer, entryHandler +} + +type clientMock struct { + client.APIClient + logLine string + running func() bool + finishedAt func() string +} + +func (mock clientMock) ContainerInspect(ctx context.Context, c string) (container.InspectResponse, error) { + return container.InspectResponse{ + ContainerJSONBase: &container.ContainerJSONBase{ + ID: c, + State: &container.State{ + Running: mock.running(), + FinishedAt: mock.finishedAt(), + }, + }, + Config: &container.Config{Tty: true}, + }, nil +} + +func (mock clientMock) ContainerLogs(ctx context.Context, container string, options container.LogsOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(mock.logLine)), nil +} diff --git a/internal/component/loki/source/docker/internal/dockertarget/testdata/flog.log b/internal/component/loki/source/docker/testdata/flog.log similarity index 100% rename from internal/component/loki/source/docker/internal/dockertarget/testdata/flog.log rename to internal/component/loki/source/docker/testdata/flog.log diff --git a/internal/component/loki/source/docker/internal/dockertarget/testdata/flog_after_restart.log b/internal/component/loki/source/docker/testdata/flog_after_restart.log similarity index 100% rename from internal/component/loki/source/docker/internal/dockertarget/testdata/flog_after_restart.log rename to internal/component/loki/source/docker/testdata/flog_after_restart.log From 3994303ab594a58107a60ec792b57166efa6a3e0 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:10:35 +0100 Subject: [PATCH 04/32] Extract common function to drain handler --- .../component/loki/source/docker/docker.go | 13 +++++--- internal/component/loki/source/drain.go | 33 +++++++++++++++++++ internal/component/loki/source/file/file.go | 29 ++++++---------- 3 files changed, 51 insertions(+), 24 deletions(-) create mode 100644 internal/component/loki/source/drain.go diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 72850e9637..53f59ccdbf 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -157,11 +157,14 @@ func (c *Component) Run(ctx context.Context) error { defer c.posFile.Stop() defer func() { - c.mut.Lock() - defer c.mut.Unlock() - - // FIXME: We need to drain here - c.scheduler.Stop() + c.posFile.Stop() + + // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). 
+ source.Drain(c.handler, func() { + c.mut.Lock() + defer c.mut.Unlock() + c.scheduler.Stop() + }) }() for { diff --git a/internal/component/loki/source/drain.go b/internal/component/loki/source/drain.go new file mode 100644 index 0000000000..cdeedb54af --- /dev/null +++ b/internal/component/loki/source/drain.go @@ -0,0 +1,33 @@ +package source + +import ( + "context" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +// Drain consumes log entries from recv in a background goroutine while f executes. +// This prevents deadlocks that can occur when stopping components that may still be +// sending entries to the receiver channel. The draining goroutine will continue +// consuming entries until f returns, at which point the context is cancelled and +// the goroutine exits. +// +// This is typically used during component shutdown to drain any remaining entries +// from a receiver channel while performing cleanup operations (e.g., stopping +// sources, closing channels). +func Drain(recv loki.LogsReceiver, f func()) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for { + select { + case <-ctx.Done(): + return + case <-recv.Chan(): + // Consume and discard entries to prevent channel blocking + } + } + }() + + f() +} diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 555f66edf7..c9ab9b7f98 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -203,27 +203,18 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping sources and positions file") - // We need to stop posFile first so we don't record entries we are draining c.posFile.Stop() - // Start black hole drain routine to prevent deadlock when we call c.t.Stop(). - drainCtx, cancelDrain := context.WithCancel(context.Background()) - defer cancelDrain() - go func() { - for { - select { - case <-drainCtx.Done(): - return - case <-c.handler.Chan(): // Ignore the remaining entries - } - } - }() - c.mut.Lock() - c.stopping.Store(true) - c.watcher.Stop() - c.scheduler.Stop() - close(c.handler.Chan()) - c.mut.Unlock() + + // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). + source.Drain(c.handler, func() { + c.mut.Lock() + c.stopping.Store(true) + c.watcher.Stop() + c.scheduler.Stop() + close(c.handler.Chan()) + c.mut.Unlock() + }) }() var wg sync.WaitGroup From 3d1027557c9cfa93e6854d43cb9902756cfc3660 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:12:53 +0100 Subject: [PATCH 05/32] Add changelog --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 288ff7edaf..2d89ce5d90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,10 @@ Main (unreleased) - Fix default values for relabel rules, this caused issues in e.g. `prometheus.operator.servicemonitors` when using labeldrop. (@kalleep) +- Fix issue in `loki.source.docker` where scheduling containers to tail could take too long. (@kalleep) + +- Fix potential deadlock in `loki.source.docker` when component is shutting down. 
(@kalleep) + v1.12.0 ----------------- @@ -154,7 +158,7 @@ v1.12.0 ### Enhancements -- Add per-application rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout) +- Add per-applicationTestDockerChunkWriter rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout) - Add support of `tls` in components `loki.source.(awsfirehose|gcplog|heroku|api)` and `prometheus.receive_http` and `pyroscope.receive_http`. (@fgouteroux) From 883616f92e5f801abdb88c709ef37fdac8b6c0f8 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:23:11 +0100 Subject: [PATCH 06/32] update comment --- internal/component/loki/source/drain.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/component/loki/source/drain.go b/internal/component/loki/source/drain.go index cdeedb54af..432a986b80 100644 --- a/internal/component/loki/source/drain.go +++ b/internal/component/loki/source/drain.go @@ -13,8 +13,7 @@ import ( // the goroutine exits. // // This is typically used during component shutdown to drain any remaining entries -// from a receiver channel while performing cleanup operations (e.g., stopping -// sources, closing channels). +// from a receiver channel while performing cleanup operations. func Drain(recv loki.LogsReceiver, f func()) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 1c905eaea8f518d65847455a5f7d435a2af81edf Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:38:11 +0100 Subject: [PATCH 07/32] only stop position once --- internal/component/loki/source/docker/docker.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 53f59ccdbf..8f71b2bc49 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -154,8 +154,6 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { - defer c.posFile.Stop() - defer func() { c.posFile.Stop() From 7a54f61dbc713a457f4574301b73999277bb95e2 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 15:56:49 +0100 Subject: [PATCH 08/32] Add shared Fanout and Consume loop --- internal/component/common/loki/fanout.go | 64 +++++++++++++++++++ internal/component/loki/source/consume.go | 26 ++++++++ .../component/loki/source/docker/docker.go | 32 ++-------- internal/component/loki/source/file/file.go | 51 ++------------- 4 files changed, 103 insertions(+), 70 deletions(-) create mode 100644 internal/component/common/loki/fanout.go create mode 100644 internal/component/loki/source/consume.go diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go new file mode 100644 index 0000000000..1eb7dc9326 --- /dev/null +++ b/internal/component/common/loki/fanout.go @@ -0,0 +1,64 @@ +package loki + +import ( + "context" + "reflect" + "sync" +) + +// NewFanout creates a new Fanout that will send log entries to the provided +// list of LogsReceivers. 
+func NewFanout(children []LogsReceiver) *Fanout { + return &Fanout{ + children: children, + } +} + +// Fanout distributes log entries to multiple LogsReceivers. +// It is thread-safe and allows the list of receivers to be updated dynamically +type Fanout struct { + mut sync.RWMutex + children []LogsReceiver +} + +// Send forwards a log entry to all registered receivers. It returns an error +// if the context is cancelled while sending. +func (f *Fanout) Send(ctx context.Context, entry Entry) error { + f.mut.RLock() + for _, recv := range f.children { + select { + case <-ctx.Done(): + f.mut.RUnlock() + return ctx.Err() + case recv.Chan() <- entry: + } + } + f.mut.RUnlock() + return nil +} + +// UpdateChildren updates the list of receivers that will receive log entries. +func (f *Fanout) UpdateChildren(children []LogsReceiver) { + f.mut.RLock() + if receiversChanged(f.children, children) { + // Upgrade lock to write. + f.mut.RUnlock() + f.mut.Lock() + f.children = children + f.mut.Unlock() + } else { + f.mut.RUnlock() + } +} + +func receiversChanged(prev, next []LogsReceiver) bool { + if len(prev) != len(next) { + return true + } + for i := range prev { + if !reflect.DeepEqual(prev[i], next[i]) { + return true + } + } + return false +} diff --git a/internal/component/loki/source/consume.go b/internal/component/loki/source/consume.go new file mode 100644 index 0000000000..00d75c1946 --- /dev/null +++ b/internal/component/loki/source/consume.go @@ -0,0 +1,26 @@ +package source + +import ( + "context" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +// Consume continuously reads log entries from recv and forwards them to the fanout f. +// It runs until ctx is cancelled or an error occurs while sending to the fanout. +// +// This function is typically used in component Run methods to handle the forwarding +// of log entries from a component's internal handler to downstream receivers. +// The fanout allows entries to be sent to multiple receivers concurrently. +func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) { + for { + select { + case <-ctx.Done(): + return + case entry := <-recv.Chan(): + if err := f.Send(ctx, entry); err != nil { + return + } + } + } +} diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 8f71b2bc49..816b6059ee 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -115,8 +115,7 @@ type Component struct { posFile positions.Positions rcs []*relabel.Config - receiversMut sync.RWMutex - receivers []loki.LogsReceiver + fanout *loki.Fanout } // New creates a new loki.source.file component. 
@@ -140,7 +139,7 @@ func New(o component.Options, args Arguments) (*Component, error) { metrics: newMetrics(o.Registerer), handler: loki.NewLogsReceiver(), scheduler: source.NewScheduler[string](), - receivers: args.ForwardTo, + fanout: loki.NewFanout(args.ForwardTo), posFile: positionsFile, } @@ -165,23 +164,9 @@ func (c *Component) Run(ctx context.Context) error { }) }() - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.handler.Chan(): - c.receiversMut.RLock() - receivers := c.receivers - c.receiversMut.RUnlock() - for _, receiver := range receivers { - select { - case <-ctx.Done(): - return nil - case receiver.Chan() <- entry: - } - } - } - } + // Start consume and fanout loop + source.Consume(ctx, c.handler, c.fanout) + return nil } type promTarget struct { @@ -193,14 +178,11 @@ type promTarget struct { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) - // Update the receivers before anything else, just in case something fails. - c.receiversMut.Lock() - c.receivers = newArgs.ForwardTo - c.receiversMut.Unlock() - c.mut.Lock() defer c.mut.Unlock() + c.fanout.UpdateChildren(newArgs.ForwardTo) + opts, err := c.getClient(newArgs) if err != nil { return err diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index c9ab9b7f98..2f2d1cf734 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "reflect" "sync" "time" @@ -154,8 +153,7 @@ type Component struct { handler loki.LogsReceiver posFile positions.Positions - receiversMut sync.RWMutex - receivers []loki.LogsReceiver + fanout *loki.Fanout stopping atomic.Bool } @@ -185,7 +183,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), handler: loki.NewLogsReceiver(), - receivers: args.ForwardTo, + fanout: loki.NewFanout(args.ForwardTo), posFile: positionsFile, scheduler: source.NewScheduler[positions.Entry](), watcher: time.NewTicker(args.FileMatch.SyncPeriod), @@ -218,25 +216,9 @@ func (c *Component) Run(ctx context.Context) error { }() var wg sync.WaitGroup - wg.Go(func() { - for { - select { - case <-ctx.Done(): - return - case entry := <-c.handler.Chan(): - c.receiversMut.RLock() - for _, receiver := range c.receivers { - select { - case <-ctx.Done(): - c.receiversMut.RUnlock() - return - case receiver.Chan() <- entry: - } - } - c.receiversMut.RUnlock() - } - } - }) + + // Start consume and fanout loop + wg.Go(func() { source.Consume(ctx, c.handler, c.fanout) }) wg.Go(func() { for { @@ -268,16 +250,7 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() - c.receiversMut.RLock() - if receiversChanged(c.receivers, newArgs.ForwardTo) { - // Upgrade lock to write. - c.receiversMut.RUnlock() - c.receiversMut.Lock() - c.receivers = newArgs.ForwardTo - c.receiversMut.Unlock() - } else { - c.receiversMut.RUnlock() - } + c.fanout.UpdateChildren(newArgs.ForwardTo) // Choose resolver on FileMatch. 
if newArgs.FileMatch.Enabled { @@ -448,15 +421,3 @@ func (c *Component) newSource(opts sourceOptions) (source.Source[positions.Entry func (c *Component) IsStopping() bool { return c.stopping.Load() } - -func receiversChanged(prev, next []loki.LogsReceiver) bool { - if len(prev) != len(next) { - return true - } - for i := range prev { - if !reflect.DeepEqual(prev[i], next[i]) { - return true - } - } - return false -} From 65ddf17272195f8cc486dd617358627cefaf8d8b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 16:01:02 +0100 Subject: [PATCH 09/32] simplify unlock --- internal/component/common/loki/fanout.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go index 1eb7dc9326..95ec40169d 100644 --- a/internal/component/common/loki/fanout.go +++ b/internal/component/common/loki/fanout.go @@ -25,15 +25,14 @@ type Fanout struct { // if the context is cancelled while sending. func (f *Fanout) Send(ctx context.Context, entry Entry) error { f.mut.RLock() + defer f.mut.Unlock() for _, recv := range f.children { select { case <-ctx.Done(): - f.mut.RUnlock() return ctx.Err() case recv.Chan() <- entry: } } - f.mut.RUnlock() return nil } From 2612990354b491f66100519bfe087e4a2576fa51 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 16:12:57 +0100 Subject: [PATCH 10/32] Add note --- internal/component/loki/source/consume.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/component/loki/source/consume.go b/internal/component/loki/source/consume.go index 00d75c1946..0d3a69951e 100644 --- a/internal/component/loki/source/consume.go +++ b/internal/component/loki/source/consume.go @@ -18,6 +18,7 @@ func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) { case <-ctx.Done(): return case entry := <-recv.Chan(): + // NOTE: the only error we can get is context.Canceled. if err := f.Send(ctx, entry); err != nil { return } From b9c3360908bbee7a6f25dd29bcc3c1bb115ae818 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 8 Dec 2025 17:04:36 +0100 Subject: [PATCH 11/32] fix --- internal/component/common/loki/fanout.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go index 95ec40169d..2443988fec 100644 --- a/internal/component/common/loki/fanout.go +++ b/internal/component/common/loki/fanout.go @@ -25,7 +25,7 @@ type Fanout struct { // if the context is cancelled while sending. 
func (f *Fanout) Send(ctx context.Context, entry Entry) error { f.mut.RLock() - defer f.mut.Unlock() + defer f.mut.RUnlock() for _, recv := range f.children { select { case <-ctx.Done(): From a026e61d953619ac1c1e44e9e33bc039604b5eac Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 13:37:21 +0100 Subject: [PATCH 12/32] Update internal/component/loki/source/docker/docker.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/component/loki/source/docker/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 816b6059ee..b940bee8f8 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -262,7 +262,7 @@ func (c *Component) Update(args component.Arguments) error { return nil } -// getClient created client from args. If args hasn't changed +// getClient creates a client from args. If args hasn't changed // from the last call to getClient, c.client is returned. // getClient must only be called when c.mut is held. func (c *Component) getClient(args Arguments) (client.APIClient, error) { From fb0e5de121eb38edb9d0931a5a32c6ed5864626b Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 13:38:30 +0100 Subject: [PATCH 13/32] Update internal/component/loki/source/docker/tailer.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/component/loki/source/docker/tailer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 4e856f0b1f..b664ebf34d 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -147,7 +147,7 @@ func (s *tailer) startIfNotRunning() { ctx, cancel := context.WithCancel(ctx) s.cancel = cancel s.running = true - // proccessLoop will start 3 goroutines that we need to wait for if Stop is called. + // processLoop will start 3 goroutines that we need to wait for if Stop is called. 
s.wg.Add(3) go s.processLoop(ctx, info.Config.Tty, reader) } From 173d0cb925fbc6f1ffe1da73a451d61cf1b0e085 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 13:45:17 +0100 Subject: [PATCH 14/32] Fix DebugSource --- internal/component/loki/source/docker/docker.go | 6 ++++-- internal/component/loki/source/file/file.go | 6 ++++-- internal/component/loki/source/scheduler.go | 11 +++++++++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index b940bee8f8..4b8c214178 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -316,8 +316,10 @@ func (c *Component) DebugInfo() interface{} { var res readerDebugInfo for s := range c.scheduler.Sources() { - ds := s.(source.DebugSource[string]) - res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) + ds, ok := s.(source.DebugSource) + if ok { + res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) + } } return res } diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 2f2d1cf734..8270242326 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -368,8 +368,10 @@ func (c *Component) DebugInfo() any { defer c.mut.RUnlock() var res debugInfo for s := range c.scheduler.Sources() { - ds := s.(source.DebugSource[positions.Entry]) - res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) + ds, ok := s.(source.DebugSource) + if ok { + res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) + } } return res } diff --git a/internal/component/loki/source/scheduler.go b/internal/component/loki/source/scheduler.go index 77c8285bf7..4a12696fa2 100644 --- a/internal/component/loki/source/scheduler.go +++ b/internal/component/loki/source/scheduler.go @@ -101,8 +101,7 @@ type Source[K comparable] interface { } // DebugSource is an optional interface with debug information. -type DebugSource[k comparable] interface { - Source[k] +type DebugSource interface { DebugInfo() any } @@ -130,6 +129,14 @@ func (s *SourceWithRetry[K]) Key() K { return s.source.Key() } +func (s *SourceWithRetry[K]) DebugInfo() any { + ss, ok := s.source.(DebugSource) + if !ok { + return nil + } + return ss.DebugInfo() +} + // scheduledSource is a source that is already scheduled. // to stop the scheduledSource cancel needs to be called. 
type scheduledSource[K comparable] struct { From 1d7da5e159d4fc15646d768141e24a1157760937 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 13:48:58 +0100 Subject: [PATCH 15/32] fix logic around docker container scheduling --- internal/component/loki/source/docker/docker.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 4b8c214178..01a6678084 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -220,7 +220,13 @@ func (c *Component) Update(args component.Arguments) error { continue } - if c.scheduler.Contains(string(containerID)) { + key := string(containerID) + if _, ok := shouldRun[key]; ok { + continue + } + shouldRun[key] = struct{}{} + + if c.scheduler.Contains(key) { continue } @@ -237,10 +243,10 @@ func (c *Component) Update(args component.Arguments) error { ) if err != nil { - return err + level.Error(c.opts.Logger).Log("msg", "failed to tail docker container", "containerID", containerID, "error", err) + continue } - shouldRun[tailer.Key()] = struct{}{} c.scheduler.ScheduleSource(tailer) } From 178fbebbb36eac9d4504326d72e42825dea38bdf Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:00:13 +0100 Subject: [PATCH 16/32] Clenup position when target is removed --- .../component/loki/source/docker/docker.go | 12 +++++-- .../component/loki/source/docker/tailer.go | 31 ++++++++++++------- .../loki/source/docker/tailer_test.go | 3 ++ 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 01a6678084..8988921c9b 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component" types "github.com/grafana/alloy/internal/component/common/config" @@ -106,6 +107,7 @@ var ( type Component struct { opts component.Options metrics *metrics + exited *atomic.Bool mut sync.RWMutex args Arguments @@ -137,6 +139,7 @@ func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ opts: o, metrics: newMetrics(o.Registerer), + exited: atomic.NewBool(false), handler: loki.NewLogsReceiver(), scheduler: source.NewScheduler[string](), fanout: loki.NewFanout(args.ForwardTo), @@ -154,6 +157,7 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { defer func() { + c.exited.Store(true) c.posFile.Stop() // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). 
@@ -183,12 +187,13 @@ func (c *Component) Update(args component.Arguments) error { c.fanout.UpdateChildren(newArgs.ForwardTo) - opts, err := c.getClient(newArgs) + client, err := c.getClient(newArgs) if err != nil { return err } - if opts != c.client { + if client != c.client { + c.client = client // Stop all tailers so all will be restarted c.scheduler.Stop() } @@ -238,8 +243,9 @@ func (c *Component) Update(args component.Arguments) error { string(containerID), markedTarget.labels.Merge(defaultLabels), c.rcs, - opts, + client, 5*time.Second, + func() bool { return c.exited.Load() }, ) if err != nil { diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index b664ebf34d..dc45435e64 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -33,15 +33,16 @@ import ( // tailer for Docker container logs. type tailer struct { - logger log.Logger - recv loki.LogsReceiver - positions positions.Positions - containerID string - labels model.LabelSet - labelsStr string - relabelConfig []*relabel.Config - metrics *metrics - restartInverval time.Duration + logger log.Logger + recv loki.LogsReceiver + positions positions.Positions + containerID string + labels model.LabelSet + labelsStr string + relabelConfig []*relabel.Config + metrics *metrics + restartInterval time.Duration + componentStopping func() bool client client.APIClient @@ -60,6 +61,7 @@ type tailer struct { func newTailer( metrics *metrics, logger log.Logger, recv loki.LogsReceiver, position positions.Positions, containerID string, labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient, restartInterval time.Duration, + componentStopping func() bool, ) (*tailer, error) { labelsStr := labels.String() @@ -80,12 +82,12 @@ func newTailer( relabelConfig: relabelConfig, metrics: metrics, client: client, - restartInverval: restartInterval, + restartInterval: restartInterval, }, nil } func (s *tailer) Run(ctx context.Context) { - ticker := time.NewTicker(s.restartInverval) + ticker := time.NewTicker(s.restartInterval) defer ticker.Stop() // start on initial call to Run. @@ -164,6 +166,13 @@ func (s *tailer) stop() { } s.wg.Wait() level.Debug(s.logger).Log("msg", "stopped Docker target", "container", s.containerID) + + // If the component is not stopping, then it means that the target for this component is gone and that + // we should clear the entry from the positions file. 
+ if !s.componentStopping() { + s.positions.Remove(positions.CursorKey(s.containerID), s.labelsStr) + } + } } diff --git a/internal/component/loki/source/docker/tailer_test.go b/internal/component/loki/source/docker/tailer_test.go index 11a31f6f79..86aff4d4d7 100644 --- a/internal/component/loki/source/docker/tailer_test.go +++ b/internal/component/loki/source/docker/tailer_test.go @@ -58,6 +58,7 @@ func TestTailer(t *testing.T) { []*relabel.Config{}, client, restartInterval, + func() bool { return false }, ) require.NoError(t, err) @@ -127,6 +128,7 @@ func TestTailerStartStopStressTest(t *testing.T) { []*relabel.Config{}, client, restartInterval, + func() bool { return false }, ) require.NoError(t, err) @@ -340,6 +342,7 @@ func setupTailer(t *testing.T, client clientMock) (*tailer, *loki.CollectingHand []*relabel.Config{}, client, restartInterval, + func() bool { return false }, ) require.NoError(t, err) From 5b9fd0513a0e5cb4e5d2e8f18d2a3e7fd36e8011 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:02:17 +0100 Subject: [PATCH 17/32] lint --- .../component/loki/source/docker/tailer.go | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index dc45435e64..763179dbb5 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -71,18 +71,19 @@ func newTailer( } return &tailer{ - logger: logger, - recv: recv, - since: atomic.NewInt64(pos), - last: atomic.NewInt64(0), - positions: position, - containerID: containerID, - labels: labels, - labelsStr: labelsStr, - relabelConfig: relabelConfig, - metrics: metrics, - client: client, - restartInterval: restartInterval, + logger: logger, + recv: recv, + since: atomic.NewInt64(pos), + last: atomic.NewInt64(0), + positions: position, + containerID: containerID, + labels: labels, + labelsStr: labelsStr, + relabelConfig: relabelConfig, + metrics: metrics, + client: client, + restartInterval: restartInterval, + componentStopping: componentStopping, }, nil } @@ -172,7 +173,6 @@ func (s *tailer) stop() { if !s.componentStopping() { s.positions.Remove(positions.CursorKey(s.containerID), s.labelsStr) } - } } From be36bda7244b4c19933a667800488cdb8f28c947 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:20:36 +0100 Subject: [PATCH 18/32] Add tests --- .../component/loki/source/consume_test.go | 57 +++++++++++++++++++ internal/component/loki/source/drain_test.go | 46 +++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 internal/component/loki/source/consume_test.go create mode 100644 internal/component/loki/source/drain_test.go diff --git a/internal/component/loki/source/consume_test.go b/internal/component/loki/source/consume_test.go new file mode 100644 index 0000000000..a5d8eedce0 --- /dev/null +++ b/internal/component/loki/source/consume_test.go @@ -0,0 +1,57 @@ +package source + +import ( + "context" + "sync" + "testing" + + "github.com/grafana/loki/pkg/push" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +func TestConsume(t *testing.T) { + consumer := loki.NewLogsReceiver() + producer := loki.NewLogsReceiver() + fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + + t.Run("should fanout any consumed entries", func(t *testing.T) { + ctx, cancel := 
context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Go(func() { + Consume(ctx, producer, fanout) + }) + + producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + e := <-consumer.Chan() + require.Equal(t, "1", e.Entry.Line) + cancel() + wg.Wait() + }) + + t.Run("should stop if context is canceled while trying to fanout", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Go(func() { + Consume(ctx, producer, fanout) + }) + + producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + cancel() + wg.Wait() + }) + + t.Run("should stop if context is canceled while trying to send", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Go(func() { + Consume(ctx, producer, fanout) + }) + + producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + cancel() + wg.Wait() + }) +} diff --git a/internal/component/loki/source/drain_test.go b/internal/component/loki/source/drain_test.go new file mode 100644 index 0000000000..0fba83df80 --- /dev/null +++ b/internal/component/loki/source/drain_test.go @@ -0,0 +1,46 @@ +package source + +import ( + "sync" + "testing" + "time" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/loki/pkg/push" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestDrain(t *testing.T) { + recv := loki.NewLogsReceiver() + + var wg sync.WaitGroup + wg.Go(func() { + for range 10 { + entry := loki.Entry{ + Labels: model.LabelSet{"test": "label"}, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "test log entry", + }, + } + recv.Chan() <- entry + } + }) + + completed := false + Drain(recv, func() { + time.Sleep(100 * time.Millisecond) + completed = true + }) + + wg.Wait() + require.True(t, completed, "Drain should complete without deadlock") + + // Verify channel is empty. 
+ select { + case <-recv.Chan(): + t.Fatal("Channel should be empty after draining") + default: + } +} From b145afc30757e2967e037f537ea963e0118ffb66 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:22:57 +0100 Subject: [PATCH 19/32] remove duplicated test --- internal/component/loki/source/consume_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/internal/component/loki/source/consume_test.go b/internal/component/loki/source/consume_test.go index a5d8eedce0..cc428f0ba7 100644 --- a/internal/component/loki/source/consume_test.go +++ b/internal/component/loki/source/consume_test.go @@ -42,16 +42,4 @@ func TestConsume(t *testing.T) { cancel() wg.Wait() }) - - t.Run("should stop if context is canceled while trying to send", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - wg := sync.WaitGroup{} - wg.Go(func() { - Consume(ctx, producer, fanout) - }) - - producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} - cancel() - wg.Wait() - }) } From 2a280e12890deb1a0e437cfd2fd0b6c23793a4bf Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:35:27 +0100 Subject: [PATCH 20/32] Add reset --- internal/component/loki/source/docker/docker.go | 4 ++-- internal/component/loki/source/scheduler.go | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index 8988921c9b..e3b31346f3 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -194,8 +194,8 @@ func (c *Component) Update(args component.Arguments) error { if client != c.client { c.client = client - // Stop all tailers so all will be restarted - c.scheduler.Stop() + // Stop all tailers because we need to restart them. + c.scheduler.Reset() } defaultLabels := make(model.LabelSet, len(newArgs.Labels)) diff --git a/internal/component/loki/source/scheduler.go b/internal/component/loki/source/scheduler.go index 4a12696fa2..fe81607b3d 100644 --- a/internal/component/loki/source/scheduler.go +++ b/internal/component/loki/source/scheduler.go @@ -86,12 +86,22 @@ func (s *Scheduler[K]) Len() int { } // Stop will stop all running sources and wait for them to finish. +// Scheduler should not be reused after Stop is called. func (s *Scheduler[K]) Stop() { s.cancel() s.running.Wait() s.sources = make(map[K]scheduledSource[K]) } +// Reset will stop all running sources and wait for them to finish and reset +// Scheduler to a usable state. +func (s *Scheduler[K]) Reset() { + s.cancel() + s.running.Wait() + s.sources = make(map[K]scheduledSource[K]) + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + type Source[K comparable] interface { // Run should start the source. // It should run until there is no more work or context is canceled. 
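The commit above gives the shared Scheduler two distinct lifecycle calls, Stop and Reset. The following is a minimal, hypothetical usage sketch (not part of the patch series) of that lifecycle. It assumes only the APIs visible in these diffs — NewScheduler, ScheduleSource, Contains, Reset and Stop on the scheduler, plus Run(ctx) and Key() on a source, as suggested by the docker tailer — and imagines the code living alongside scheduler.go in the source package; fakeSource and schedulerLifecycleSketch are illustrative names, not part of the patches.

package source

import "context"

// fakeSource is a hypothetical Source[string] used only to illustrate the
// scheduler lifecycle; it blocks until its context is canceled, the way a
// real source such as the docker tailer does while following logs.
type fakeSource struct{ id string }

func (f *fakeSource) Key() string             { return f.id }
func (f *fakeSource) Run(ctx context.Context) { <-ctx.Done() }

// schedulerLifecycleSketch shows the intended difference between Reset and
// Stop introduced in this commit.
func schedulerLifecycleSketch() {
	s := NewScheduler[string]()

	// Schedule a source, guarding with Contains the same way
	// loki.source.docker does before tailing a container.
	if !s.Contains("container-1") {
		s.ScheduleSource(&fakeSource{id: "container-1"})
	}

	// Reset stops every running source and waits for it, but leaves the
	// scheduler in a usable state, so sources can be scheduled again —
	// for example after the Docker client configuration changes.
	s.Reset()
	s.ScheduleSource(&fakeSource{id: "container-1"})

	// Stop is terminal: it stops all sources and the scheduler should not
	// be reused afterwards; components call it from their shutdown path.
	s.Stop()
}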
From 58b7520f521f5728bccb56d23cd87b22334d5688 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 14:36:16 +0100 Subject: [PATCH 21/32] Update internal/component/loki/source/docker/tailer.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/component/loki/source/docker/tailer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 763179dbb5..1ecd471b45 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -119,7 +119,7 @@ func (s *tailer) Run(ctx context.Context) { } } -// startIfNotRunning starts processing container logs. The operation is idempotent , i.e. the processing cannot be started twice. +// startIfNotRunning starts processing container logs. The operation is idempotent, i.e. the processing cannot be started twice. func (s *tailer) startIfNotRunning() { s.mu.Lock() defer s.mu.Unlock() From 0399358e5f93af8841a9fb95d09d195dc8d9468b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 17:13:57 +0100 Subject: [PATCH 22/32] Check if channel was closed --- internal/component/loki/source/drain.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/component/loki/source/drain.go b/internal/component/loki/source/drain.go index 432a986b80..acca25d41a 100644 --- a/internal/component/loki/source/drain.go +++ b/internal/component/loki/source/drain.go @@ -22,8 +22,11 @@ func Drain(recv loki.LogsReceiver, f func()) { select { case <-ctx.Done(): return - case <-recv.Chan(): + case _, ok := <-recv.Chan(): // Consume and discard entries to prevent channel blocking + if !ok { + return + } } } }() From f853904815a759b5e122be1b7117e21aeab346af Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 9 Dec 2025 17:16:01 +0100 Subject: [PATCH 23/32] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d89ce5d90..4c33189ee0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,7 +77,7 @@ Main (unreleased) - Fix default values for relabel rules, this caused issues in e.g. `prometheus.operator.servicemonitors` when using labeldrop. (@kalleep) -- Fix issue in `loki.source.docker` where scheduling containers to tail could take too long. (@kalleep) +- Fix issue in `loki.source.docker` where scheduling containers to tail could take too long, this would increase the chance of Alloy missing shortlived jobs. (@kalleep) - Fix potential deadlock in `loki.source.docker` when component is shutting down. 
(@kalleep) From fe8603cc47fa960e08c67aa424739a68625b2eaf Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:13:21 +0100 Subject: [PATCH 24/32] remove comment --- internal/component/loki/source/docker/docker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index e3b31346f3..e31d49f742 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -48,7 +48,6 @@ func init() { var userAgent = useragent.Get() const ( - // See github.com/prometheus/prometheus/discovery/moby dockerLabel = model.MetaLabelPrefix + "docker_" dockerLabelContainerPrefix = dockerLabel + "container_" dockerLabelContainerID = dockerLabelContainerPrefix + "id" From a8e02370609764f5e4378588dbdd8d018006db77 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:26:16 +0100 Subject: [PATCH 25/32] fix test --- internal/component/loki/source/drain_test.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/internal/component/loki/source/drain_test.go b/internal/component/loki/source/drain_test.go index 0fba83df80..a29749ce89 100644 --- a/internal/component/loki/source/drain_test.go +++ b/internal/component/loki/source/drain_test.go @@ -36,11 +36,4 @@ func TestDrain(t *testing.T) { wg.Wait() require.True(t, completed, "Drain should complete without deadlock") - - // Verify channel is empty. - select { - case <-recv.Chan(): - t.Fatal("Channel should be empty after draining") - default: - } } From 4f5422bf361cb184f545f87e3b9c0289c4319f6b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:27:30 +0100 Subject: [PATCH 26/32] rename receiver --- .../component/loki/source/docker/tailer.go | 142 +++++++++--------- 1 file changed, 71 insertions(+), 71 deletions(-) diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 1ecd471b45..7c26588073 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -87,119 +87,119 @@ func newTailer( }, nil } -func (s *tailer) Run(ctx context.Context) { - ticker := time.NewTicker(s.restartInterval) +func (t *tailer) Run(ctx context.Context) { + ticker := time.NewTicker(t.restartInterval) defer ticker.Stop() // start on initial call to Run. 
- s.startIfNotRunning() + t.startIfNotRunning() for { select { case <-ticker.C: - res, err := s.client.ContainerInspect(ctx, s.containerID) + res, err := t.client.ContainerInspect(ctx, t.containerID) if err != nil { - level.Error(s.logger).Log("msg", "error inspecting Docker container", "id", s.containerID, "error", err) + level.Error(t.logger).Log("msg", "error inspecting Docker container", "id", t.containerID, "error", err) continue } finished, err := time.Parse(time.RFC3339Nano, res.State.FinishedAt) if err != nil { - level.Error(s.logger).Log("msg", "error parsing finished time for Docker container", "id", s.containerID, "error", err) + level.Error(t.logger).Log("msg", "error parsing finished time for Docker container", "id", t.containerID, "error", err) finished = time.Unix(0, 0) } - if res.State.Running || finished.Unix() >= s.last.Load() { - s.startIfNotRunning() + if res.State.Running || finished.Unix() >= t.last.Load() { + t.startIfNotRunning() } case <-ctx.Done(): - s.stop() + t.stop() return } } } // startIfNotRunning starts processing container logs. The operation is idempotent, i.e. the processing cannot be started twice. -func (s *tailer) startIfNotRunning() { - s.mu.Lock() - defer s.mu.Unlock() - if !s.running { - level.Debug(s.logger).Log("msg", "starting process loop", "container", s.containerID) +func (t *tailer) startIfNotRunning() { + t.mu.Lock() + defer t.mu.Unlock() + if !t.running { + level.Debug(t.logger).Log("msg", "starting process loop", "container", t.containerID) ctx := context.Background() - info, err := s.client.ContainerInspect(ctx, s.containerID) + info, err := t.client.ContainerInspect(ctx, t.containerID) if err != nil { - level.Error(s.logger).Log("msg", "could not inspect container info", "container", s.containerID, "err", err) - s.err = err + level.Error(t.logger).Log("msg", "could not inspect container info", "container", t.containerID, "err", err) + t.err = err return } - reader, err := s.client.ContainerLogs(ctx, s.containerID, container.LogsOptions{ + reader, err := t.client.ContainerLogs(ctx, t.containerID, container.LogsOptions{ ShowStdout: true, ShowStderr: true, Follow: true, Timestamps: true, - Since: strconv.FormatInt(s.since.Load(), 10), + Since: strconv.FormatInt(t.since.Load(), 10), }) if err != nil { - level.Error(s.logger).Log("msg", "could not fetch logs for container", "container", s.containerID, "err", err) - s.err = err + level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerID, "err", err) + t.err = err return } ctx, cancel := context.WithCancel(ctx) - s.cancel = cancel - s.running = true + t.cancel = cancel + t.running = true // processLoop will start 3 goroutines that we need to wait for if Stop is called. - s.wg.Add(3) - go s.processLoop(ctx, info.Config.Tty, reader) + t.wg.Add(3) + go t.processLoop(ctx, info.Config.Tty, reader) } } // stop shuts down the target. -func (s *tailer) stop() { - s.mu.Lock() - defer s.mu.Unlock() - if s.running { - s.running = false - if s.cancel != nil { - s.cancel() +func (t *tailer) stop() { + t.mu.Lock() + defer t.mu.Unlock() + if t.running { + t.running = false + if t.cancel != nil { + t.cancel() } - s.wg.Wait() - level.Debug(s.logger).Log("msg", "stopped Docker target", "container", s.containerID) + t.wg.Wait() + level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerID) // If the component is not stopping, then it means that the target for this component is gone and that // we should clear the entry from the positions file. 
- if !s.componentStopping() { - s.positions.Remove(positions.CursorKey(s.containerID), s.labelsStr) + if !t.componentStopping() { + t.positions.Remove(positions.CursorKey(t.containerID), t.labelsStr) } } } -func (s *tailer) Key() string { - return s.containerID +func (t *tailer) Key() string { + return t.containerID } -func (s *tailer) DebugInfo() any { - s.mu.Lock() - defer s.mu.Unlock() - running := s.running +func (t *tailer) DebugInfo() any { + t.mu.Lock() + defer t.mu.Unlock() + running := t.running var errMsg string - if s.err != nil { - errMsg = s.err.Error() + if t.err != nil { + errMsg = t.err.Error() } return sourceInfo{ - ID: s.containerID, + ID: t.containerID, LastError: errMsg, - Labels: s.labelsStr, + Labels: t.labelsStr, IsRunning: running, - ReadOffset: s.positions.GetString(positions.CursorKey(s.containerID), s.labelsStr), + ReadOffset: t.positions.GetString(positions.CursorKey(t.containerID), t.labelsStr), } } -func (s *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser) { +func (t *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser) { defer reader.Close() // Start transferring @@ -207,10 +207,10 @@ func (s *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser rstderr, wstderr := io.Pipe() go func() { defer func() { - s.wg.Done() + t.wg.Done() wstdout.Close() wstderr.Close() - s.stop() + t.stop() }() var written int64 var err error @@ -218,26 +218,26 @@ func (s *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser written, err = io.Copy(wstdout, reader) } else { // For non-TTY, wrap the pipe writers with our chunk writer to reassemble frames. - wcstdout := newChunkWriter(wstdout, s.logger) + wcstdout := newChunkWriter(wstdout, t.logger) defer wcstdout.Close() - wcstderr := newChunkWriter(wstderr, s.logger) + wcstderr := newChunkWriter(wstderr, t.logger) defer wcstderr.Close() written, err = stdcopy.StdCopy(wcstdout, wcstderr, reader) } if err != nil { - level.Warn(s.logger).Log("msg", "could not transfer logs", "written", written, "container", s.containerID, "err", err) + level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerID, "err", err) } else { - level.Info(s.logger).Log("msg", "finished transferring logs", "written", written, "container", s.containerID) + level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerID) } }() // Start processing - go s.process(rstdout, s.getStreamLabels("stdout")) - go s.process(rstderr, s.getStreamLabels("stderr")) + go t.process(rstdout, t.getStreamLabels("stdout")) + go t.process(rstderr, t.getStreamLabels("stderr")) // Wait until done <-ctx.Done() - level.Debug(s.logger).Log("msg", "done processing Docker logs", "container", s.containerID) + level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerID) } // extractTsFromBytes parses an RFC3339Nano timestamp from the byte slice. 
@@ -277,8 +277,8 @@ func readLine(r *bufio.Reader) (string, error) { return string(ln), err } -func (s *tailer) process(r io.Reader, logStreamLset model.LabelSet) { - defer s.wg.Done() +func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) { + defer t.wg.Done() scanner := bufio.NewScanner(r) const maxCapacity = dockerMaxChunkSize * 64 @@ -289,19 +289,19 @@ func (s *tailer) process(r io.Reader, logStreamLset model.LabelSet) { ts, content, err := extractTsFromBytes(line) if err != nil { - level.Error(s.logger).Log("msg", "could not extract timestamp, skipping line", "err", err) - s.metrics.dockerErrors.Inc() + level.Error(t.logger).Log("msg", "could not extract timestamp, skipping line", "err", err) + t.metrics.dockerErrors.Inc() continue } - s.recv.Chan() <- loki.Entry{ + t.recv.Chan() <- loki.Entry{ Labels: logStreamLset, Entry: push.Entry{ Timestamp: ts, Line: string(content), }, } - s.metrics.dockerEntries.Inc() + t.metrics.dockerEntries.Inc() // NOTE(@tpaschalis) We don't save the positions entry with the // filtered labels, but with the default label set, as this is the one @@ -309,24 +309,24 @@ func (s *tailer) process(r io.Reader, logStreamLset model.LabelSet) { // problematic if we have the same container with a different set of // labels (e.g. duplicated and relabeled), but this shouldn't be the // case anyway. - s.positions.Put(positions.CursorKey(s.containerID), s.labelsStr, ts.Unix()) - s.since.Store(ts.Unix()) - s.last.Store(time.Now().Unix()) + t.positions.Put(positions.CursorKey(t.containerID), t.labelsStr, ts.Unix()) + t.since.Store(ts.Unix()) + t.last.Store(time.Now().Unix()) } if err := scanner.Err(); err != nil { - level.Error(s.logger).Log("msg", "error reading docker log line", "err", err) - s.metrics.dockerErrors.Inc() + level.Error(t.logger).Log("msg", "error reading docker log line", "err", err) + t.metrics.dockerErrors.Inc() } } -func (s *tailer) getStreamLabels(logStream string) model.LabelSet { +func (t *tailer) getStreamLabels(logStream string) model.LabelSet { // Add all labels from the config, relabel and filter them. lb := labels.NewBuilder(labels.EmptyLabels()) - for k, v := range s.labels { + for k, v := range t.labels { lb.Set(string(k), string(v)) } lb.Set(dockerLabelLogStream, logStream) - processed, _ := relabel.Process(lb.Labels(), s.relabelConfig...) + processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...) filtered := make(model.LabelSet) processed.Range(func(lbl labels.Label) { From cd895c7155073f86f80c168f93819aaaca321647 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 09:47:27 +0100 Subject: [PATCH 27/32] remove unused function --- internal/component/loki/source/docker/tailer.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 7c26588073..9147d4d8a2 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -261,22 +261,6 @@ func extractTsFromBytes(line []byte) (time.Time, []byte, error) { return ts, line[spaceIdx+1:], nil } -// https://devmarkpro.com/working-big-files-golang -func readLine(r *bufio.Reader) (string, error) { - var ( - isPrefix = true - err error - line, ln []byte - ) - - for isPrefix && err == nil { - line, isPrefix, err = r.ReadLine() - ln = append(ln, line...) 
- } - - return string(ln), err -} - func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) { defer t.wg.Done() From 648150be34216b3c61cb9ddbc9b152e01cb8c2a4 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:11:17 +0100 Subject: [PATCH 28/32] fix debug info --- internal/component/loki/source/docker/docker.go | 9 ++++----- internal/component/loki/source/docker/tailer.go | 2 +- internal/component/loki/source/file/file.go | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index e31d49f742..dfa243cde8 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -327,16 +327,15 @@ func (c *Component) DebugInfo() interface{} { var res readerDebugInfo for s := range c.scheduler.Sources() { - ds, ok := s.(source.DebugSource) - if ok { - res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) - } + t := s.(*tailer) + res.TargetsInfo = append(res.TargetsInfo, t.DebugInfo()) + } return res } type readerDebugInfo struct { - TargetsInfo []any `alloy:"targets_info,block"` + TargetsInfo []sourceInfo `alloy:"targets_info,block"` } type sourceInfo struct { diff --git a/internal/component/loki/source/docker/tailer.go b/internal/component/loki/source/docker/tailer.go index 9147d4d8a2..05b6df2a20 100644 --- a/internal/component/loki/source/docker/tailer.go +++ b/internal/component/loki/source/docker/tailer.go @@ -180,7 +180,7 @@ func (t *tailer) Key() string { return t.containerID } -func (t *tailer) DebugInfo() any { +func (t *tailer) DebugInfo() sourceInfo { t.mu.Lock() defer t.mu.Unlock() running := t.running diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 8270242326..863f5696d9 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -350,7 +350,7 @@ func (c *Component) scheduleSources() { } type debugInfo struct { - TargetsInfo []any `alloy:"targets_info,block"` + TargetsInfo []sourceDebugInfo `alloy:"targets_info,block"` } type sourceDebugInfo struct { @@ -370,7 +370,7 @@ func (c *Component) DebugInfo() any { for s := range c.scheduler.Sources() { ds, ok := s.(source.DebugSource) if ok { - res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo()) + res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo().(sourceDebugInfo)) } } return res From 0979991e968c1117b1095f030330702df2f30569 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 12:26:08 +0100 Subject: [PATCH 29/32] lint --- internal/component/loki/source/docker/docker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index dfa243cde8..e0a63bbdc5 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -329,7 +329,6 @@ func (c *Component) DebugInfo() interface{} { for s := range c.scheduler.Sources() { t := s.(*tailer) res.TargetsInfo = append(res.TargetsInfo, t.DebugInfo()) - } return res } From 4fcfb70bb97b6d259d0de5a64fff2608466efe06 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 13:04:08 +0100 Subject: [PATCH 30/32] Add a note about impotance of holding read lock --- internal/component/common/loki/fanout.go | 7 +++++++ 1 file 
changed, 7 insertions(+) diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go index 2443988fec..c03c97cdbc 100644 --- a/internal/component/common/loki/fanout.go +++ b/internal/component/common/loki/fanout.go @@ -24,6 +24,13 @@ type Fanout struct { // Send forwards a log entry to all registered receivers. It returns an error // if the context is cancelled while sending. func (f *Fanout) Send(ctx context.Context, entry Entry) error { + // NOTE: It's important that we hold a read lock for the duration of Send + // rather than making a copy of children and releasing the lock early. When + // the Alloy config is updated and one or more receivers are removed, all + // updates are performed before a component is stopped. Because we hold the + // lock for the duration of this call, updates will be blocked until we have + // sent the entry to all receivers. + f.mut.RLock() defer f.mut.RUnlock() for _, recv := range f.children { From 86eb1e12835fa21e9cdfe648187bdbdb18757559 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 13:05:25 +0100 Subject: [PATCH 31/32] fix changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c33189ee0..830b34e13a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -158,7 +158,7 @@ v1.12.0 ### Enhancements -- Add per-applicationTestDockerChunkWriter rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout) +- Add per-application rate limiting with the `strategy` attribute in the `faro.receiver` component, to prevent one application from consuming the rate limit quota of others. (@hhertout) - Add support of `tls` in components `loki.source.(awsfirehose|gcplog|heroku|api)` and `prometheus.receive_http` and `pyroscope.receive_http`. (@fgouteroux) From 9eae721f2d624c86db7a00ad5b0eb12cd5b0ced8 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Wed, 10 Dec 2025 15:15:31 +0100 Subject: [PATCH 32/32] Update note --- internal/component/common/loki/fanout.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go index c03c97cdbc..35101d0504 100644 --- a/internal/component/common/loki/fanout.go +++ b/internal/component/common/loki/fanout.go @@ -25,11 +25,15 @@ type Fanout struct { // if the context is cancelled while sending. func (f *Fanout) Send(ctx context.Context, entry Entry) error { // NOTE: It's important that we hold a read lock for the duration of Send - // rather than making a copy of children and releasing the lock early. When - // the Alloy config is updated and one or more receivers are removed, all - // updates are performed before a component is stopped. Because we hold the - // lock for the duration of this call, updates will be blocked until we have - // sent the entry to all receivers. + // rather than making a copy of children and releasing the lock early. + // + // When config is updated, the loader evaluates all components and updates + // them while they continue running. The scheduler only stops removed components + // after all updates complete. During this window, Send may execute concurrently + // with receiver list updates. 
By holding the read lock for the entire Send + // operation, receiver list updates (which require a write lock) will block + // until all in-flight Send calls complete. This prevents sending entries to + // receivers that have been removed by the scheduler. f.mut.RLock() defer f.mut.RUnlock()