diff --git a/CHANGELOG.md b/CHANGELOG.md index b9fdba84d5..288ff7edaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,8 @@ Main (unreleased) - Updated Loki dependencies to v3.6.2. (@thampiotr) +- Refactor tailer used in `loki.source.file` to reduce resource usage. (@kalleep) + ### Bugfixes - (_Public Preview_) Additions to `database_observability.postgres` component: diff --git a/go.mod b/go.mod index 498de85c83..7d3f0a8571 100644 --- a/go.mod +++ b/go.mod @@ -308,7 +308,6 @@ require ( google.golang.org/api v0.254.0 google.golang.org/grpc v1.76.0 google.golang.org/protobuf v1.36.10 - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible diff --git a/internal/component/loki/source/file/internal/tail/README.md b/internal/component/loki/source/file/internal/tail/README.md deleted file mode 100644 index a387d1da7b..0000000000 --- a/internal/component/loki/source/file/internal/tail/README.md +++ /dev/null @@ -1,32 +0,0 @@ - -**NOTE**: This is a fork of https://github.com/grafana/tail, which is a fork of https://github.com/hpcloud/tail. -The `grafana/tail` repo is no longer mainained because the Loki team has deprecated the Promtail project. -It is easier for the Alloy team to maintain this tail package inside the Alloy repo than to have a separate repository for it. - -Use outside of that context is not tested or supported. - -# Go package for tail-ing files - -A Go package striving to emulate the features of the BSD `tail` program. - -```Go -t, err := tail.TailFile("/var/log/nginx.log", tail.Config{Follow: true}) -for line := range t.Lines { - fmt.Println(line.Text) -} -``` - -See [API documentation](http://godoc.org/github.com/hpcloud/tail). - -## Log rotation - -Tail comes with full support for truncation/move detection as it is -designed to work with log rotation tools. - -## Installing - - go get github.com/hpcloud/tail/... - -## Windows support - -This package [needs assistance](https://github.com/hpcloud/tail/labels/Windows) for full Windows support. diff --git a/internal/component/loki/source/file/internal/tail/block.go b/internal/component/loki/source/file/internal/tail/block.go new file mode 100644 index 0000000000..083d2c0790 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/block.go @@ -0,0 +1,117 @@ +package tail + +import ( + "context" + "os" + "runtime" + + "github.com/grafana/dskit/backoff" + + "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext" +) + +// blockUntilExists blocks until the file specified in cfg exists or the context is canceled. +// It polls the file system at intervals defined by WatcherConfig polling frequencies. +// Returns an error if the context is canceled or an unrecoverable error occurs. +func blockUntilExists(ctx context.Context, cfg *Config) error { + backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: cfg.WatcherConfig.MinPollFrequency, + MaxBackoff: cfg.WatcherConfig.MaxPollFrequency, + }) + + for backoff.Ongoing() { + if _, err := os.Stat(cfg.Filename); err == nil { + return nil + } else if !os.IsNotExist(err) { + return err + } + backoff.Wait() + } + + return backoff.Err() +} + +// event represents a file system event detected during polling. +type event int + +const ( + eventNone event = iota // no event detected + eventTruncated // file was truncated (size decreased) + eventModified // file was modified (size increased or modification time changed) + eventDeleted // file was deleted, moved, or renamed +) + +// blockUntilEvent blocks until it detects a file system event for the given file or the context is canceled. +// It polls the file system to detect modifications, truncations, deletions, or renames. +// The pos parameter is the current file position and is used to detect truncation events. +// Returns the detected event type and any error encountered. Returns eventNone if the context is canceled. +func blockUntilEvent(ctx context.Context, f *os.File, prevSize int64, cfg *Config) (event, error) { + // NOTE: it is important that we stat the open file here. Later we do os.Stat(cfg.Filename) + // and use os.IsSameFile to detect if file was rotated. + origFi, err := f.Stat() + if err != nil { + // If file no longer exists we treat it as a delete event. + if os.IsNotExist(err) { + return eventDeleted, nil + } + return eventNone, err + } + + backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: cfg.WatcherConfig.MinPollFrequency, + MaxBackoff: cfg.WatcherConfig.MaxPollFrequency, + }) + + prevModTime := origFi.ModTime() + + for backoff.Ongoing() { + deletePending, err := fileext.IsDeletePending(f) + + // DeletePending is a windows state where the file has been queued + // for delete but won't actually get deleted until all handles are + // closed. It's a variation on the NotifyDeleted call below. + // + // IsDeletePending may fail in cases where the file handle becomes + // invalid, so we treat a failed call the same as a pending delete. + if err != nil || deletePending { + return eventDeleted, nil + } + + fi, err := os.Stat(cfg.Filename) + if err != nil { + // Windows cannot delete a file if a handle is still open (tail keeps one open) + // so it gives access denied to anything trying to read it until all handles are released. + if os.IsNotExist(err) || (runtime.GOOS == "windows" && os.IsPermission(err)) { + // File does not exist (has been deleted). + return eventDeleted, nil + } + return eventNone, err + } + + // File got moved/renamed? + if !os.SameFile(origFi, fi) { + return eventDeleted, nil + } + + // File got truncated? + currentSize := fi.Size() + if prevSize > 0 && prevSize > currentSize { + return eventTruncated, nil + } + + // File got bigger? + if prevSize < currentSize { + return eventModified, nil + } + + // File was appended to (changed)? + if fi.ModTime() != prevModTime { + return eventModified, nil + } + + // File hasn't changed so wait until next retry. + backoff.Wait() + } + + return eventNone, backoff.Err() +} diff --git a/internal/component/loki/source/file/internal/tail/block_test.go b/internal/component/loki/source/file/internal/tail/block_test.go new file mode 100644 index 0000000000..8baa555613 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/block_test.go @@ -0,0 +1,189 @@ +package tail + +import ( + "context" + "io" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext" +) + +func TestBlockUntilExists(t *testing.T) { + watcherConfig := WatcherConfig{ + MinPollFrequency: 5 * time.Millisecond, + MaxPollFrequency: 5 * time.Millisecond, + } + + t.Run("should block until file exists", func(t *testing.T) { + filename := filepath.Join(t.TempDir(), "eventually") + + go func() { + time.Sleep(10 * time.Millisecond) + createFileWithPath(t, filename, "") + }() + + err := blockUntilExists(context.Background(), &Config{ + Filename: filename, + WatcherConfig: watcherConfig, + }) + require.NoError(t, err) + }) + + t.Run("should exit when context is canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + + err := blockUntilExists(ctx, &Config{ + Filename: filepath.Join(t.TempDir(), "never"), + WatcherConfig: watcherConfig, + }) + require.ErrorIs(t, err, context.Canceled) + }) +} + +func TestBlockUntilEvent(t *testing.T) { + watcherConfig := WatcherConfig{ + MinPollFrequency: 5 * time.Millisecond, + MaxPollFrequency: 5 * time.Millisecond, + } + + t.Run("should return modified event when file is written to", func(t *testing.T) { + f := createEmptyFile(t, "startempty") + defer f.Close() + + go func() { + time.Sleep(10 * time.Millisecond) + _, err := f.WriteString("updated") + require.NoError(t, err) + }() + + event, err := blockUntilEvent(context.Background(), f, 0, &Config{ + Filename: f.Name(), + WatcherConfig: watcherConfig, + }) + require.NoError(t, err) + require.Equal(t, eventModified, event) + }) + + t.Run("should return modified event if mod time is updated", func(t *testing.T) { + f := createEmptyFile(t, "startempty") + defer f.Close() + + go func() { + time.Sleep(10 * time.Millisecond) + require.NoError(t, os.Chtimes(f.Name(), time.Now(), time.Now())) + }() + + event, err := blockUntilEvent(context.Background(), f, 0, &Config{ + Filename: f.Name(), + WatcherConfig: watcherConfig, + }) + require.NoError(t, err) + require.Equal(t, eventModified, event) + }) + + t.Run("should return deleted event if file is deleted", func(t *testing.T) { + f := createEmptyFile(t, "startempty") + require.NoError(t, f.Close()) + + // NOTE: important for windows that we open with correct flags. + f, err := fileext.OpenFile(f.Name()) + require.NoError(t, err) + defer f.Close() + + go func() { + time.Sleep(10 * time.Millisecond) + removeFile(t, f.Name()) + }() + + event, err := blockUntilEvent(context.Background(), f, 0, &Config{ + Filename: f.Name(), + WatcherConfig: watcherConfig, + }) + require.NoError(t, err) + require.Equal(t, eventDeleted, event) + }) + + t.Run("should return deleted event if file is deleted before", func(t *testing.T) { + f := createEmptyFile(t, "startempty") + require.NoError(t, f.Close()) + + // NOTE: important for windows that we open with correct flags. + f, err := fileext.OpenFile(f.Name()) + require.NoError(t, err) + defer f.Close() + + removeFile(t, f.Name()) + + event, err := blockUntilEvent(context.Background(), f, 0, &Config{ + Filename: f.Name(), + WatcherConfig: watcherConfig, + }) + require.NoError(t, err) + require.Equal(t, eventDeleted, event) + }) + + t.Run("should return truncated event", func(t *testing.T) { + f := createFileWithContent(t, "truncate", "content") + defer f.Close() + + offset, err := f.Seek(0, io.SeekEnd) + require.NoError(t, err) + + go func() { + time.Sleep(10 * time.Millisecond) + require.NoError(t, f.Truncate(0)) + }() + + event, err := blockUntilEvent(context.Background(), f, offset, &Config{ + Filename: f.Name(), + WatcherConfig: watcherConfig, + }) + require.NoError(t, err) + require.Equal(t, eventTruncated, event) + }) + + t.Run("should exit when context is canceled", func(t *testing.T) { + f := createEmptyFile(t, "startempty") + defer f.Close() + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + + event, err := blockUntilEvent(ctx, f, 0, &Config{ + Filename: f.Name(), + WatcherConfig: watcherConfig, + }) + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, eventNone, event) + }) +} + +func createEmptyFile(t *testing.T, name string) *os.File { + path := filepath.Join(t.TempDir(), name) + f, err := os.Create(path) + require.NoError(t, err) + return f +} + +func createFileWithContent(t *testing.T, name, content string) *os.File { + path := createFile(t, name, content) + f, err := os.OpenFile(path, os.O_RDWR, 0) + require.NoError(t, err) + return f +} + +func createFileWithPath(t *testing.T, path, content string) { + require.NoError(t, os.WriteFile(path, []byte(content), 0600)) +} diff --git a/internal/component/loki/source/file/internal/tail/config.go b/internal/component/loki/source/file/internal/tail/config.go new file mode 100644 index 0000000000..2decfa3a73 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/config.go @@ -0,0 +1,41 @@ +package tail + +import ( + "time" + + "golang.org/x/text/encoding" +) + +// Config holds configuration for tailing a file. +type Config struct { + // Filename is the path to the file to tail. + Filename string + // Offset is the byte offset in the file where tailing should start. + // If 0, tailing starts from the beginning of the file. + Offset int64 + + // Decoder is an optional text decoder for non-UTF-8 encoded files. + // If the file is not UTF-8, the tailer must use the correct decoder + // or the output text may be corrupted. For example, if the file is + // "UTF-16 LE" encoded, the tailer would not separate new lines properly + // and the output could appear as garbled characters. + Decoder *encoding.Decoder + + // WatcherConfig controls how the file system is polled for changes. + WatcherConfig WatcherConfig +} + +// WatcherConfig controls the polling behavior for detecting file system events. +type WatcherConfig struct { + // MinPollFrequency and MaxPollFrequency specify the polling frequency range + // for detecting file system events. The actual polling frequency will vary + // within this range based on backoff behavior. + MinPollFrequency, MaxPollFrequency time.Duration +} + +// defaultWatcherConfig holds the default polling configuration used when +// WatcherConfig is not explicitly provided in Config. +var defaultWatcherConfig = WatcherConfig{ + MinPollFrequency: 250 * time.Millisecond, + MaxPollFrequency: 250 * time.Millisecond, +} diff --git a/internal/component/loki/source/file/internal/tail/file.go b/internal/component/loki/source/file/internal/tail/file.go new file mode 100644 index 0000000000..a09f9613bc --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/file.go @@ -0,0 +1,253 @@ +package tail + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + + "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +// NewFile creates a new File tailer for the specified file path. +// It opens the file and seeks to the provided offset if one is specified. +// The returned File can be used to read lines from the file as they are appended. +// The caller is responsible for calling Stop() when done to close the file and clean up resources. +func NewFile(logger log.Logger, cfg *Config) (*File, error) { + f, err := fileext.OpenFile(cfg.Filename) + if err != nil { + return nil, err + } + + if cfg.Offset != 0 { + // Seek to provided offset + if _, err := f.Seek(cfg.Offset, io.SeekStart); err != nil { + return nil, err + } + } + + if cfg.WatcherConfig == (WatcherConfig{}) { + cfg.WatcherConfig = defaultWatcherConfig + } + + cfg.WatcherConfig.MinPollFrequency = min(cfg.WatcherConfig.MinPollFrequency, cfg.WatcherConfig.MaxPollFrequency) + + ctx, cancel := context.WithCancel(context.Background()) + + return &File{ + cfg: cfg, + logger: logger, + file: f, + reader: newReader(f, cfg), + ctx: ctx, + cancel: cancel, + }, nil +} + +// File represents a file being tailed. It provides methods to read lines +// from the file as they are appended, handling file events such as truncation, +// deletion, and modification. File is safe for concurrent use. +type File struct { + cfg *Config + logger log.Logger + + // protects file, reader, and lastOffset. + mu sync.Mutex + file *os.File + reader *bufio.Reader + + lastOffset int64 + + ctx context.Context + cancel context.CancelFunc +} + +// Next reads and returns the next line from the file. +// It blocks until a line is available, file is closed or unrecoverable error occurs. +// If file was closed context.Canceled is returned. +func (f *File) Next() (*Line, error) { + select { + case <-f.ctx.Done(): + return nil, f.ctx.Err() + default: + } + + f.mu.Lock() + defer f.mu.Unlock() + +read: + text, err := f.readLine() + if err != nil { + if errors.Is(err, io.EOF) { + if err := f.wait(text != ""); err != nil { + return nil, err + } + goto read + } + return nil, err + } + + offset, err := f.offset() + if err != nil { + return nil, err + } + + f.lastOffset = offset + + return &Line{ + Text: text, + Offset: offset, + Time: time.Now(), + }, nil +} + +// Size returns the current size of the file in bytes. +// It is safe to call concurrently with other File methods. +func (f *File) Size() (int64, error) { + f.mu.Lock() + defer f.mu.Unlock() + + fi, err := f.file.Stat() + if err != nil { + return 0, err + } + return fi.Size(), nil +} + +// Stop closes the file and cancels any ongoing wait operations. +// After Stop is called, Next() will return errors for any subsequent calls. +// It is safe to call Stop multiple times. +func (f *File) Stop() error { + f.cancel() + f.mu.Lock() + defer f.mu.Unlock() + return f.file.Close() +} + +// wait blocks until a file event is detected (modification, truncation, or deletion). +// Returns an error if the context is canceled or an unrecoverable error occurs. +func (f *File) wait(partial bool) error { + offset, err := f.offset() + if err != nil { + return err + } + + event, err := blockUntilEvent(f.ctx, f.file, offset, f.cfg) + switch event { + case eventModified: + if partial { + // We need to reset to last successful offset because we consumed a partial line. + f.file.Seek(f.lastOffset, io.SeekStart) + f.reader.Reset(f.file) + } + return nil + case eventTruncated: + // We need to reopen the file when it was truncated. + return f.reopen(true) + case eventDeleted: + // In polling mode we could miss events when a file is deleted, so before we give up + // we try to reopen the file. + return f.reopen(false) + default: + return err + } +} + +// readLine reads a single line from the file, including the newline character. +// The newline and any trailing carriage return (for Windows line endings) are stripped. +func (f *File) readLine() (string, error) { + line, err := f.reader.ReadString('\n') + if err != nil { + return line, err + } + return strings.TrimRight(line, "\r\n"), err +} + +// offset returns the current byte offset in the file where the next read will occur. +// It accounts for buffered data in the reader. +func (f *File) offset() (int64, error) { + offset, err := f.file.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + + return offset - int64(f.reader.Buffered()), nil +} + +// reopen closes the current file handle and opens a new one for the same file path. +// If truncated is true, it indicates the file was truncated and we should reopen immediately. +// If truncated is false, it indicates the file was deleted or moved, and we should wait +// for it to be recreated before reopening. +// +// reopen handles the case where a file is reopened so quickly it's still the same file, +// which could cause the poller to hang on an open file handle to a file no longer being +// written to. It saves the current file handle info to ensure we only start tailing a +// different file instance. +func (f *File) reopen(truncated bool) error { + cf, err := f.file.Stat() + if !truncated && err != nil { + // We don't action on this error but are logging it, not expecting to see it happen and not sure if we + // need to action on it, cf is checked for nil later on to accommodate this + level.Debug(f.logger).Log("msg", "stat of old file returned, this is not expected and may result in unexpected behavior") + } + + f.file.Close() + + backoff := backoff.New(f.ctx, backoff.Config{ + MinBackoff: f.cfg.WatcherConfig.MinPollFrequency, + MaxBackoff: f.cfg.WatcherConfig.MaxPollFrequency, + MaxRetries: 20, + }) + + for backoff.Ongoing() { + file, err := fileext.OpenFile(f.cfg.Filename) + if err != nil { + if os.IsNotExist(err) { + level.Debug(f.logger).Log("msg", fmt.Sprintf("waiting for %s to appear...", f.cfg.Filename)) + if err := blockUntilExists(f.ctx, f.cfg); err != nil { + return fmt.Errorf("failed to detect creation of %s: %w", f.cfg.Filename, err) + } + continue + } + return fmt.Errorf("Unable to open file %s: %s", f.cfg.Filename, err) + } + + // File exists and is opened, get information about it. + nf, err := file.Stat() + if err != nil { + level.Debug(f.logger).Log("msg", "failed to stat new file to be tailed, will try to open it again") + file.Close() + backoff.Wait() + continue + } + + // Check to see if we are trying to reopen and tail the exact same file (and it was not truncated). + if !truncated && cf != nil && os.SameFile(cf, nf) { + file.Close() + backoff.Wait() + continue + } + + f.file = file + f.reader.Reset(f.file) + break + } + + return backoff.Err() +} + +func newReader(f *os.File, cfg *Config) *bufio.Reader { + if cfg.Decoder != nil { + return bufio.NewReader(cfg.Decoder.Reader(f)) + } + return bufio.NewReader(f) +} diff --git a/internal/component/loki/source/file/internal/tail/file_test.go b/internal/component/loki/source/file/internal/tail/file_test.go new file mode 100644 index 0000000000..a7265542c0 --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/file_test.go @@ -0,0 +1,259 @@ +package tail + +import ( + "context" + "os" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "golang.org/x/text/encoding/unicode" +) + +func TestFile(t *testing.T) { + verify := func(t *testing.T, f *File, expectedLine *Line, expectedErr error) { + t.Helper() + line, err := f.Next() + require.ErrorIs(t, err, expectedErr) + if expectedLine == nil { + require.Nil(t, line) + } else { + require.Equal(t, expectedLine.Text, line.Text) + require.Equal(t, expectedLine.Offset, line.Offset) + } + } + + t.Run("file must exist", func(t *testing.T) { + _, err := NewFile(log.NewNopLogger(), &Config{ + Filename: "/no/such/file", + }) + require.ErrorIs(t, err, os.ErrNotExist) + + name := createFile(t, "exists", "") + defer removeFile(t, name) + + _, err = NewFile(log.NewNopLogger(), &Config{ + Filename: name, + }) + require.NoError(t, err) + }) + + t.Run("over 4096 byte line", func(t *testing.T) { + testString := strings.Repeat("a", 4098) + + name := createFile(t, "over4096", "test\n"+testString+"\nhello\nworld\n") + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "test", Offset: 5}, nil) + verify(t, file, &Line{Text: testString, Offset: 4104}, nil) + verify(t, file, &Line{Text: "hello", Offset: 4110}, nil) + verify(t, file, &Line{Text: "world", Offset: 4116}, nil) + }) + + t.Run("read", func(t *testing.T) { + name := createFile(t, "read", "hello\nworld\ntest\n") + defer removeFile(t, name) + + const ( + first = 6 + middle = 12 + end = 17 + ) + + t.Run("start", func(t *testing.T) { + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + Offset: 0, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "hello", Offset: first}, nil) + verify(t, file, &Line{Text: "world", Offset: middle}, nil) + verify(t, file, &Line{Text: "test", Offset: end}, nil) + }) + + t.Run("skip first", func(t *testing.T) { + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + Offset: first, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "world", Offset: middle}, nil) + verify(t, file, &Line{Text: "test", Offset: end}, nil) + }) + + t.Run("last", func(t *testing.T) { + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + Offset: middle, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "test", Offset: end}, nil) + }) + }) + + t.Run("partial line", func(t *testing.T) { + name := createFile(t, "partial", "hello\nwo") + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Offset: 0, + Filename: name, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "hello", Offset: 6}, nil) + go func() { + time.Sleep(50 * time.Millisecond) + appendToFile(t, name, "rld\n") + }() + verify(t, file, &Line{Text: "world", Offset: 12}, nil) + }) + + t.Run("truncate", func(t *testing.T) { + name := createFile(t, "truncate", "a really long string goes here\nhello\nworld\n") + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: name, + WatcherConfig: WatcherConfig{ + MinPollFrequency: 50 * time.Millisecond, + MaxPollFrequency: 50 * time.Millisecond, + }, + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "a really long string goes here", Offset: 31}, nil) + verify(t, file, &Line{Text: "hello", Offset: 37}, nil) + verify(t, file, &Line{Text: "world", Offset: 43}, nil) + + go func() { + // truncate now + <-time.After(100 * time.Millisecond) + truncateFile(t, name, "h311o\nw0r1d\nendofworld\n") + }() + + verify(t, file, &Line{Text: "h311o", Offset: 6}, nil) + verify(t, file, &Line{Text: "w0r1d", Offset: 12}, nil) + verify(t, file, &Line{Text: "endofworld", Offset: 23}, nil) + + }) + + t.Run("stopped during wait", func(t *testing.T) { + name := createFile(t, "stopped", "hello\n") + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Offset: 0, + Filename: name, + }) + require.NoError(t, err) + + verify(t, file, &Line{Text: "hello", Offset: 6}, nil) + + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(t, file.Stop()) + }() + + _, err = file.Next() + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("stopped while waiting for file to be created", func(t *testing.T) { + name := createFile(t, "removed", "hello\n") + + file, err := NewFile(log.NewNopLogger(), &Config{ + Offset: 0, + Filename: name, + WatcherConfig: WatcherConfig{ + MinPollFrequency: 50 * time.Millisecond, + MaxPollFrequency: 50 * time.Millisecond, + }, + }) + require.NoError(t, err) + + verify(t, file, &Line{Text: "hello", Offset: 6}, nil) + removeFile(t, name) + + go func() { + time.Sleep(100 * time.Millisecond) + file.Stop() + }() + _, err = file.Next() + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("UTF-16LE", func(t *testing.T) { + file, err := NewFile(log.NewNopLogger(), &Config{ + Filename: "testdata/mssql.log", + Decoder: unicode.UTF16(unicode.LittleEndian, unicode.ExpectBOM).NewDecoder(), + }) + require.NoError(t, err) + defer file.Stop() + + verify(t, file, &Line{Text: "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", Offset: 528}, nil) + verify(t, file, &Line{Text: " Sep 24 2019 13:48:23 ", Offset: 552}, nil) + verify(t, file, &Line{Text: " Copyright (C) 2019 Microsoft Corporation", Offset: 595}, nil) + verify(t, file, &Line{Text: " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 (Build 20348: ) (Hypervisor)", Offset: 697}, nil) + verify(t, file, &Line{Text: "", Offset: 699}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", Offset: 756}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", Offset: 819}, nil) + verify(t, file, &Line{Text: "2025-03-11 11:11:02.72 Server All rights reserved.", Offset: 876}, nil) + }) + + t.Run("calls to next after stop", func(t *testing.T) { + name := createFile(t, "stopped", "hello\n") + defer removeFile(t, name) + + file, err := NewFile(log.NewNopLogger(), &Config{ + Offset: 0, + Filename: name, + }) + require.NoError(t, err) + file.Stop() + + verify(t, file, nil, context.Canceled) + }) +} + +func createFile(t *testing.T, name, content string) string { + path := t.TempDir() + "/" + name + require.NoError(t, os.WriteFile(path, []byte(content), 0600)) + return path +} + +func appendToFile(t *testing.T, name, content string) { + f, err := os.OpenFile(name, os.O_APPEND|os.O_WRONLY, 0600) + require.NoError(t, err) + defer f.Close() + _, err = f.WriteString(content) + require.NoError(t, err) +} + +func truncateFile(t *testing.T, name, content string) { + f, err := os.OpenFile(name, os.O_TRUNC|os.O_WRONLY, 0600) + require.NoError(t, err) + defer f.Close() + _, err = f.WriteString(content) + require.NoError(t, err) +} + +func removeFile(t *testing.T, name string) { + require.NoError(t, os.Remove(name)) +} diff --git a/internal/component/loki/source/file/internal/tail/tail_posix.go b/internal/component/loki/source/file/internal/tail/fileext/file_posix.go similarity index 83% rename from internal/component/loki/source/file/internal/tail/tail_posix.go rename to internal/component/loki/source/file/internal/tail/fileext/file_posix.go index 871dc1b288..de1f0fd648 100644 --- a/internal/component/loki/source/file/internal/tail/tail_posix.go +++ b/internal/component/loki/source/file/internal/tail/fileext/file_posix.go @@ -1,6 +1,6 @@ //go:build linux || darwin || freebsd || netbsd || openbsd -package tail +package fileext import ( "os" @@ -22,3 +22,7 @@ func OpenFile(name string) (file *os.File, err error) { } return os.Open(filename) } + +func IsDeletePending(_ *os.File) (bool, error) { + return false, nil +} diff --git a/internal/component/loki/source/file/internal/tail/winfile/winfile.go b/internal/component/loki/source/file/internal/tail/fileext/file_windows.go similarity index 59% rename from internal/component/loki/source/file/internal/tail/winfile/winfile.go rename to internal/component/loki/source/file/internal/tail/fileext/file_windows.go index a691207453..266ffb8e2c 100644 --- a/internal/component/loki/source/file/internal/tail/winfile/winfile.go +++ b/internal/component/loki/source/file/internal/tail/fileext/file_windows.go @@ -1,18 +1,30 @@ //go:build windows -package winfile +package fileext import ( "os" + "runtime" "syscall" "unsafe" + + "golang.org/x/sys/windows" ) // issue also described here -//https://codereview.appspot.com/8203043/ +// https://codereview.appspot.com/8203043/ + +// https://github.com/jnwhiteh/golang/blob/master/src/pkg/os/file_windows.go#L133 +func OpenFile(name string) (file *os.File, err error) { + f, e := open(name, os.O_RDONLY|syscall.O_CLOEXEC, 0) + if e != nil { + return nil, e + } + return os.NewFile(uintptr(f), name), nil +} // https://github.com/jnwhiteh/golang/blob/master/src/pkg/syscall/syscall_windows.go#L218 -func Open(path string, mode int, perm uint32) (fd syscall.Handle, err error) { +func open(path string, mode int, _ uint32) (fd syscall.Handle, err error) { if len(path) == 0 { return syscall.InvalidHandle, syscall.ERROR_FILE_NOT_FOUND } @@ -66,27 +78,38 @@ func makeInheritSa() *syscall.SecurityAttributes { return &sa } -// https://github.com/jnwhiteh/golang/blob/master/src/pkg/os/file_windows.go#L133 -func OpenFile(name string, flag int, perm os.FileMode) (file *os.File, err error) { - r, e := Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm)) - if e != nil { - return nil, e +func IsDeletePending(f *os.File) (bool, error) { + if f == nil { + return false, nil } - return os.NewFile(uintptr(r), name), nil -} -// https://github.com/jnwhiteh/golang/blob/master/src/pkg/os/file_posix.go#L61 -func syscallMode(i os.FileMode) (o uint32) { - o |= uint32(i.Perm()) - if i&os.ModeSetuid != 0 { - o |= syscall.S_ISUID - } - if i&os.ModeSetgid != 0 { - o |= syscall.S_ISGID + fi, err := getFileStandardInfo(f) + if err != nil { + return false, err } - if i&os.ModeSticky != 0 { - o |= syscall.S_ISVTX + + return fi.DeletePending, nil +} + +// From: https://github.com/microsoft/go-winio/blob/main/fileinfo.go +// FileStandardInfo contains extended information for the file. +// FILE_STANDARD_INFO in WinBase.h +// https://docs.microsoft.com/en-us/windows/win32/api/winbase/ns-winbase-file_standard_info +type fileStandardInfo struct { + AllocationSize, EndOfFile int64 + NumberOfLinks uint32 + DeletePending, Directory bool +} + +// GetFileStandardInfo retrieves ended information for the file. +func getFileStandardInfo(f *os.File) (*fileStandardInfo, error) { + si := &fileStandardInfo{} + if err := windows.GetFileInformationByHandleEx(windows.Handle(f.Fd()), + windows.FileStandardInfo, + (*byte)(unsafe.Pointer(si)), + uint32(unsafe.Sizeof(*si))); err != nil { + return nil, &os.PathError{Op: "GetFileInformationByHandleEx", Path: f.Name(), Err: err} } - // No mapping for Go's ModeTemporary (plan9 only). - return + runtime.KeepAlive(f) + return si, nil } diff --git a/internal/component/loki/source/file/internal/tail/line.go b/internal/component/loki/source/file/internal/tail/line.go new file mode 100644 index 0000000000..b52fa1842d --- /dev/null +++ b/internal/component/loki/source/file/internal/tail/line.go @@ -0,0 +1,14 @@ +package tail + +import "time" + +// Line represents a single line read from a tailed file. +type Line struct { + // Text is the content of the line, with line endings stripped. + Text string + // Offset is the byte offset in the file immediately after this line, + // which is where the next read will occur. + Offset int64 + // Time is the timestamp when the line was read from the file. + Time time.Time +} diff --git a/internal/component/loki/source/file/internal/tail/tail.go b/internal/component/loki/source/file/internal/tail/tail.go deleted file mode 100644 index 1eef9ac878..0000000000 --- a/internal/component/loki/source/file/internal/tail/tail.go +++ /dev/null @@ -1,401 +0,0 @@ -// Copyright (c) 2015 HPE Software Inc. All rights reserved. -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -package tail - -import ( - "bufio" - "errors" - "fmt" - "io" - "os" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "golang.org/x/text/encoding" - "gopkg.in/tomb.v1" - - "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/watch" -) - -var ( - ErrStop = errors.New("tail should now stop") -) - -type Line struct { - Text string - Time time.Time -} - -// SeekInfo represents arguments to `os.Seek` -type SeekInfo struct { - Offset int64 - Whence int // os.SEEK_* -} - -// Config is used to specify how a file must be tailed. -type Config struct { - Logger log.Logger - // Seek to this location before tailing - Location *SeekInfo - PollOptions watch.PollingFileWatcherOptions - - // Change the decoder if the file is not UTF-8. - // If the tailer doesn't use the right decoding, the output text may be gibberish. - // For example, if the file is "UTF-16 LE" encoded, the tailer would not separate - // the new lines properly and the output could come out as chinese characters. - Decoder *encoding.Decoder -} - -type Tail struct { - Filename string - Lines chan *Line - Config - - fileMut sync.Mutex - file *os.File - - readerMut sync.Mutex - reader *bufio.Reader - - watcher watch.FileWatcher - changes *watch.FileChanges - - tomb.Tomb // provides: Done, Kill, Dying -} - -// TailFile begins tailing the file. Output stream is made available -// via the `Tail.Lines` channel. To handle errors during tailing, -// invoke the `Wait` or `Err` method after finishing reading from the -// `Lines` channel. -func TailFile(filename string, config Config) (*Tail, error) { - t := &Tail{ - Filename: filename, - Lines: make(chan *Line), - Config: config, - } - - // when Logger was not specified in config, use default logger - if t.Logger == nil { - t.Logger = log.NewNopLogger() - } - - var err error - t.watcher, err = watch.NewPollingFileWatcher(filename, config.PollOptions) - if err != nil { - return nil, err - } - - t.file, err = OpenFile(t.Filename) - if err != nil { - return nil, err - } - - // Seek to requested location. - if t.Location != nil { - _, err := t.file.Seek(t.Location.Offset, t.Location.Whence) - if err != nil { - return nil, err - } - } - - t.watcher.SetFile(t.file) - - t.reader = t.getReader() - - go t.tailFileSync() - - return t, nil -} - -// Return the file's current position, like stdio's ftell(). -// But this value is not very accurate. -// it may readed one line in the chan(tail.Lines), -// so it may lost one line. -func (tail *Tail) Tell() (int64, error) { - tail.fileMut.Lock() - if tail.file == nil { - tail.fileMut.Unlock() - return 0, os.ErrNotExist - } - offset, err := tail.file.Seek(0, io.SeekCurrent) - tail.fileMut.Unlock() - if err != nil { - return 0, err - } - - tail.readerMut.Lock() - defer tail.readerMut.Unlock() - if tail.reader == nil { - return 0, nil - } - - offset -= int64(tail.reader.Buffered()) - return offset, nil -} - -// Size returns the length in bytes of the file being tailed, -// or 0 with an error if there was an error Stat'ing the file. -func (tail *Tail) Size() (int64, error) { - tail.fileMut.Lock() - f := tail.file - if f == nil { - tail.fileMut.Unlock() - return 0, os.ErrNotExist - } - fi, err := f.Stat() - tail.fileMut.Unlock() - - if err != nil { - return 0, err - } - size := fi.Size() - return size, nil -} - -// Stop stops the tailing activity. -func (tail *Tail) Stop() error { - tail.Kill(nil) - return tail.Wait() -} - -func (tail *Tail) close() { - close(tail.Lines) - tail.closeFile() -} - -func (tail *Tail) closeFile() { - tail.fileMut.Lock() - defer tail.fileMut.Unlock() - if tail.file != nil { - tail.file.Close() - tail.file = nil - } -} - -func (tail *Tail) reopen(truncated bool) error { - // There are cases where the file is reopened so quickly it's still the same file - // which causes the poller to hang on an open file handle to a file no longer being written to - // and which eventually gets deleted. Save the current file handle info to make sure we only - // start tailing a different file. - cf, err := tail.file.Stat() - if !truncated && err != nil { - level.Debug(tail.Logger).Log("msg", "stat of old file returned, this is not expected and may result in unexpected behavior") - // We don't action on this error but are logging it, not expecting to see it happen and not sure if we - // need to action on it, cf is checked for nil later on to accommodate this - } - - tail.closeFile() - retries := 20 - for { - var err error - tail.fileMut.Lock() - tail.file, err = OpenFile(tail.Filename) - tail.watcher.SetFile(tail.file) - tail.fileMut.Unlock() - if err != nil { - if os.IsNotExist(err) { - level.Debug(tail.Logger).Log("msg", fmt.Sprintf("Waiting for %s to appear...", tail.Filename)) - if err := tail.watcher.BlockUntilExists(&tail.Tomb); err != nil { - if err == tomb.ErrDying { - return err - } - return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, err) - } - continue - } - return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err) - } - - // File exists and is opened, get information about it. - nf, err := tail.file.Stat() - if err != nil { - level.Debug(tail.Logger).Log("msg", "Failed to stat new file to be tailed, will try to open it again") - tail.closeFile() - continue - } - - // Check to see if we are trying to reopen and tail the exact same file (and it was not truncated). - if !truncated && cf != nil && os.SameFile(cf, nf) { - retries-- - if retries <= 0 { - return errors.New("gave up trying to reopen log file with a different handle") - } - - select { - case <-time.After(watch.DefaultPollingFileWatcherOptions.MaxPollFrequency): - tail.closeFile() - continue - case <-tail.Tomb.Dying(): - return tomb.ErrDying - } - } - break - } - return nil -} - -func (tail *Tail) readLine() (string, error) { - tail.readerMut.Lock() - line, err := tail.reader.ReadString('\n') - tail.readerMut.Unlock() - if err != nil { - // Note ReadString "returns the data read before the error" in - // case of an error, including EOF, so we return it as is. The - // caller is expected to process it if err is EOF. - return line, err - } - - line = strings.TrimRight(line, "\n") - - // Trim Windows line endings - line = strings.TrimSuffix(line, "\r") - - return line, err -} - -func (tail *Tail) tailFileSync() { - defer tail.Done() - defer tail.close() - - var ( - err error - offset int64 - oneMoreRun bool - ) - - // Read line by line. - for { - // grab the position in case we need to back up in the event of a half-line - offset, err = tail.Tell() - if err != nil { - tail.Kill(err) - return - } - - line, err := tail.readLine() - - // Process `line` even if err is EOF. - switch err { - case nil: - select { - case tail.Lines <- &Line{line, time.Now()}: - case <-tail.Dying(): - return - } - - case io.EOF: - if line != "" { - // this has the potential to never return the last line if - // it's not followed by a newline; seems a fair trade here - err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) - if err != nil { - tail.Kill(err) - return - } - } - - // oneMoreRun is set true when a file is deleted, - // this is to catch events which might get missed in polling mode. - // now that the last run is completed, finish deleting the file - if oneMoreRun { - oneMoreRun = false - err = tail.finishDelete() - if err != nil { - if err != ErrStop { - tail.Kill(err) - } - return - } - } - - // When EOF is reached, wait for more data to become available. - oneMoreRun, err = tail.waitForChanges() - if err != nil { - if err != ErrStop { - tail.Kill(err) - } - return - } - default: - // non-EOF error - tail.Killf("Error reading %s: %s", tail.Filename, err) - return - } - } -} - -// waitForChanges waits until the file has been appended, deleted, -// moved or truncated. When moved or deleted - the file will be -// reopened if ReOpen is true. Truncated files are always reopened. -func (tail *Tail) waitForChanges() (bool, error) { - if tail.changes == nil { - pos, err := tail.file.Seek(0, io.SeekCurrent) - if err != nil { - return false, err - } - tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos) - if err != nil { - return false, err - } - } - - select { - case <-tail.changes.Modified: - return false, nil - case <-tail.changes.Deleted: - // In polling mode we could miss events when a file is deleted, so before we give up our file handle - // run the poll one more time to catch anything we may have missed since the last poll. - return true, nil - case <-tail.changes.Truncated: - // Always reopen truncated files. - level.Debug(tail.Logger).Log("msg", fmt.Sprintf("Re-opening truncated file %s ...", tail.Filename)) - if err := tail.reopen(true); err != nil { - return false, err - } - level.Debug(tail.Logger).Log("msg", fmt.Sprintf("Successfully reopened truncated %s", tail.Filename)) - tail.readerMut.Lock() - tail.reader = tail.getReader() - tail.readerMut.Unlock() - return false, nil - case <-tail.Dying(): - return false, ErrStop - } -} - -func (tail *Tail) finishDelete() error { - tail.changes = nil - level.Debug(tail.Logger).Log("msg", fmt.Sprintf("Re-opening moved/deleted file %s ...", tail.Filename)) - if err := tail.reopen(false); err != nil { - return err - } - level.Debug(tail.Logger).Log("msg", fmt.Sprintf("Successfully reopened %s", tail.Filename)) - - tail.readerMut.Lock() - tail.reader = tail.getReader() - tail.readerMut.Unlock() - return nil -} - -func (tail *Tail) getReader() *bufio.Reader { - if tail.Decoder != nil { - return bufio.NewReader(tail.Decoder.Reader(tail.file)) - } else { - return bufio.NewReader(tail.file) - } -} - -func (tail *Tail) seekTo(pos SeekInfo) error { - _, err := tail.file.Seek(pos.Offset, pos.Whence) - if err != nil { - return fmt.Errorf("Seek error on %s: %s", tail.Filename, err) - } - // Reset the read buffer whenever the file is re-seek'ed - tail.readerMut.Lock() - tail.reader.Reset(tail.file) - tail.readerMut.Unlock() - return nil -} diff --git a/internal/component/loki/source/file/internal/tail/tail_test.go b/internal/component/loki/source/file/internal/tail/tail_test.go deleted file mode 100644 index 79943d81e9..0000000000 --- a/internal/component/loki/source/file/internal/tail/tail_test.go +++ /dev/null @@ -1,285 +0,0 @@ -// Copyright (c) 2015 HPE Software Inc. All rights reserved. -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -// TODO: -// * repeat all the tests with Poll:true - -package tail - -import ( - _ "fmt" - "io" - "os" - "strings" - "sync" - "testing" - "time" - - "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/watch" - "github.com/stretchr/testify/assert" - "golang.org/x/text/encoding/unicode" -) - -var testPollingOptions = watch.PollingFileWatcherOptions{ - // Use a smaller poll duration for faster test runs. Keep it below - // 100ms (which value is used as common delays for tests) - MinPollFrequency: 5 * time.Millisecond, - MaxPollFrequency: 5 * time.Millisecond, -} - -func TestTail(t *testing.T) { - verify := func(t *testing.T, tail *Tail, lines []string) { - got := make([]string, 0, len(lines)) - - var wg sync.WaitGroup - wg.Go(func() { - for { - line := <-tail.Lines - got = append(got, line.Text) - if len(got) == len(lines) { - return - } - } - }) - wg.Wait() - assert.Equal(t, lines, got) - } - - t.Run("file must exist", func(t *testing.T) { - _, err := TailFile("/no/such/file", Config{}) - assert.Error(t, err) - }) - - t.Run("should be able to stop", func(t *testing.T) { - tail, err := TailFile("README.md", Config{}) - assert.NoError(t, err) - assert.NoError(t, tail.Stop()) - }) - - t.Run("over 4096 byte line", func(t *testing.T) { - tailTest := NewTailTest("Over4096ByteLine", t) - testString := strings.Repeat("a", 4097) - tailTest.CreateFile("test.txt", "test\n"+testString+"\nhello\nworld\n") - defer tailTest.RemoveFile("test.txt") - tail := tailTest.StartTail("test.txt", Config{}) - defer tail.Stop() - - verify(t, tail, []string{"test", testString, "hello", "world"}) - }) - - t.Run("read full", func(t *testing.T) { - tailTest := NewTailTest("location-full", t) - tailTest.CreateFile("test.txt", "hello\nworld\n") - defer tailTest.RemoveFile("test.txt") - tail := tailTest.StartTail("test.txt", Config{Location: nil}) - defer tail.Stop() - - verify(t, tail, []string{"hello", "world"}) - }) - - t.Run("read end", func(t *testing.T) { - tailTest := NewTailTest("location-end", t) - tailTest.CreateFile("test.txt", "hello\nworld\n") - defer tailTest.RemoveFile("test.txt") - tail := tailTest.StartTail("test.txt", Config{Location: &SeekInfo{0, io.SeekEnd}}) - defer tail.Stop() - - go func() { - <-time.After(100 * time.Millisecond) - tailTest.AppendFile("test.txt", "more\ndata\n") - - <-time.After(100 * time.Millisecond) - tailTest.AppendFile("test.txt", "more\ndata\n") - }() - - verify(t, tail, []string{"more", "data", "more", "data"}) - }) - - t.Run("read middle", func(t *testing.T) { - tailTest := NewTailTest("location-middle", t) - tailTest.CreateFile("test.txt", "hello\nworld\n") - defer tailTest.RemoveFile("test.txt") - tail := tailTest.StartTail("test.txt", Config{Location: &SeekInfo{-6, io.SeekEnd}}) - defer tail.Stop() - - go func() { - <-time.After(100 * time.Millisecond) - tailTest.AppendFile("test.txt", "more\ndata\n") - - <-time.After(100 * time.Millisecond) - tailTest.AppendFile("test.txt", "more\ndata\n") - }() - - verify(t, tail, []string{"world", "more", "data", "more", "data"}) - }) - - t.Run("reseek", func(t *testing.T) { - tailTest := NewTailTest("reseek-polling", t) - tailTest.CreateFile("test.txt", "a really long string goes here\nhello\nworld\n") - defer tailTest.RemoveFile("test.txt") - tail := tailTest.StartTail("test.txt", Config{PollOptions: testPollingOptions}) - defer tail.Stop() - - go func() { - // truncate now - <-time.After(100 * time.Millisecond) - tailTest.TruncateFile("test.txt", "h311o\nw0r1d\nendofworld\n") - }() - - verify(t, tail, []string{"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"}) - }) - - t.Run("tell", func(t *testing.T) { - tailTest := NewTailTest("tell-position", t) - tailTest.CreateFile("test.txt", "hello\nworld\nagain\nmore\n") - defer tailTest.RemoveFile("test.txt") - - tail := tailTest.StartTail("test.txt", Config{Location: &SeekInfo{0, io.SeekStart}}) - - // read one line - <-tail.Lines - offset, err := tail.Tell() - assert.NoError(t, err) - - tail.Stop() - - tail = tailTest.StartTail("test.txt", Config{Location: &SeekInfo{offset, io.SeekStart}}) - l := <-tail.Lines - assert.Equal(t, "again", l.Text) - - tail.Stop() - }) - - t.Run("UTF-16LE", func(t *testing.T) { - tail, err := TailFile("testdata/mssql.log", Config{Decoder: unicode.UTF16(unicode.LittleEndian, unicode.ExpectBOM).NewDecoder()}) - assert.NoError(t, err) - defer tail.Stop() - - expectedLines := []string{ - "2025-03-11 11:11:02.58 Server Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) ", - " Sep 24 2019 13:48:23 ", - " Copyright (C) 2019 Microsoft Corporation", - " Enterprise Edition (64-bit) on Windows Server 2022 Standard 10.0 (Build 20348: ) (Hypervisor)", - "", - "2025-03-11 11:11:02.71 Server UTC adjustment: 1:00", - "2025-03-11 11:11:02.71 Server (c) Microsoft Corporation.", - "2025-03-11 11:11:02.72 Server All rights reserved.", - } - - verify(t, tail, expectedLines) - }) -} - -func TestTellRace(t *testing.T) { - tailTest := NewTailTest("tell-race", t) - tailTest.CreateFile("test.txt", "hello\nworld\n") - - tail := tailTest.StartTail("test.txt", Config{PollOptions: testPollingOptions}) - - <-tail.Lines - <-tail.Lines - - _, err := tail.Tell() - if err != nil { - t.Fatal("unexpected error", err) - } - - tailTest.TruncateFile("test.txt", "yay\nyay2\n") - - // wait for reopen to happen - time.Sleep(100 * time.Millisecond) - - _, err = tail.Tell() - if err != nil { - t.Fatal("unexpected error", err) - } - -} - -func TestSizeRace(t *testing.T) { - tailTest := NewTailTest("tell-race", t) - tailTest.CreateFile("test.txt", "hello\nworld\n") - tail := tailTest.StartTail("test.txt", Config{PollOptions: testPollingOptions}) - - <-tail.Lines - <-tail.Lines - - s1, err := tail.Size() - if err != nil { - t.Fatal("unexpected error", err) - } - - tailTest.TruncateFile("test.txt", "yay\nyay2\n") // smaller than before - - // wait for reopen to happen - time.Sleep(100 * time.Millisecond) - - s2, err := tail.Size() - if err != nil { - t.Fatal("unexpected error", err) - } - - if s2 == 0 || s2 > s1 { - t.Fatal("expected 0 < s2 < s1! s1:", s1, "s2:", s2) - } -} - -// Test library -type TailTest struct { - Name string - path string - t *testing.T -} - -func NewTailTest(name string, t *testing.T) TailTest { - tt := TailTest{name, t.TempDir() + "/" + name, t} - err := os.MkdirAll(tt.path, os.ModeTemporary|0700) - if err != nil { - tt.t.Fatal(err) - } - - return tt -} - -func (t TailTest) CreateFile(name string, contents string) { - assert.NoError(t.t, os.WriteFile(t.path+"/"+name, []byte(contents), 0600)) -} - -func (t TailTest) AppendToFile(name string, contents string) { - assert.NoError(t.t, os.WriteFile(t.path+"/"+name, []byte(contents), 0600|os.ModeAppend)) - -} - -func (t TailTest) RemoveFile(name string) { - err := os.Remove(t.path + "/" + name) - assert.NoError(t.t, err) - -} - -func (t TailTest) RenameFile(oldname string, newname string) { - oldname = t.path + "/" + oldname - newname = t.path + "/" + newname - assert.NoError(t.t, os.Rename(oldname, newname)) -} - -func (t TailTest) AppendFile(name string, contents string) { - f, err := os.OpenFile(t.path+"/"+name, os.O_APPEND|os.O_WRONLY, 0600) - assert.NoError(t.t, err) - defer f.Close() - _, err = f.WriteString(contents) - assert.NoError(t.t, err) -} - -func (t TailTest) TruncateFile(name string, contents string) { - f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0600) - assert.NoError(t.t, err) - defer f.Close() - _, err = f.WriteString(contents) - assert.NoError(t.t, err) -} - -func (t TailTest) StartTail(name string, config Config) *Tail { - tail, err := TailFile(t.path+"/"+name, config) - assert.NoError(t.t, err) - return tail -} diff --git a/internal/component/loki/source/file/internal/tail/tail_windows.go b/internal/component/loki/source/file/internal/tail/tail_windows.go deleted file mode 100644 index 7593c7617c..0000000000 --- a/internal/component/loki/source/file/internal/tail/tail_windows.go +++ /dev/null @@ -1,13 +0,0 @@ -//go:build windows - -package tail - -import ( - "os" - - "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/winfile" -) - -func OpenFile(name string) (file *os.File, err error) { - return winfile.OpenFile(name, os.O_RDONLY, 0) -} diff --git a/internal/component/loki/source/file/internal/tail/util/util.go b/internal/component/loki/source/file/internal/tail/util/util.go deleted file mode 100644 index 54151fe39f..0000000000 --- a/internal/component/loki/source/file/internal/tail/util/util.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2015 HPE Software Inc. All rights reserved. -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -package util - -import ( - "fmt" - "log" - "os" - "runtime/debug" -) - -type Logger struct { - *log.Logger -} - -var LOGGER = &Logger{log.New(os.Stderr, "", log.LstdFlags)} - -// fatal is like panic except it displays only the current goroutine's stack. -func Fatal(format string, v ...interface{}) { - // https://github.com/hpcloud/log/blob/master/log.go#L45 - LOGGER.Output(2, fmt.Sprintf("FATAL -- "+format, v...)+"\n"+string(debug.Stack())) - os.Exit(1) -} - -// partitionString partitions the string into chunks of given size, -// with the last chunk of variable size. -func PartitionString(s string, chunkSize int) []string { - if chunkSize <= 0 { - panic("invalid chunkSize") - } - length := len(s) - chunks := 1 + length/chunkSize - start := 0 - end := chunkSize - parts := make([]string, 0, chunks) - for { - if end > length { - end = length - } - parts = append(parts, s[start:end]) - if end == length { - break - } - start, end = end, end+chunkSize - } - return parts -} diff --git a/internal/component/loki/source/file/internal/tail/watch/file_posix.go b/internal/component/loki/source/file/internal/tail/watch/file_posix.go deleted file mode 100644 index dae82ae94d..0000000000 --- a/internal/component/loki/source/file/internal/tail/watch/file_posix.go +++ /dev/null @@ -1,9 +0,0 @@ -//go:build linux || darwin || freebsd || netbsd || openbsd - -package watch - -import "os" - -func IsDeletePending(_ *os.File) (bool, error) { - return false, nil -} diff --git a/internal/component/loki/source/file/internal/tail/watch/file_windows.go b/internal/component/loki/source/file/internal/tail/watch/file_windows.go deleted file mode 100644 index d1ca7c64d0..0000000000 --- a/internal/component/loki/source/file/internal/tail/watch/file_windows.go +++ /dev/null @@ -1,47 +0,0 @@ -//go:build windows - -package watch - -import ( - "os" - "runtime" - "unsafe" - - "golang.org/x/sys/windows" -) - -func IsDeletePending(f *os.File) (bool, error) { - if f == nil { - return false, nil - } - - fi, err := getFileStandardInfo(f) - if err != nil { - return false, err - } - - return fi.DeletePending, nil -} - -// From: https://github.com/microsoft/go-winio/blob/main/fileinfo.go -// FileStandardInfo contains extended information for the file. -// FILE_STANDARD_INFO in WinBase.h -// https://docs.microsoft.com/en-us/windows/win32/api/winbase/ns-winbase-file_standard_info -type fileStandardInfo struct { - AllocationSize, EndOfFile int64 - NumberOfLinks uint32 - DeletePending, Directory bool -} - -// GetFileStandardInfo retrieves ended information for the file. -func getFileStandardInfo(f *os.File) (*fileStandardInfo, error) { - si := &fileStandardInfo{} - if err := windows.GetFileInformationByHandleEx(windows.Handle(f.Fd()), - windows.FileStandardInfo, - (*byte)(unsafe.Pointer(si)), - uint32(unsafe.Sizeof(*si))); err != nil { - return nil, &os.PathError{Op: "GetFileInformationByHandleEx", Path: f.Name(), Err: err} - } - runtime.KeepAlive(f) - return si, nil -} diff --git a/internal/component/loki/source/file/internal/tail/watch/filechanges.go b/internal/component/loki/source/file/internal/tail/watch/filechanges.go deleted file mode 100644 index f80aead9ad..0000000000 --- a/internal/component/loki/source/file/internal/tail/watch/filechanges.go +++ /dev/null @@ -1,36 +0,0 @@ -package watch - -type FileChanges struct { - Modified chan bool // Channel to get notified of modifications - Truncated chan bool // Channel to get notified of truncations - Deleted chan bool // Channel to get notified of deletions/renames -} - -func NewFileChanges() *FileChanges { - return &FileChanges{ - make(chan bool, 1), make(chan bool, 1), make(chan bool, 1)} -} - -func (fc *FileChanges) NotifyModified() { - sendOnlyIfEmpty(fc.Modified) -} - -func (fc *FileChanges) NotifyTruncated() { - sendOnlyIfEmpty(fc.Truncated) -} - -func (fc *FileChanges) NotifyDeleted() { - sendOnlyIfEmpty(fc.Deleted) -} - -// sendOnlyIfEmpty sends on a bool channel only if the channel has no -// backlog to be read by other goroutines. This concurrency pattern -// can be used to notify other goroutines if and only if they are -// looking for it (i.e., subsequent notifications can be compressed -// into one). -func sendOnlyIfEmpty(ch chan bool) { - select { - case ch <- true: - default: - } -} diff --git a/internal/component/loki/source/file/internal/tail/watch/polling.go b/internal/component/loki/source/file/internal/tail/watch/polling.go deleted file mode 100644 index 688d77508a..0000000000 --- a/internal/component/loki/source/file/internal/tail/watch/polling.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright (c) 2015 HPE Software Inc. All rights reserved. -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -package watch - -import ( - "fmt" - "os" - "runtime" - "sync" - "time" - - "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/util" - "gopkg.in/tomb.v1" -) - -// PollingFileWatcher polls the file for changes. -type PollingFileWatcher struct { - File *os.File - Filename string - Size int64 - Options PollingFileWatcherOptions - mtx sync.RWMutex // protects File and Size fields -} - -// PollingFileWatcherOptions customizes a PollingFileWatcher. -type PollingFileWatcherOptions struct { - // MinPollFrequency and MaxPollFrequency specify how frequently a - // PollingFileWatcher should poll the file. - // - // PollingFileWatcher starts polling at MinPollFrequency, and will - // exponentially increase the polling frequency up to MaxPollFrequency if no - // new entries are found. The polling frequency is reset to MinPollFrequency - // whenever a new log entry is found or if the polled file changes. - MinPollFrequency, MaxPollFrequency time.Duration -} - -// DefaultPollingFileWatcherOptions holds default values for -// PollingFileWatcherOptions. -var DefaultPollingFileWatcherOptions = PollingFileWatcherOptions{ - MinPollFrequency: 250 * time.Millisecond, - MaxPollFrequency: 250 * time.Millisecond, -} - -func NewPollingFileWatcher(filename string, opts PollingFileWatcherOptions) (*PollingFileWatcher, error) { - if opts == (PollingFileWatcherOptions{}) { - opts = DefaultPollingFileWatcherOptions - } - - if opts.MinPollFrequency == 0 || opts.MaxPollFrequency == 0 { - return nil, fmt.Errorf("MinPollFrequency and MaxPollFrequency must be greater than 0") - } else if opts.MaxPollFrequency < opts.MinPollFrequency { - return nil, fmt.Errorf("MaxPollFrequency must be larger than MinPollFrequency") - } - - return &PollingFileWatcher{ - File: nil, - Filename: filename, - Size: 0, - Options: opts, - }, nil -} - -func (fw *PollingFileWatcher) BlockUntilExists(t *tomb.Tomb) error { - bo := newPollBackoff(fw.Options) - - for { - if _, err := os.Stat(fw.Filename); err == nil { - return nil - } else if !os.IsNotExist(err) { - return err - } - select { - case <-time.After(bo.WaitTime()): - bo.Backoff() - continue - case <-t.Dying(): - return tomb.ErrDying - } - } -} - -func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) { - origFi, err := os.Stat(fw.Filename) - if err != nil { - return nil, err - } - - changes := NewFileChanges() - var prevModTime time.Time - - // XXX: use tomb.Tomb to cleanly manage these goroutines. replace - // the fatal (below) with tomb's Kill. - - fw.mtx.Lock() - fw.Size = pos - fw.mtx.Unlock() - - bo := newPollBackoff(fw.Options) - - go func() { - fw.mtx.RLock() - prevSize := fw.Size - fw.mtx.RUnlock() - for { - select { - case <-t.Dying(): - return - default: - } - - time.Sleep(bo.WaitTime()) - fw.mtx.RLock() - file := fw.File - fw.mtx.RUnlock() - deletePending, err := IsDeletePending(file) - - // DeletePending is a windows state where the file has been queued - // for delete but won't actually get deleted until all handles are - // closed. It's a variation on the NotifyDeleted call below. - // - // IsDeletePending may fail in cases where the file handle becomes - // invalid, so we treat a failed call the same as a pending delete. - if err != nil || deletePending { - fw.closeFile() - changes.NotifyDeleted() - return - } - - fi, err := os.Stat(fw.Filename) - if err != nil { - // Windows cannot delete a file if a handle is still open (tail keeps one open) - // so it gives access denied to anything trying to read it until all handles are released. - if os.IsNotExist(err) || (runtime.GOOS == "windows" && os.IsPermission(err)) { - // File does not exist (has been deleted). - changes.NotifyDeleted() - return - } - - // XXX: report this error back to the user - util.Fatal("Failed to stat file %v: %v", fw.Filename, err) - } - - // File got moved/renamed? - if !os.SameFile(origFi, fi) { - changes.NotifyDeleted() - return - } - - // File got truncated? - fw.mtx.Lock() - fw.Size = fi.Size() - currentSize := fw.Size - fw.mtx.Unlock() - - if prevSize > 0 && prevSize > currentSize { - changes.NotifyTruncated() - prevSize = currentSize - bo.Reset() - continue - } - // File got bigger? - if prevSize > 0 && prevSize < currentSize { - changes.NotifyModified() - prevSize = currentSize - bo.Reset() - continue - } - prevSize = currentSize - - // File was appended to (changed)? - modTime := fi.ModTime() - if modTime != prevModTime { - prevModTime = modTime - changes.NotifyModified() - bo.Reset() - continue - } - - // File hasn't changed; increase backoff for next sleep. - bo.Backoff() - } - }() - - return changes, nil -} - -func (fw *PollingFileWatcher) SetFile(f *os.File) { - fw.mtx.Lock() - fw.File = f - fw.mtx.Unlock() -} - -func (fw *PollingFileWatcher) closeFile() { - fw.mtx.Lock() - if fw.File != nil { - _ = fw.File.Close() // Best effort close - } - fw.mtx.Unlock() -} - -type pollBackoff struct { - current time.Duration - opts PollingFileWatcherOptions -} - -func newPollBackoff(opts PollingFileWatcherOptions) *pollBackoff { - return &pollBackoff{ - current: opts.MinPollFrequency, - opts: opts, - } -} - -func (pb *pollBackoff) WaitTime() time.Duration { - return pb.current -} - -func (pb *pollBackoff) Reset() { - pb.current = pb.opts.MinPollFrequency -} - -func (pb *pollBackoff) Backoff() { - pb.current = pb.current * 2 - if pb.current > pb.opts.MaxPollFrequency { - pb.current = pb.opts.MaxPollFrequency - } -} diff --git a/internal/component/loki/source/file/internal/tail/watch/watch.go b/internal/component/loki/source/file/internal/tail/watch/watch.go deleted file mode 100644 index 18d3045668..0000000000 --- a/internal/component/loki/source/file/internal/tail/watch/watch.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2015 HPE Software Inc. All rights reserved. -// Copyright (c) 2013 ActiveState Software Inc. All rights reserved. - -package watch - -import ( - "gopkg.in/tomb.v1" - "os" -) - -// FileWatcher monitors file-level events. -type FileWatcher interface { - // BlockUntilExists blocks until the file comes into existence. - BlockUntilExists(*tomb.Tomb) error - - // ChangeEvents reports on changes to a file, be it modification, - // deletion, renames or truncations. Returned FileChanges group of - // channels will be closed, thus become unusable, after a deletion - // or truncation event. - // In order to properly report truncations, ChangeEvents requires - // the caller to pass their current offset in the file. - ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error) - - SetFile(f *os.File) -} diff --git a/internal/component/loki/source/file/tailer.go b/internal/component/loki/source/file/tailer.go index 9e89ba3e17..886784ebcf 100644 --- a/internal/component/loki/source/file/tailer.go +++ b/internal/component/loki/source/file/tailer.go @@ -22,7 +22,6 @@ import ( "github.com/grafana/alloy/internal/component/common/loki" "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail" - "github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/watch" "github.com/grafana/alloy/internal/component/loki/source/internal/positions" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" @@ -40,9 +39,7 @@ type tailer struct { tailFromEnd bool onPositionsFileError OnPositionsFileError - pollOptions watch.PollingFileWatcherOptions - - posAndSizeMtx sync.Mutex + watcherConfig tail.WatcherConfig running *atomic.Bool @@ -50,7 +47,7 @@ type tailer struct { report sync.Once - tail *tail.Tail + file *tail.File decoder *encoding.Decoder } @@ -74,12 +71,12 @@ func newTailer( receiver: receiver, positions: pos, key: positions.Entry{Path: opts.path, Labels: opts.labels.String()}, - labels: opts.labels, + labels: opts.labels.Merge(model.LabelSet{labelFilename: model.LabelValue(opts.path)}), running: atomic.NewBool(false), tailFromEnd: opts.tailFromEnd, legacyPositionUsed: opts.legacyPositionUsed, onPositionsFileError: opts.onPositionsFileError, - pollOptions: watch.PollingFileWatcherOptions{ + watcherConfig: tail.WatcherConfig{ MinPollFrequency: opts.fileWatch.MinPollFrequency, MaxPollFrequency: opts.fileWatch.MaxPollFrequency, }, @@ -155,8 +152,7 @@ func (t *tailer) Run(ctx context.Context) { default: } - handler, err := t.initRun() - + err := t.initRun() if err != nil { // We are retrying tailers until the target has disappeared. // We are mostly interested in this log if this happens directly when @@ -166,7 +162,6 @@ func (t *tailer) Run(ctx context.Context) { }) return } - defer handler.Stop() // We call report so that retries won't log. t.report.Do(func() {}) @@ -177,7 +172,7 @@ func (t *tailer) Run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) go func() { // readLines closes done on exit - t.readLines(handler, done) + t.readLines(done) cancel() }() @@ -188,21 +183,21 @@ func (t *tailer) Run(ctx context.Context) { t.stop(done) } -func (t *tailer) initRun() (loki.EntryHandler, error) { +func (t *tailer) initRun() error { fi, err := os.Stat(t.key.Path) if err != nil { - return nil, fmt.Errorf("failed to tail file: %w", err) + return fmt.Errorf("failed to tail file: %w", err) } pos, err := t.positions.Get(t.key.Path, t.key.Labels) if err != nil { switch t.onPositionsFileError { case OnPositionsFileErrorSkip: - return nil, fmt.Errorf("failed to get file position: %w", err) + return fmt.Errorf("failed to get file position: %w", err) case OnPositionsFileErrorRestartEnd: pos, err = getLastLinePosition(t.key.Path) if err != nil { - return nil, fmt.Errorf("failed to get last line position after positions error: %w", err) + return fmt.Errorf("failed to get last line position after positions error: %w", err) } level.Info(t.logger).Log("msg", "retrieved the position of the last line after positions error") default: @@ -219,7 +214,7 @@ func (t *tailer) initRun() (loki.EntryHandler, error) { if pos == 0 && t.legacyPositionUsed { pos, err = t.positions.Get(t.key.Path, "{}") if err != nil { - return nil, fmt.Errorf("failed to get file position with empty labels: %w", err) + return fmt.Errorf("failed to get file position with empty labels: %w", err) } } @@ -243,23 +238,20 @@ func (t *tailer) initRun() (loki.EntryHandler, error) { } } - tail, err := tail.TailFile(t.key.Path, tail.Config{ - Location: &tail.SeekInfo{Offset: pos, Whence: 0}, - Logger: t.logger, - PollOptions: t.pollOptions, - Decoder: t.decoder, + tail, err := tail.NewFile(t.logger, &tail.Config{ + Filename: t.key.Path, + Offset: pos, + Decoder: t.decoder, + WatcherConfig: t.watcherConfig, }) if err != nil { - return nil, fmt.Errorf("failed to tail the file: %w", err) + return fmt.Errorf("failed to tail the file: %w", err) } - t.tail = tail + t.file = tail - labelsMiddleware := t.labels.Merge(model.LabelSet{labelFilename: model.LabelValue(t.key.Path)}) - handler := loki.AddLabelsMiddleware(labelsMiddleware).Wrap(loki.NewEntryHandler(t.receiver.Chan(), func() {})) - - return handler, nil + return nil } func getDecoder(encoding string) (*encoding.Decoder, error) { @@ -274,120 +266,63 @@ func getDecoder(encoding string) (*encoding.Decoder, error) { return encoder.NewDecoder(), nil } -// updatePosition is run in a goroutine and checks the current size of the file -// and saves it to the positions file at a regular interval. If there is ever -// an error it stops the tailer and exits, the tailer will be re-opened by the -// backoff retry method if it still exists and will start reading from the -// last successful entry in the positions file. -func (t *tailer) updatePosition(posquit chan struct{}) { - positionSyncPeriod := t.positions.SyncPeriod() - positionWait := time.NewTicker(positionSyncPeriod) - defer func() { - positionWait.Stop() - level.Info(t.logger).Log("msg", "position timer: exited", "path", t.key.Path) - // NOTE: metrics must be cleaned up after the position timer exits, as markPositionAndSize() updates metrics. - t.cleanupMetrics() - }() - - for { - select { - case <-positionWait.C: - err := t.markPositionAndSize() - if err != nil { - level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.key.Path, "error", err) - err := t.tail.Stop() - if err != nil { - level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.key.Path, "error", err) - } - return - } - case <-posquit: - return - } - } -} - -// readLines consumes the t.tail.Lines channel from the -// underlying tailer. It will only exit when that channel is closed. This is -// important to avoid a deadlock in the underlying tailer which can happen if -// there are unread lines in this channel and the Stop method on the tailer is -// called, the underlying tailer will never exit if there are unread lines in -// the t.tail.Lines channel -func (t *tailer) readLines(handler loki.EntryHandler, done chan struct{}) { +// readLines reads lines from the tailed file by calling Next() in a loop. +// It processes each line by sending it to the receiver's channel and updates +// position tracking periodically. It exits when Next() returns an error, +// this happens when the tail.File is stopped or or we have a unrecoverable error. +func (t *tailer) readLines(done chan struct{}) { level.Info(t.logger).Log("msg", "tail routine: started", "path", t.key.Path) + var ( + entries = t.receiver.Chan() + lastOffset = int64(0) + positionInterval = t.positions.SyncPeriod() + lastUpdatedPosition = time.Time{} + ) - posquit, posdone := make(chan struct{}), make(chan struct{}) - go func() { - t.updatePosition(posquit) - close(posdone) - }() - - // This function runs in a goroutine, if it exits this tailer will never do any more tailing. - // Clean everything up. defer func() { level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.key.Path) - // Shut down the position marker thread - close(posquit) - <-posdone + size, _ := t.file.Size() + t.updateStats(lastOffset, size) close(done) }() - entries := handler.Chan() for { - line, ok := <-t.tail.Lines - if !ok { - level.Info(t.logger).Log("msg", "tail routine: tail channel closed, stopping tailer", "path", t.key.Path, "reason", t.tail.Tomb.Err()) + line, err := t.file.Next() + if err != nil { + // We get context.Canceled if tail.File was stopped so we don't have to log it. + if !errors.Is(err, context.Canceled) { + level.Error(t.logger).Log("msg", "tail routine: stopping tailer", "path", t.key.Path, "err", err) + } return } t.metrics.readLines.WithLabelValues(t.key.Path).Inc() entries <- loki.Entry{ - // Allocate the expected size of labels. This matches the number of labels added by the middleware - // as configured in initRun(). - Labels: make(model.LabelSet, len(t.labels)+1), + Labels: t.labels, Entry: push.Entry{ Timestamp: line.Time, Line: line.Text, }, } - } -} -func (t *tailer) markPositionAndSize() error { - // Lock this update because it can be called in two different goroutines - t.posAndSizeMtx.Lock() - defer t.posAndSizeMtx.Unlock() - - size, err := t.tail.Size() - if err != nil { - // If the file no longer exists, no need to save position information - if err == os.ErrNotExist { - level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist", "path", t.key.Path) - return nil + lastOffset = line.Offset + if time.Since(lastUpdatedPosition) >= positionInterval { + lastUpdatedPosition = time.Now() + size, _ := t.file.Size() + t.updateStats(lastOffset, size) } - return err - } - - pos, err := t.tail.Tell() - if err != nil { - return err } +} +func (t *tailer) updateStats(offset int64, size int64) { // Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped. t.metrics.totalBytes.WithLabelValues(t.key.Path).Set(float64(size)) - t.metrics.readBytes.WithLabelValues(t.key.Path).Set(float64(pos)) - t.positions.Put(t.key.Path, t.key.Labels, pos) - - return nil + t.metrics.readBytes.WithLabelValues(t.key.Path).Set(float64(offset)) + t.positions.Put(t.key.Path, t.key.Labels, offset) } func (t *tailer) stop(done chan struct{}) { - // Save the current position before shutting down tailer to ensure that if the file is tailed again - // it start where it left off. - if err := t.markPositionAndSize(); err != nil { - level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.key.Path, "error", err) - } - if err := t.tail.Stop(); err != nil { + if err := t.file.Stop(); err != nil { if util.IsEphemeralOrFileClosed(err) { // Don't log as error if the file is already closed, or we got an ephemeral error - it's a common case // when files are rotating while being read and the tailer would have stopped correctly anyway. @@ -405,6 +340,9 @@ func (t *tailer) stop(done chan struct{}) { level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.key.Path) + // We need to cleanup created metrics + t.cleanupMetrics() + // If the component is not stopping, then it means that the target for this component is gone and that // we should clear the entry from the positions file. if !t.componentStopping() { diff --git a/internal/component/loki/source/file/tailer_test.go b/internal/component/loki/source/file/tailer_test.go index 106e722124..03e8f4db2f 100644 --- a/internal/component/loki/source/file/tailer_test.go +++ b/internal/component/loki/source/file/tailer_test.go @@ -354,11 +354,8 @@ func TestTailerCorruptedPositions(t *testing.T) { close(done) }() - require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.True(c, tailer.IsRunning()) - assert.Equal(c, "16", positionsFile.GetString(logFile.Name(), labels.String())) - }, time.Second, 50*time.Millisecond) - + // tailer needs some time to start + time.Sleep(50 * time.Millisecond) _, err = logFile.Write([]byte("writing some text\n")) require.NoError(t, err) select {