diff --git a/CHANGELOG.md b/CHANGELOG.md index e4d2c661fa..65d7b8722e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,10 @@ Main (unreleased) - Fix Docker log corruption for multiplexed long lines. (@axd1x8a) +- Fix issue in `loki.source.docker` where scheduling containers to tail could take too long, this would increase the chance of Alloy missing shortlived jobs. (@kalleep) + +- Fix potential deadlock in `loki.source.docker` when component is shutting down. (@kalleep) + v1.12.0 ----------------- diff --git a/internal/component/common/loki/fanout.go b/internal/component/common/loki/fanout.go new file mode 100644 index 0000000000..35101d0504 --- /dev/null +++ b/internal/component/common/loki/fanout.go @@ -0,0 +1,74 @@ +package loki + +import ( + "context" + "reflect" + "sync" +) + +// NewFanout creates a new Fanout that will send log entries to the provided +// list of LogsReceivers. +func NewFanout(children []LogsReceiver) *Fanout { + return &Fanout{ + children: children, + } +} + +// Fanout distributes log entries to multiple LogsReceivers. +// It is thread-safe and allows the list of receivers to be updated dynamically +type Fanout struct { + mut sync.RWMutex + children []LogsReceiver +} + +// Send forwards a log entry to all registered receivers. It returns an error +// if the context is cancelled while sending. +func (f *Fanout) Send(ctx context.Context, entry Entry) error { + // NOTE: It's important that we hold a read lock for the duration of Send + // rather than making a copy of children and releasing the lock early. + // + // When config is updated, the loader evaluates all components and updates + // them while they continue running. The scheduler only stops removed components + // after all updates complete. During this window, Send may execute concurrently + // with receiver list updates. By holding the read lock for the entire Send + // operation, receiver list updates (which require a write lock) will block + // until all in-flight Send calls complete. This prevents sending entries to + // receivers that have been removed by the scheduler. + + f.mut.RLock() + defer f.mut.RUnlock() + for _, recv := range f.children { + select { + case <-ctx.Done(): + return ctx.Err() + case recv.Chan() <- entry: + } + } + return nil +} + +// UpdateChildren updates the list of receivers that will receive log entries. +func (f *Fanout) UpdateChildren(children []LogsReceiver) { + f.mut.RLock() + if receiversChanged(f.children, children) { + // Upgrade lock to write. + f.mut.RUnlock() + f.mut.Lock() + f.children = children + f.mut.Unlock() + } else { + f.mut.RUnlock() + } +} + +func receiversChanged(prev, next []LogsReceiver) bool { + if len(prev) != len(next) { + return true + } + for i := range prev { + if !reflect.DeepEqual(prev[i], next[i]) { + return true + } + } + return false +} diff --git a/internal/component/loki/source/consume.go b/internal/component/loki/source/consume.go new file mode 100644 index 0000000000..0d3a69951e --- /dev/null +++ b/internal/component/loki/source/consume.go @@ -0,0 +1,27 @@ +package source + +import ( + "context" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +// Consume continuously reads log entries from recv and forwards them to the fanout f. +// It runs until ctx is cancelled or an error occurs while sending to the fanout. +// +// This function is typically used in component Run methods to handle the forwarding +// of log entries from a component's internal handler to downstream receivers. +// The fanout allows entries to be sent to multiple receivers concurrently. +func Consume(ctx context.Context, recv loki.LogsReceiver, f *loki.Fanout) { + for { + select { + case <-ctx.Done(): + return + case entry := <-recv.Chan(): + // NOTE: the only error we can get is context.Canceled. + if err := f.Send(ctx, entry); err != nil { + return + } + } + } +} diff --git a/internal/component/loki/source/consume_test.go b/internal/component/loki/source/consume_test.go new file mode 100644 index 0000000000..cc428f0ba7 --- /dev/null +++ b/internal/component/loki/source/consume_test.go @@ -0,0 +1,45 @@ +package source + +import ( + "context" + "sync" + "testing" + + "github.com/grafana/loki/pkg/push" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +func TestConsume(t *testing.T) { + consumer := loki.NewLogsReceiver() + producer := loki.NewLogsReceiver() + fanout := loki.NewFanout([]loki.LogsReceiver{consumer}) + + t.Run("should fanout any consumed entries", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Go(func() { + Consume(ctx, producer, fanout) + }) + + producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + e := <-consumer.Chan() + require.Equal(t, "1", e.Entry.Line) + cancel() + wg.Wait() + }) + + t.Run("should stop if context is canceled while trying to fanout", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + wg.Go(func() { + Consume(ctx, producer, fanout) + }) + + producer.Chan() <- loki.Entry{Entry: push.Entry{Line: "1"}} + cancel() + wg.Wait() + }) +} diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index d9c22920cb..e0a63bbdc5 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -19,13 +19,14 @@ import ( "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component" types "github.com/grafana/alloy/internal/component/common/config" "github.com/grafana/alloy/internal/component/common/loki" alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel" "github.com/grafana/alloy/internal/component/discovery" - dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" + "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -50,6 +51,8 @@ const ( dockerLabel = model.MetaLabelPrefix + "docker_" dockerLabelContainerPrefix = dockerLabel + "container_" dockerLabelContainerID = dockerLabelContainerPrefix + "id" + dockerLabelLogStream = dockerLabelContainerPrefix + "log_stream" + dockerMaxChunkSize = 16384 ) // Arguments holds values which are used to configure the loki.source.docker @@ -102,19 +105,18 @@ var ( // Component implements the loki.source.file component. type Component struct { opts component.Options - metrics *dt.Metrics - - mut sync.RWMutex - args Arguments - manager *manager - lastOptions *options - handler loki.LogsReceiver - posFile positions.Positions - rcs []*relabel.Config - defaultLabels model.LabelSet - - receiversMut sync.RWMutex - receivers []loki.LogsReceiver + metrics *metrics + exited *atomic.Bool + + mut sync.RWMutex + args Arguments + scheduler *source.Scheduler[string] + client client.APIClient + handler loki.LogsReceiver + posFile positions.Positions + rcs []*relabel.Config + + fanout *loki.Fanout } // New creates a new loki.source.file component. @@ -134,12 +136,12 @@ func New(o component.Options, args Arguments) (*Component, error) { } c := &Component{ - opts: o, - metrics: dt.NewMetrics(o.Registerer), - + opts: o, + metrics: newMetrics(o.Registerer), + exited: atomic.NewBool(false), handler: loki.NewLogsReceiver(), - manager: newManager(o.Logger, nil), - receivers: args.ForwardTo, + scheduler: source.NewScheduler[string](), + fanout: loki.NewFanout(args.ForwardTo), posFile: positionsFile, } @@ -153,32 +155,21 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { - defer c.posFile.Stop() - defer func() { - c.mut.Lock() - defer c.mut.Unlock() - - // Guard for safety, but it's not possible for Run to be called without - // c.tailer being initialized. - if c.manager != nil { - c.manager.stop() - } + c.exited.Store(true) + c.posFile.Stop() + + // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). + source.Drain(c.handler, func() { + c.mut.Lock() + defer c.mut.Unlock() + c.scheduler.Stop() + }) }() - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.handler.Chan(): - c.receiversMut.RLock() - receivers := c.receivers - c.receiversMut.RUnlock() - for _, receiver := range receivers { - receiver.Chan() <- entry - } - } - } + // Start consume and fanout loop + source.Consume(ctx, c.handler, c.fanout) + return nil } type promTarget struct { @@ -190,41 +181,28 @@ type promTarget struct { func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) - // Update the receivers before anything else, just in case something fails. - c.receiversMut.Lock() - c.receivers = newArgs.ForwardTo - c.receiversMut.Unlock() - c.mut.Lock() defer c.mut.Unlock() - managerOpts, err := c.getManagerOptions(newArgs) + c.fanout.UpdateChildren(newArgs.ForwardTo) + + client, err := c.getClient(newArgs) if err != nil { return err } - if managerOpts != c.lastOptions { - // Options changed; pass it to the tailer. - // This will never fail because it only fails if the context gets canceled. - _ = c.manager.updateOptions(context.Background(), managerOpts) - c.lastOptions = managerOpts + if client != c.client { + c.client = client + // Stop all tailers because we need to restart them. + c.scheduler.Reset() } defaultLabels := make(model.LabelSet, len(newArgs.Labels)) for k, v := range newArgs.Labels { defaultLabels[model.LabelName(k)] = model.LabelValue(v) } - c.defaultLabels = defaultLabels - if len(newArgs.RelabelRules) > 0 { - c.rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules) - } else { - c.rcs = []*relabel.Config{} - } - - // Convert input targets into targets to give to tailer. - targets := make([]*dt.Target, 0, len(newArgs.Targets)) - seenTargets := make(map[string]struct{}, len(newArgs.Targets)) + c.rcs = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules) promTargets := make([]promTarget, len(newArgs.Targets)) for i, target := range newArgs.Targets { @@ -238,53 +216,74 @@ func (c *Component) Update(args component.Arguments) error { return promTargets[i].fingerPrint < promTargets[j].fingerPrint }) + shouldRun := make(map[string]struct{}, len(newArgs.Targets)) for _, markedTarget := range promTargets { containerID, ok := markedTarget.labels[dockerLabelContainerID] if !ok { level.Debug(c.opts.Logger).Log("msg", "docker target did not include container ID label:"+dockerLabelContainerID) continue } - if _, seen := seenTargets[string(containerID)]; seen { + + key := string(containerID) + if _, ok := shouldRun[key]; ok { + continue + } + shouldRun[key] = struct{}{} + + if c.scheduler.Contains(key) { continue } - seenTargets[string(containerID)] = struct{}{} - tgt, err := dt.NewTarget( + tailer, err := newTailer( c.metrics, log.With(c.opts.Logger, "target", fmt.Sprintf("docker/%s", containerID)), - c.manager.opts.handler, - c.manager.opts.positions, + c.handler, + c.posFile, string(containerID), - markedTarget.labels.Merge(c.defaultLabels), + markedTarget.labels.Merge(defaultLabels), c.rcs, - c.manager.opts.client, + client, + 5*time.Second, + func() bool { return c.exited.Load() }, ) + if err != nil { - return err + level.Error(c.opts.Logger).Log("msg", "failed to tail docker container", "containerID", containerID, "error", err) + continue } - targets = append(targets, tgt) + + c.scheduler.ScheduleSource(tailer) } - // This will never fail because it only fails if the context gets canceled. - _ = c.manager.syncTargets(context.Background(), targets) + // Avoid mutating the scheduler state during iteration. Collect sources to + // remove and stop them in a separate loop. + var toDelete []source.Source[string] + for source := range c.scheduler.Sources() { + if _, ok := shouldRun[source.Key()]; ok { + continue + } + toDelete = append(toDelete, source) + } + + for _, s := range toDelete { + c.scheduler.StopSource(s) // stops without blocking + } c.args = newArgs return nil } -// getTailerOptions gets tailer options from arguments. If args hasn't changed -// from the last call to getTailerOptions, c.lastOptions is returned. -// c.lastOptions must be updated by the caller. -// -// getTailerOptions must only be called when c.mut is held. -func (c *Component) getManagerOptions(args Arguments) (*options, error) { - if reflect.DeepEqual(c.args.Host, args.Host) && c.lastOptions != nil { - return c.lastOptions, nil +// getClient creates a client from args. If args hasn't changed +// from the last call to getClient, c.client is returned. +// getClient must only be called when c.mut is held. +func (c *Component) getClient(args Arguments) (client.APIClient, error) { + if reflect.DeepEqual(c.args.Host, args.Host) && c.client != nil { + return c.client, nil } hostURL, err := url.Parse(args.Host) if err != nil { - return c.lastOptions, err + return c.client, err } opts := []client.Opt{ @@ -298,7 +297,7 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { if hostURL.Scheme == "http" || hostURL.Scheme == "https" { rt, err := config.NewRoundTripperFromConfig(*args.HTTPClientConfig.Convert(), "docker_sd") if err != nil { - return c.lastOptions, err + return c.client, err } opts = append(opts, client.WithHTTPClient(&http.Client{ @@ -315,41 +314,33 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { client, err := client.NewClientWithOpts(opts...) if err != nil { level.Error(c.opts.Logger).Log("msg", "could not create new Docker client", "err", err) - return c.lastOptions, fmt.Errorf("failed to build docker client: %w", err) + return c.client, fmt.Errorf("failed to build docker client: %w", err) } - return &options{ - client: client, - handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), - positions: c.posFile, - targetRestartInterval: 5 * time.Second, - }, nil + return client, nil } // DebugInfo returns information about the status of tailed targets. func (c *Component) DebugInfo() interface{} { + c.mut.RLock() + defer c.mut.RUnlock() + var res readerDebugInfo - for _, tgt := range c.manager.targets() { - details := tgt.Details() - res.TargetsInfo = append(res.TargetsInfo, targetInfo{ - Labels: tgt.LabelsStr(), - ID: details["id"], - LastError: details["error"], - IsRunning: details["running"], - ReadOffset: details["position"], - }) + for s := range c.scheduler.Sources() { + t := s.(*tailer) + res.TargetsInfo = append(res.TargetsInfo, t.DebugInfo()) } return res } type readerDebugInfo struct { - TargetsInfo []targetInfo `alloy:"targets_info,block"` + TargetsInfo []sourceInfo `alloy:"targets_info,block"` } -type targetInfo struct { +type sourceInfo struct { ID string `alloy:"id,attr"` LastError string `alloy:"last_error,attr"` Labels string `alloy:"labels,attr"` - IsRunning string `alloy:"is_running,attr"` + IsRunning bool `alloy:"is_running,attr"` ReadOffset string `alloy:"read_offset,attr"` } diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index e120c4bed5..8d8dcee388 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -1,35 +1,19 @@ package docker import ( - "context" - "io" - "os" - "strings" "testing" "time" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/relabel" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/common/loki" - dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" - "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/runtime/componenttest" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" ) -const targetRestartInterval = 20 * time.Millisecond - -func Test(t *testing.T) { +func TestComponent(t *testing.T) { // Use host that works on all platforms (including Windows). var cfg = ` host = "tcp://127.0.0.1:9375" @@ -52,7 +36,7 @@ func Test(t *testing.T) { require.NoError(t, ctrl.WaitRunning(time.Minute)) } -func TestDuplicateTargets(t *testing.T) { +func TestComponentDuplicateTargets(t *testing.T) { // Use host that works on all platforms (including Windows). var cfg = ` host = "tcp://127.0.0.1:9376" @@ -85,8 +69,11 @@ func TestDuplicateTargets(t *testing.T) { }, args) require.NoError(t, err) - require.Len(t, cmp.manager.tasks, 1) - require.Equal(t, cmp.manager.tasks[0].target.LabelsStr(), "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}") + require.Equal(t, 1, cmp.scheduler.Len()) + for s := range cmp.scheduler.Sources() { + ss := s.(*tailer) + require.Equal(t, "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}", ss.labelsStr) + } var newCfg = ` host = "tcp://127.0.0.1:9376" @@ -99,127 +86,10 @@ func TestDuplicateTargets(t *testing.T) { err = syntax.Unmarshal([]byte(newCfg), &args) require.NoError(t, err) cmp.Update(args) - require.Len(t, cmp.manager.tasks, 1) // Although the order of the targets changed, the filtered target stays the same. - require.Equal(t, cmp.manager.tasks[0].target.LabelsStr(), "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}") -} - -func TestRestart(t *testing.T) { - finishedAt := "2024-05-02T13:11:55.879889Z" - var runningState atomic.Bool - runningState.Store(true) - client := clientMock{ - logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", - running: func() bool { return runningState.Load() }, - finishedAt: func() string { return finishedAt }, - } - expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" - - tailer, entryHandler := setupTailer(t, client) - go tailer.Run(t.Context()) - - // The container is already running, expect log lines. - assert.EventuallyWithT(t, func(c *assert.CollectT) { - logLines := entryHandler.Received() - if assert.NotEmpty(c, logLines) { - assert.Equal(c, expectedLogLine, logLines[0].Line) - } - }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") - - // Stops the container. - runningState.Store(false) - time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. - entryHandler.Clear() - time.Sleep(targetRestartInterval + 10*time.Millisecond) - assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. - - // Restart the container and expect log lines. - runningState.Store(true) - assert.EventuallyWithT(t, func(c *assert.CollectT) { - logLines := entryHandler.Received() - if assert.NotEmpty(c, logLines) { - assert.Equal(c, expectedLogLine, logLines[0].Line) - } - }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") -} - -func TestTargetNeverStarted(t *testing.T) { - runningState := false - finishedAt := "2024-05-02T13:11:55.879889Z" - client := clientMock{ - logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", - running: func() bool { return runningState }, - finishedAt: func() string { return finishedAt }, - } - expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" - - tailer, entryHandler := setupTailer(t, client) - - ctx, cancel := context.WithCancel(t.Context()) - go tailer.Run(ctx) - - assert.EventuallyWithT(t, func(c *assert.CollectT) { - logLines := entryHandler.Received() - if assert.NotEmpty(c, logLines) { - assert.Equal(c, expectedLogLine, logLines[0].Line) - } - }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") - - require.NotPanics(t, func() { cancel() }) -} - -func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *loki.CollectingHandler) { - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) - entryHandler = loki.NewCollectingHandler() - - ps, err := positions.New(logger, positions.Config{ - SyncPeriod: 10 * time.Second, - PositionsFile: t.TempDir() + "/positions.yml", - }) - require.NoError(t, err) - - tgt, err := dt.NewTarget( - dt.NewMetrics(prometheus.NewRegistry()), - logger, - entryHandler, - ps, - "flog", - model.LabelSet{"job": "docker"}, - []*relabel.Config{}, - client, - ) - require.NoError(t, err) - tailerTask := &tailerTask{ - options: &options{ - client: client, - targetRestartInterval: targetRestartInterval, - }, - target: tgt, + require.Equal(t, 1, cmp.scheduler.Len()) + for s := range cmp.scheduler.Sources() { + ss := s.(*tailer) + require.Equal(t, "{__meta_docker_container_id=\"foo\", __meta_docker_port_private=\"8080\"}", ss.labelsStr) } - return newTailer(logger, tailerTask), entryHandler -} - -type clientMock struct { - client.APIClient - logLine string - running func() bool - finishedAt func() string -} - -func (mock clientMock) ContainerInspect(ctx context.Context, c string) (container.InspectResponse, error) { - return container.InspectResponse{ - ContainerJSONBase: &container.ContainerJSONBase{ - ID: c, - State: &container.State{ - Running: mock.running(), - FinishedAt: mock.finishedAt(), - }, - }, - Config: &container.Config{Tty: true}, - }, nil -} - -func (mock clientMock) ContainerLogs(ctx context.Context, container string, options container.LogsOptions) (io.ReadCloser, error) { - return io.NopCloser(strings.NewReader(mock.logLine)), nil } diff --git a/internal/component/loki/source/docker/internal/dockertarget/metrics.go b/internal/component/loki/source/docker/metrics.go similarity index 64% rename from internal/component/loki/source/docker/internal/dockertarget/metrics.go rename to internal/component/loki/source/docker/metrics.go index 917a83aab0..ae5f4a1e7f 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/metrics.go +++ b/internal/component/loki/source/docker/metrics.go @@ -1,26 +1,23 @@ -package dockertarget - -// NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). -// The dockertarget package is used to configure and run the targets that can -// read logs from Docker containers and forward them to other loki components. +package docker import ( - "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/alloy/internal/util" ) -// Metrics holds a set of Docker target metrics. -type Metrics struct { +// metrics holds a set of Docker target metrics. +type metrics struct { reg prometheus.Registerer dockerEntries prometheus.Counter dockerErrors prometheus.Counter } -// NewMetrics creates a new set of Docker target metrics. If reg is non-nil, the +// newMetrics creates a new set of Docker target metrics. If reg is non-nil, the // metrics will be registered. -func NewMetrics(reg prometheus.Registerer) *Metrics { - var m Metrics +func newMetrics(reg prometheus.Registerer) *metrics { + var m metrics m.reg = reg m.dockerEntries = prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go deleted file mode 100644 index a09f4e39c5..0000000000 --- a/internal/component/loki/source/docker/runner.go +++ /dev/null @@ -1,247 +0,0 @@ -package docker - -// NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). - -import ( - "context" - "sync" - "time" - - "github.com/docker/docker/client" - "github.com/go-kit/log" - - "github.com/grafana/alloy/internal/component/common/loki" - dt "github.com/grafana/alloy/internal/component/loki/source/docker/internal/dockertarget" - "github.com/grafana/alloy/internal/component/loki/source/internal/positions" - "github.com/grafana/alloy/internal/runner" - "github.com/grafana/alloy/internal/runtime/logging/level" -) - -// A manager manages a set of running tailers. -type manager struct { - log log.Logger - - mut sync.Mutex - opts *options - tasks []*tailerTask - - runner *runner.Runner[*tailerTask] -} - -// newManager returns a new Manager which manages a set of running tailers. -// Options must not be modified after passing it to a Manager. -// -// If newManager is called with a nil set of options, no targets will be -// scheduled for running until UpdateOptions is called. -func newManager(l log.Logger, opts *options) *manager { - return &manager{ - log: l, - opts: opts, - runner: runner.New(func(t *tailerTask) runner.Worker { - return newTailer(l, t) - }), - } -} - -// options passed to all tailers. -type options struct { - // client to use to request logs from Docker. - client client.APIClient - - // handler to send discovered logs to. - handler loki.EntryHandler - - // positions interface so tailers can save/restore offsets in log files. - positions positions.Positions - - // targetRestartInterval to restart task that has stopped running. - targetRestartInterval time.Duration -} - -// tailerTask is the payload used to create tailers. It implements runner.Task. -type tailerTask struct { - options *options - target *dt.Target -} - -var _ runner.Task = (*tailerTask)(nil) - -func (tt *tailerTask) Hash() uint64 { return tt.target.Hash() } - -func (tt *tailerTask) Equals(other runner.Task) bool { - otherTask := other.(*tailerTask) - - // Quick path: pointers are exactly the same. - if tt == otherTask { - return true - } - - // Slow path: check individual fields which are part of the task. - return tt.options == otherTask.options && - tt.target.LabelsStr() == otherTask.target.LabelsStr() -} - -// A tailer tails the logs of a docker container. It is created by a [Manager]. -type tailer struct { - log log.Logger - opts *options - target *dt.Target -} - -// newTailer returns a new tailer which tails logs from the target specified by -// the task. -func newTailer(l log.Logger, task *tailerTask) *tailer { - return &tailer{ - log: log.WithPrefix(l, "target", task.target.Name()), - opts: task.options, - target: task.target, - } -} - -func (t *tailer) Run(ctx context.Context) { - ticker := time.NewTicker(t.opts.targetRestartInterval) - tickerC := ticker.C - - for { - select { - case <-tickerC: - res, err := t.opts.client.ContainerInspect(ctx, t.target.Name()) - if err != nil { - level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err) - continue - } - - finished, err := time.Parse(time.RFC3339Nano, res.State.FinishedAt) - if err != nil { - level.Error(t.log).Log("msg", "error parsing finished time for Docker container", "id", t.target.Name(), "error", err) - finished = time.Unix(0, 0) - } - - if res.State.Running || finished.Unix() >= t.target.Last() { - t.target.StartIfNotRunning() - } - case <-ctx.Done(): - t.target.Stop() - ticker.Stop() - return - } - } -} - -// syncTargets synchronizes the set of running tailers to the set specified by -// targets. -func (m *manager) syncTargets(ctx context.Context, targets []*dt.Target) error { - m.mut.Lock() - defer m.mut.Unlock() - - // Convert targets into tasks to give to the runner. - tasks := make([]*tailerTask, 0, len(targets)) - for _, target := range targets { - tasks = append(tasks, &tailerTask{ - options: m.opts, - target: target, - }) - } - - // Sync our tasks to the runner. If the Manager doesn't have any options, - // the runner will be cleared of tasks until UpdateOptions is called with a - // non-nil set of options. - switch m.opts { - default: - if err := m.runner.ApplyTasks(ctx, tasks); err != nil { - return err - } - case nil: - if err := m.runner.ApplyTasks(ctx, nil); err != nil { - return err - } - } - - // Delete positions for targets which have gone away. - newEntries := make(map[positions.Entry]struct{}, len(targets)) - for _, target := range targets { - newEntries[entryForTarget(target)] = struct{}{} - } - - for _, task := range m.tasks { - ent := entryForTarget(task.target) - - // The task from the last call to SyncTargets is no longer in newEntries; - // remove it from the positions file. We do this _after_ calling ApplyTasks - // to ensure that the old tailers have shut down, otherwise the tailer - // might write its position again during shutdown after we removed it. - if _, found := newEntries[ent]; !found { - level.Info(m.log).Log("msg", "removing entry from positions file", "path", ent.Path, "labels", ent.Labels) - m.opts.positions.Remove(ent.Path, ent.Labels) - } - } - - m.tasks = tasks - return nil -} - -func entryForTarget(t *dt.Target) positions.Entry { - // The positions entry is keyed by container_id; the path is fed into - // positions.CursorKey to treat it as a "cursor"; otherwise - // positions.Positions will try to read the path as a file and delete the - // entry when it can't find it. - return positions.Entry{ - Path: positions.CursorKey(t.Name()), - Labels: t.LabelsStr(), - } -} - -// updateOptions updates the Options shared with all Tailers. All Tailers will -// be updated with the new set of Options. Options should not be modified after -// passing to updateOptions. -// -// If newOptions is nil, all tasks will be cleared until updateOptions is -// called again with a non-nil set of options. -func (m *manager) updateOptions(ctx context.Context, newOptions *options) error { - m.mut.Lock() - defer m.mut.Unlock() - - // Iterate through the previous set of tasks and create a new task with the - // new set of options. - tasks := make([]*tailerTask, 0, len(m.tasks)) - for _, oldTask := range m.tasks { - tasks = append(tasks, &tailerTask{ - options: newOptions, - target: oldTask.target, - }) - } - - switch newOptions { - case nil: - if err := m.runner.ApplyTasks(ctx, nil); err != nil { - return err - } - default: - if err := m.runner.ApplyTasks(ctx, tasks); err != nil { - return err - } - } - - m.opts = newOptions - m.tasks = tasks - return nil -} - -// targets returns the set of targets which are actively being tailed. targets -// for tailers which have terminated are not included. The returned set of -// targets are deduplicated. -func (m *manager) targets() []*dt.Target { - tasks := m.runner.Tasks() - - targets := make([]*dt.Target, 0, len(tasks)) - for _, task := range tasks { - targets = append(targets, task.target) - } - return targets -} - -// stop stops the manager and all running Tailers. It blocks until all Tailers -// have exited. -func (m *manager) stop() { - m.runner.Stop() -} diff --git a/internal/component/loki/source/docker/internal/dockertarget/target.go b/internal/component/loki/source/docker/tailer.go similarity index 67% rename from internal/component/loki/source/docker/internal/dockertarget/target.go rename to internal/component/loki/source/docker/tailer.go index 97c00ac2c5..05b6df2a20 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target.go +++ b/internal/component/loki/source/docker/tailer.go @@ -1,4 +1,4 @@ -package dockertarget +package docker // NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). // The dockertarget package is used to configure and run the targets that can @@ -31,24 +31,18 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" ) -const ( - // See github.com/prometheus/prometheus/discovery/moby - dockerLabel = model.MetaLabelPrefix + "docker_" - dockerLabelContainerPrefix = dockerLabel + "container_" - dockerLabelLogStream = dockerLabelContainerPrefix + "log_stream" - dockerMaxChunkSize = 16384 -) - -// Target enables reading Docker container logs. -type Target struct { - logger log.Logger - handler loki.EntryHandler - positions positions.Positions - containerName string - labels model.LabelSet - labelsStr string - relabelConfig []*relabel.Config - metrics *Metrics +// tailer for Docker container logs. +type tailer struct { + logger log.Logger + recv loki.LogsReceiver + positions positions.Positions + containerID string + labels model.LabelSet + labelsStr string + relabelConfig []*relabel.Config + metrics *metrics + restartInterval time.Duration + componentStopping func() bool client client.APIClient @@ -63,39 +57,149 @@ type Target struct { since *atomic.Int64 } -// NewTarget starts a new target to read logs from a given container ID. -func NewTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, position positions.Positions, containerID string, labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient) (*Target, error) { +// newTailer starts a new tailer to read logs from a given container ID. +func newTailer( + metrics *metrics, logger log.Logger, recv loki.LogsReceiver, position positions.Positions, containerID string, + labels model.LabelSet, relabelConfig []*relabel.Config, client client.APIClient, restartInterval time.Duration, + componentStopping func() bool, +) (*tailer, error) { + labelsStr := labels.String() pos, err := position.Get(positions.CursorKey(containerID), labelsStr) if err != nil { return nil, err } - var since int64 - if pos != 0 { - since = pos + + return &tailer{ + logger: logger, + recv: recv, + since: atomic.NewInt64(pos), + last: atomic.NewInt64(0), + positions: position, + containerID: containerID, + labels: labels, + labelsStr: labelsStr, + relabelConfig: relabelConfig, + metrics: metrics, + client: client, + restartInterval: restartInterval, + componentStopping: componentStopping, + }, nil +} + +func (t *tailer) Run(ctx context.Context) { + ticker := time.NewTicker(t.restartInterval) + defer ticker.Stop() + + // start on initial call to Run. + t.startIfNotRunning() + + for { + select { + case <-ticker.C: + res, err := t.client.ContainerInspect(ctx, t.containerID) + if err != nil { + level.Error(t.logger).Log("msg", "error inspecting Docker container", "id", t.containerID, "error", err) + continue + } + + finished, err := time.Parse(time.RFC3339Nano, res.State.FinishedAt) + if err != nil { + level.Error(t.logger).Log("msg", "error parsing finished time for Docker container", "id", t.containerID, "error", err) + finished = time.Unix(0, 0) + } + + if res.State.Running || finished.Unix() >= t.last.Load() { + t.startIfNotRunning() + } + case <-ctx.Done(): + t.stop() + return + } } +} + +// startIfNotRunning starts processing container logs. The operation is idempotent, i.e. the processing cannot be started twice. +func (t *tailer) startIfNotRunning() { + t.mu.Lock() + defer t.mu.Unlock() + if !t.running { + level.Debug(t.logger).Log("msg", "starting process loop", "container", t.containerID) + + ctx := context.Background() + info, err := t.client.ContainerInspect(ctx, t.containerID) + if err != nil { + level.Error(t.logger).Log("msg", "could not inspect container info", "container", t.containerID, "err", err) + t.err = err + return + } + + reader, err := t.client.ContainerLogs(ctx, t.containerID, container.LogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: true, + Since: strconv.FormatInt(t.since.Load(), 10), + }) + if err != nil { + level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerID, "err", err) + t.err = err + return + } - t := &Target{ - logger: logger, - handler: handler, - since: atomic.NewInt64(since), - last: atomic.NewInt64(0), - positions: position, - containerName: containerID, - labels: labels, - labelsStr: labelsStr, - relabelConfig: relabelConfig, - metrics: metrics, - client: client, + ctx, cancel := context.WithCancel(ctx) + t.cancel = cancel + t.running = true + // processLoop will start 3 goroutines that we need to wait for if Stop is called. + t.wg.Add(3) + go t.processLoop(ctx, info.Config.Tty, reader) } +} + +// stop shuts down the target. +func (t *tailer) stop() { + t.mu.Lock() + defer t.mu.Unlock() + if t.running { + t.running = false + if t.cancel != nil { + t.cancel() + } + t.wg.Wait() + level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerID) - // NOTE (@tpaschalis) The original Promtail implementation would call - // t.StartIfNotRunning() right here to start tailing. - // We manage targets from a task's Run method. - return t, nil + // If the component is not stopping, then it means that the target for this component is gone and that + // we should clear the entry from the positions file. + if !t.componentStopping() { + t.positions.Remove(positions.CursorKey(t.containerID), t.labelsStr) + } + } } -func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser) { +func (t *tailer) Key() string { + return t.containerID +} + +func (t *tailer) DebugInfo() sourceInfo { + t.mu.Lock() + defer t.mu.Unlock() + running := t.running + + var errMsg string + if t.err != nil { + errMsg = t.err.Error() + } + + return sourceInfo{ + ID: t.containerID, + LastError: errMsg, + Labels: t.labelsStr, + IsRunning: running, + ReadOffset: t.positions.GetString(positions.CursorKey(t.containerID), t.labelsStr), + } +} + +func (t *tailer) processLoop(ctx context.Context, tty bool, reader io.ReadCloser) { defer reader.Close() // Start transferring @@ -106,7 +210,7 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser t.wg.Done() wstdout.Close() wstderr.Close() - t.Stop() + t.stop() }() var written int64 var err error @@ -121,9 +225,9 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser written, err = stdcopy.StdCopy(wcstdout, wcstderr, reader) } if err != nil { - level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerName, "err", err) + level.Warn(t.logger).Log("msg", "could not transfer logs", "written", written, "container", t.containerID, "err", err) } else { - level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerName) + level.Info(t.logger).Log("msg", "finished transferring logs", "written", written, "container", t.containerID) } }() @@ -133,7 +237,7 @@ func (t *Target) processLoop(ctx context.Context, tty bool, reader io.ReadCloser // Wait until done <-ctx.Done() - level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerName) + level.Debug(t.logger).Log("msg", "done processing Docker logs", "container", t.containerID) } // extractTsFromBytes parses an RFC3339Nano timestamp from the byte slice. @@ -157,7 +261,7 @@ func extractTsFromBytes(line []byte) (time.Time, []byte, error) { return ts, line[spaceIdx+1:], nil } -func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { +func (t *tailer) process(r io.Reader, logStreamLset model.LabelSet) { defer t.wg.Done() scanner := bufio.NewScanner(r) @@ -174,7 +278,7 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { continue } - t.handler.Chan() <- loki.Entry{ + t.recv.Chan() <- loki.Entry{ Labels: logStreamLset, Entry: push.Entry{ Timestamp: ts, @@ -189,7 +293,7 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { // problematic if we have the same container with a different set of // labels (e.g. duplicated and relabeled), but this shouldn't be the // case anyway. - t.positions.Put(positions.CursorKey(t.containerName), t.labelsStr, ts.Unix()) + t.positions.Put(positions.CursorKey(t.containerID), t.labelsStr, ts.Unix()) t.since.Store(ts.Unix()) t.last.Store(time.Now().Unix()) } @@ -199,107 +303,7 @@ func (t *Target) process(r io.Reader, logStreamLset model.LabelSet) { } } -// StartIfNotRunning starts processing container logs. The operation is idempotent , i.e. the processing cannot be started twice. -func (t *Target) StartIfNotRunning() { - t.mu.Lock() - defer t.mu.Unlock() - if !t.running { - level.Debug(t.logger).Log("msg", "starting process loop", "container", t.containerName) - - ctx := context.Background() - info, err := t.client.ContainerInspect(ctx, t.containerName) - if err != nil { - level.Error(t.logger).Log("msg", "could not inspect container info", "container", t.containerName, "err", err) - t.err = err - return - } - - reader, err := t.client.ContainerLogs(ctx, t.containerName, container.LogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Timestamps: true, - Since: strconv.FormatInt(t.since.Load(), 10), - }) - if err != nil { - level.Error(t.logger).Log("msg", "could not fetch logs for container", "container", t.containerName, "err", err) - t.err = err - return - } - - ctx, cancel := context.WithCancel(ctx) - t.cancel = cancel - t.running = true - // proccessLoop will start 3 goroutines that we need to wait for if Stop is called. - t.wg.Add(3) - go t.processLoop(ctx, info.Config.Tty, reader) - } -} - -// Stop shuts down the target. -func (t *Target) Stop() { - t.mu.Lock() - defer t.mu.Unlock() - if t.running { - t.running = false - if t.cancel != nil { - t.cancel() - } - t.wg.Wait() - level.Debug(t.logger).Log("msg", "stopped Docker target", "container", t.containerName) - } -} - -// Ready reports whether the target is running. -func (t *Target) Ready() bool { - t.mu.Lock() - defer t.mu.Unlock() - return t.running -} - -// LabelsStr returns the target's original labels string representation. -func (t *Target) LabelsStr() string { - return t.labelsStr -} - -// Name reports the container name. -func (t *Target) Name() string { - return t.containerName -} - -// Hash is used when comparing targets in tasks. -func (t *Target) Hash() uint64 { - return uint64(t.labels.Fingerprint()) -} - -// Path returns the target's container name. -func (t *Target) Path() string { - return t.containerName -} - -// Last returns the unix timestamp of the target's last processing loop. -func (t *Target) Last() int64 { return t.last.Load() } - -// Details returns target-specific details. -func (t *Target) Details() map[string]string { - t.mu.Lock() - running := t.running - - var errMsg string - if t.err != nil { - errMsg = t.err.Error() - } - t.mu.Unlock() - - return map[string]string{ - "id": t.containerName, - "error": errMsg, - "position": t.positions.GetString(positions.CursorKey(t.containerName), t.labelsStr), - "running": strconv.FormatBool(running), - } -} - -func (t *Target) getStreamLabels(logStream string) model.LabelSet { +func (t *tailer) getStreamLabels(logStream string) model.LabelSet { // Add all labels from the config, relabel and filter them. lb := labels.NewBuilder(labels.EmptyLabels()) for k, v := range t.labels { diff --git a/internal/component/loki/source/docker/internal/dockertarget/target_test.go b/internal/component/loki/source/docker/tailer_test.go similarity index 57% rename from internal/component/loki/source/docker/internal/dockertarget/target_test.go rename to internal/component/loki/source/docker/tailer_test.go index eeacc2b829..86aff4d4d7 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target_test.go +++ b/internal/component/loki/source/docker/tailer_test.go @@ -1,4 +1,4 @@ -package dockertarget +package docker // NOTE: This code is adapted from Promtail (90a1d4593e2d690b37333386383870865fe177bf). // The dockertarget package is used to configure and run the targets that can @@ -6,7 +6,9 @@ package dockertarget import ( "bytes" + "context" "encoding/json" + "io" "net/http" "net/http/httptest" "os" @@ -23,17 +25,19 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" ) -func TestDockerTarget(t *testing.T) { +const restartInterval = 20 * time.Millisecond + +func TestTailer(t *testing.T) { server := newDockerServer(t) defer server.Close() - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) + logger := log.NewNopLogger() entryHandler := loki.NewCollectingHandler() client, err := client.NewClientWithOpts(client.WithHost(server.URL)) require.NoError(t, err) @@ -44,18 +48,25 @@ func TestDockerTarget(t *testing.T) { }) require.NoError(t, err) - tgt, err := NewTarget( - NewMetrics(prometheus.NewRegistry()), + tailer, err := newTailer( + newMetrics(prometheus.NewRegistry()), logger, - entryHandler, + entryHandler.Receiver(), ps, "flog", model.LabelSet{"job": "docker"}, []*relabel.Config{}, client, + restartInterval, + func() bool { return false }, ) require.NoError(t, err) - tgt.StartIfNotRunning() + + ctx, cancel := context.WithCancel(t.Context()) + wg := sync.WaitGroup{} + wg.Go(func() { + tailer.Run(ctx) + }) expectedLines := []string{ "5.3.69.55 - - [09/Dec/2021:09:15:02 +0000] \"HEAD /brand/users/clicks-and-mortar/front-end HTTP/2.0\" 503 27087", @@ -69,14 +80,16 @@ func TestDockerTarget(t *testing.T) { assertExpectedLog(c, entryHandler, expectedLines) }, 5*time.Second, 100*time.Millisecond, "Expected log lines were not found within the time limit.") - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.False(c, tgt.Ready()) - }, 5*time.Second, 20*time.Millisecond, "Expected target to finish processing within the time limit.") - entryHandler.Clear() // restart target to simulate container restart - tgt.Stop() - tgt.StartIfNotRunning() + cancel() + wg.Wait() + + ctx, cancel = context.WithCancel(t.Context()) + defer cancel() + wg.Go(func() { + tailer.Run(ctx) + }) expectedLinesAfterRestart := []string{ "243.115.12.215 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /morph/exploit/granular HTTP/1.0\" 500 26468", "221.41.123.237 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /user-centric/whiteboard HTTP/2.0\" 205 22487", @@ -89,7 +102,7 @@ func TestDockerTarget(t *testing.T) { }, 5*time.Second, 100*time.Millisecond, "Expected log lines after restart were not found within the time limit.") } -func TestStartStopStressTest(t *testing.T) { +func TestTailerStartStopStressTest(t *testing.T) { server := newDockerServer(t) defer server.Close() @@ -105,19 +118,21 @@ func TestStartStopStressTest(t *testing.T) { client, err := client.NewClientWithOpts(client.WithHost(server.URL)) require.NoError(t, err) - tgt, err := NewTarget( - NewMetrics(prometheus.NewRegistry()), + tgt, err := newTailer( + newMetrics(prometheus.NewRegistry()), logger, - entryHandler, + entryHandler.Receiver(), ps, "flog", model.LabelSet{"job": "docker"}, []*relabel.Config{}, client, + restartInterval, + func() bool { return false }, ) require.NoError(t, err) - tgt.StartIfNotRunning() + tgt.startIfNotRunning() // Stress test the concurrency of StartIfNotRunning and Stop wg := sync.WaitGroup{} @@ -125,19 +140,83 @@ func TestStartStopStressTest(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - tgt.StartIfNotRunning() + tgt.startIfNotRunning() }() wg.Add(1) go func() { defer wg.Done() - tgt.Stop() + tgt.stop() }() } wg.Wait() } -func TestDockerChunkWriter(t *testing.T) { +func TestTailerRestart(t *testing.T) { + finishedAt := "2024-05-02T13:11:55.879889Z" + runningState := atomic.NewBool(true) + + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState.Load() }, + finishedAt: func() string { return finishedAt }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + go tailer.Run(t.Context()) + + // The container is already running, expect log lines. + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") + + // Stops the container. + runningState.Store(false) + time.Sleep(restartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. + entryHandler.Clear() + time.Sleep(restartInterval + 10*time.Millisecond) + assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. + + // Restart the container and expect log lines. + runningState.Store(true) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") +} + +func TestTailerNeverStarted(t *testing.T) { + runningState := false + finishedAt := "2024-05-02T13:11:55.879889Z" + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState }, + finishedAt: func() string { return finishedAt }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + + ctx, cancel := context.WithCancel(t.Context()) + go tailer.Run(ctx) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") + + require.NotPanics(t, func() { cancel() }) +} + +func TestChunkWriter(t *testing.T) { logger := log.NewNopLogger() var buf bytes.Buffer writer := newChunkWriter(&buf, logger) @@ -194,10 +273,12 @@ func newDockerServer(t *testing.T) *httptest.Server { default: w.Header().Set("Content-Type", "application/json") info := container.InspectResponse{ - ContainerJSONBase: &container.ContainerJSONBase{}, - Mounts: []container.MountPoint{}, - Config: &container.Config{Tty: false}, - NetworkSettings: &container.NetworkSettings{}, + ContainerJSONBase: &container.ContainerJSONBase{ + State: &container.State{}, + }, + Mounts: []container.MountPoint{}, + Config: &container.Config{Tty: false}, + NetworkSettings: &container.NetworkSettings{}, } writeErr = json.NewEncoder(w).Encode(info) } @@ -240,3 +321,54 @@ func containsString(slice []string, str string) bool { } return false } + +func setupTailer(t *testing.T, client clientMock) (*tailer, *loki.CollectingHandler) { + logger := log.NewNopLogger() + entryHandler := loki.NewCollectingHandler() + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + tailer, err := newTailer( + newMetrics(prometheus.NewRegistry()), + logger, + entryHandler.Receiver(), + ps, + "flog", + model.LabelSet{"job": "docker"}, + []*relabel.Config{}, + client, + restartInterval, + func() bool { return false }, + ) + require.NoError(t, err) + + return tailer, entryHandler +} + +type clientMock struct { + client.APIClient + logLine string + running func() bool + finishedAt func() string +} + +func (mock clientMock) ContainerInspect(ctx context.Context, c string) (container.InspectResponse, error) { + return container.InspectResponse{ + ContainerJSONBase: &container.ContainerJSONBase{ + ID: c, + State: &container.State{ + Running: mock.running(), + FinishedAt: mock.finishedAt(), + }, + }, + Config: &container.Config{Tty: true}, + }, nil +} + +func (mock clientMock) ContainerLogs(ctx context.Context, container string, options container.LogsOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(mock.logLine)), nil +} diff --git a/internal/component/loki/source/docker/internal/dockertarget/testdata/flog.log b/internal/component/loki/source/docker/testdata/flog.log similarity index 100% rename from internal/component/loki/source/docker/internal/dockertarget/testdata/flog.log rename to internal/component/loki/source/docker/testdata/flog.log diff --git a/internal/component/loki/source/docker/internal/dockertarget/testdata/flog_after_restart.log b/internal/component/loki/source/docker/testdata/flog_after_restart.log similarity index 100% rename from internal/component/loki/source/docker/internal/dockertarget/testdata/flog_after_restart.log rename to internal/component/loki/source/docker/testdata/flog_after_restart.log diff --git a/internal/component/loki/source/drain.go b/internal/component/loki/source/drain.go new file mode 100644 index 0000000000..acca25d41a --- /dev/null +++ b/internal/component/loki/source/drain.go @@ -0,0 +1,35 @@ +package source + +import ( + "context" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +// Drain consumes log entries from recv in a background goroutine while f executes. +// This prevents deadlocks that can occur when stopping components that may still be +// sending entries to the receiver channel. The draining goroutine will continue +// consuming entries until f returns, at which point the context is cancelled and +// the goroutine exits. +// +// This is typically used during component shutdown to drain any remaining entries +// from a receiver channel while performing cleanup operations. +func Drain(recv loki.LogsReceiver, f func()) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + for { + select { + case <-ctx.Done(): + return + case _, ok := <-recv.Chan(): + // Consume and discard entries to prevent channel blocking + if !ok { + return + } + } + } + }() + + f() +} diff --git a/internal/component/loki/source/drain_test.go b/internal/component/loki/source/drain_test.go new file mode 100644 index 0000000000..a29749ce89 --- /dev/null +++ b/internal/component/loki/source/drain_test.go @@ -0,0 +1,39 @@ +package source + +import ( + "sync" + "testing" + "time" + + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/loki/pkg/push" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestDrain(t *testing.T) { + recv := loki.NewLogsReceiver() + + var wg sync.WaitGroup + wg.Go(func() { + for range 10 { + entry := loki.Entry{ + Labels: model.LabelSet{"test": "label"}, + Entry: push.Entry{ + Timestamp: time.Now(), + Line: "test log entry", + }, + } + recv.Chan() <- entry + } + }) + + completed := false + Drain(recv, func() { + time.Sleep(100 * time.Millisecond) + completed = true + }) + + wg.Wait() + require.True(t, completed, "Drain should complete without deadlock") +} diff --git a/internal/component/loki/source/file/decompresser.go b/internal/component/loki/source/file/decompresser.go index 2bed272e5c..10f9070ce0 100644 --- a/internal/component/loki/source/file/decompresser.go +++ b/internal/component/loki/source/file/decompresser.go @@ -339,8 +339,14 @@ func (d *decompressor) Key() positions.Entry { return d.key } -func (d *decompressor) IsRunning() bool { - return d.running.Load() +func (d *decompressor) DebugInfo() any { + offset, _ := d.positions.Get(d.key.Path, d.key.Labels) + return sourceDebugInfo{ + Path: d.key.Path, + Labels: d.key.Labels, + IsRunning: d.running.Load(), + ReadOffset: offset, + } } // cleanupMetrics removes all metrics exported by this reader diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 0da6c03cc0..863f5696d9 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "reflect" "sync" "time" @@ -17,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/discovery" + "github.com/grafana/alloy/internal/component/loki/source" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging/level" @@ -144,7 +144,7 @@ type Component struct { // new arguments. resolver resolver // scheduler owns the lifecycle of sources. - scheduler *Scheduler[positions.Entry] + scheduler *source.Scheduler[positions.Entry] // watcher is a background trigger that periodically invokes // scheduling when file matching is enabled. @@ -153,8 +153,7 @@ type Component struct { handler loki.LogsReceiver posFile positions.Positions - receiversMut sync.RWMutex - receivers []loki.LogsReceiver + fanout *loki.Fanout stopping atomic.Bool } @@ -184,9 +183,9 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, metrics: newMetrics(o.Registerer), handler: loki.NewLogsReceiver(), - receivers: args.ForwardTo, + fanout: loki.NewFanout(args.ForwardTo), posFile: positionsFile, - scheduler: NewScheduler[positions.Entry](), + scheduler: source.NewScheduler[positions.Entry](), watcher: time.NewTicker(args.FileMatch.SyncPeriod), } @@ -202,49 +201,24 @@ func New(o component.Options, args Arguments) (*Component, error) { func (c *Component) Run(ctx context.Context) error { defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping sources and positions file") - // We need to stop posFile first so we don't record entries we are draining c.posFile.Stop() - // Start black hole drain routine to prevent deadlock when we call c.t.Stop(). - drainCtx, cancelDrain := context.WithCancel(context.Background()) - defer cancelDrain() - go func() { - for { - select { - case <-drainCtx.Done(): - return - case <-c.handler.Chan(): // Ignore the remaining entries - } - } - }() - c.mut.Lock() - c.stopping.Store(true) - c.watcher.Stop() - c.scheduler.Stop() - close(c.handler.Chan()) - c.mut.Unlock() + + // Start black hole drain routine to prevent deadlock when we call c.scheduler.Stop(). + source.Drain(c.handler, func() { + c.mut.Lock() + c.stopping.Store(true) + c.watcher.Stop() + c.scheduler.Stop() + close(c.handler.Chan()) + c.mut.Unlock() + }) }() var wg sync.WaitGroup - wg.Go(func() { - for { - select { - case <-ctx.Done(): - return - case entry := <-c.handler.Chan(): - c.receiversMut.RLock() - for _, receiver := range c.receivers { - select { - case <-ctx.Done(): - c.receiversMut.RUnlock() - return - case receiver.Chan() <- entry: - } - } - c.receiversMut.RUnlock() - } - } - }) + + // Start consume and fanout loop + wg.Go(func() { source.Consume(ctx, c.handler, c.fanout) }) wg.Go(func() { for { @@ -276,16 +250,7 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() - c.receiversMut.RLock() - if receiversChanged(c.receivers, newArgs.ForwardTo) { - // Upgrade lock to write. - c.receiversMut.RUnlock() - c.receiversMut.Lock() - c.receivers = newArgs.ForwardTo - c.receiversMut.Unlock() - } else { - c.receiversMut.RUnlock() - } + c.fanout.UpdateChildren(newArgs.ForwardTo) // Choose resolver on FileMatch. if newArgs.FileMatch.Enabled { @@ -368,7 +333,7 @@ func (c *Component) scheduleSources() { c.scheduler.ScheduleSource(source) } - var toDelete []Source[positions.Entry] + var toDelete []source.Source[positions.Entry] // Avoid mutating the scheduler state during iteration. Collect sources to // remove and stop them in a separate loop. @@ -385,10 +350,10 @@ func (c *Component) scheduleSources() { } type debugInfo struct { - TargetsInfo []targetInfo `alloy:"targets_info,block"` + TargetsInfo []sourceDebugInfo `alloy:"targets_info,block"` } -type targetInfo struct { +type sourceDebugInfo struct { Path string `alloy:"path,attr"` Labels string `alloy:"labels,attr"` IsRunning bool `alloy:"is_running,attr"` @@ -403,13 +368,10 @@ func (c *Component) DebugInfo() any { defer c.mut.RUnlock() var res debugInfo for s := range c.scheduler.Sources() { - offset, _ := c.posFile.Get(s.Key().Path, s.Key().Labels) - res.TargetsInfo = append(res.TargetsInfo, targetInfo{ - Path: s.Key().Path, - Labels: s.Key().Labels, - IsRunning: s.IsRunning(), - ReadOffset: offset, - }) + ds, ok := s.(source.DebugSource) + if ok { + res.TargetsInfo = append(res.TargetsInfo, ds.DebugInfo().(sourceDebugInfo)) + } } return res } @@ -426,7 +388,7 @@ type sourceOptions struct { } // newSource will return a decompressor source if enabled, otherwise a tailer source. -func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], error) { +func (c *Component) newSource(opts sourceOptions) (source.Source[positions.Entry], error) { if opts.decompressionConfig.Enabled { decompressor, err := newDecompressor( c.metrics, @@ -452,7 +414,7 @@ func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], erro if err != nil { return nil, fmt.Errorf("failed to create tailer %w", err) } - return NewSourceWithRetry(tailer, backoff.Config{ + return source.NewSourceWithRetry(tailer, backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 10 * time.Second, }), nil @@ -461,15 +423,3 @@ func (c *Component) newSource(opts sourceOptions) (Source[positions.Entry], erro func (c *Component) IsStopping() bool { return c.stopping.Load() } - -func receiversChanged(prev, next []loki.LogsReceiver) bool { - if len(prev) != len(next) { - return true - } - for i := range prev { - if !reflect.DeepEqual(prev[i], next[i]) { - return true - } - } - return false -} diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 9e89ba3e17..5ef536f1e6 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -416,10 +416,6 @@ func (t *tailer) Key() positions.Entry { return t.key } -func (t *tailer) IsRunning() bool { - return t.running.Load() -} - // cleanupMetrics removes all metrics exported by this tailer func (t *tailer) cleanupMetrics() { // When we stop tailing the file, also un-export metrics related to the file @@ -428,3 +424,13 @@ func (t *tailer) cleanupMetrics() { t.metrics.readBytes.DeleteLabelValues(t.key.Path) t.metrics.totalBytes.DeleteLabelValues(t.key.Path) } + +func (t *tailer) DebugInfo() any { + offset, _ := t.positions.Get(t.key.Path, t.key.Labels) + return sourceDebugInfo{ + Path: t.key.Path, + Labels: t.key.Labels, + IsRunning: t.running.Load(), + ReadOffset: offset, + } +} diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index 106e722124..56ddfe462b 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -355,7 +355,7 @@ func TestTailerCorruptedPositions(t *testing.T) { }() require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.True(c, tailer.IsRunning()) + assert.True(c, tailer.running.Load()) assert.Equal(c, "16", positionsFile.GetString(logFile.Name(), labels.String())) }, time.Second, 50*time.Millisecond) diff --git a/internal/component/loki/source/file/scheduler.go b/internal/component/loki/source/scheduler.go similarity index 83% rename from internal/component/loki/source/file/scheduler.go rename to internal/component/loki/source/scheduler.go index 610bde4b41..fe81607b3d 100644 --- a/internal/component/loki/source/file/scheduler.go +++ b/internal/component/loki/source/scheduler.go @@ -1,4 +1,4 @@ -package file +package source import ( "context" @@ -86,20 +86,33 @@ func (s *Scheduler[K]) Len() int { } // Stop will stop all running sources and wait for them to finish. +// Scheduler should not be reused after Stop is called. func (s *Scheduler[K]) Stop() { s.cancel() s.running.Wait() s.sources = make(map[K]scheduledSource[K]) } +// Reset will stop all running sources and wait for them to finish and reset +// Scheduler to a usable state. +func (s *Scheduler[K]) Reset() { + s.cancel() + s.running.Wait() + s.sources = make(map[K]scheduledSource[K]) + s.ctx, s.cancel = context.WithCancel(context.Background()) +} + type Source[K comparable] interface { // Run should start the source. // It should run until there is no more work or context is canceled. Run(ctx context.Context) // Key is used to uniquely identify the source. Key() K - // IsRunning reports if source is still running. - IsRunning() bool +} + +// DebugSource is an optional interface with debug information. +type DebugSource interface { + DebugInfo() any } func NewSourceWithRetry[K comparable](source Source[K], config backoff.Config) *SourceWithRetry[K] { @@ -126,8 +139,12 @@ func (s *SourceWithRetry[K]) Key() K { return s.source.Key() } -func (s *SourceWithRetry[K]) IsRunning() bool { - return s.source.IsRunning() +func (s *SourceWithRetry[K]) DebugInfo() any { + ss, ok := s.source.(DebugSource) + if !ok { + return nil + } + return ss.DebugInfo() } // scheduledSource is a source that is already scheduled. diff --git a/internal/component/loki/source/file/scheduler_test.go b/internal/component/loki/source/scheduler_test.go similarity index 99% rename from internal/component/loki/source/file/scheduler_test.go rename to internal/component/loki/source/scheduler_test.go index 0187c09dd1..e584444fbb 100644 --- a/internal/component/loki/source/file/scheduler_test.go +++ b/internal/component/loki/source/scheduler_test.go @@ -1,4 +1,4 @@ -package file +package source import ( "context"