Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e36f3b6
tailerv2: implement first iteration
kalleep Dec 2, 2025
e11be39
Refactor to handle wait in Next
kalleep Dec 2, 2025
06a306a
Use tailerv2
kalleep Dec 2, 2025
53a4759
Rename structure to file
kalleep Dec 2, 2025
73829d1
remove old package and rename
kalleep Dec 2, 2025
4b66c5c
Remove watcher and have free standing blocking functions
kalleep Dec 2, 2025
957eae8
remove unused metrics and cleanup metrics when stopped
kalleep Dec 3, 2025
f92e036
Add comments
kalleep Dec 4, 2025
c49f3f9
remove legacy build tags
kalleep Dec 4, 2025
42402cf
Update comment
kalleep Dec 4, 2025
6bda270
remove unused function
kalleep Dec 4, 2025
2fca395
Remove usage of Entry handler and set labels directly, this is done to
kalleep Dec 4, 2025
4b3e1b2
Fix comment and update log level
kalleep Dec 4, 2025
2289416
remove sleep in test
kalleep Dec 4, 2025
75a4a40
fix issue where file got deleted before next was called and blocking is
kalleep Dec 4, 2025
b68fa13
use exported function to check for error
kalleep Dec 4, 2025
4b519a0
Update internal/component/loki/source/file/internal/tail/file_test.go
kalleep Dec 4, 2025
87132c0
Update internal/component/loki/source/file/internal/tail/file.go
kalleep Dec 4, 2025
b299b70
Update comment
kalleep Dec 4, 2025
20228d3
If tail.File have been stopped all calls to Next are now returning
kalleep Dec 4, 2025
8faf64e
update comment
kalleep Dec 4, 2025
e868389
stat open file so we get correct comparison
kalleep Dec 5, 2025
ef75b26
Add tests
kalleep Dec 5, 2025
295243f
add tests for blockUntilExists
kalleep Dec 5, 2025
2cbf1ee
Open file with correct flags on windows
kalleep Dec 8, 2025
9e7e6b3
trim both carriage return and newline
kalleep Dec 10, 2025
8666510
Use configured watcher config
kalleep Dec 10, 2025
9f08d05
remove unused dependency
kalleep Dec 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ require (
google.golang.org/api v0.254.0
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools v2.2.0+incompatible
Expand Down
32 changes: 0 additions & 32 deletions internal/component/loki/source/file/internal/tail/README.md

This file was deleted.

117 changes: 117 additions & 0 deletions internal/component/loki/source/file/internal/tail/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package tail

import (
"context"
"os"
"runtime"

"github.com/grafana/dskit/backoff"

"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext"
)

// blockUntilExists blocks until the file specified in cfg exists or the context is canceled.
// It polls the file system at intervals defined by WatcherConfig polling frequencies.
// Returns an error if the context is canceled or an unrecoverable error occurs.
func blockUntilExists(ctx context.Context, cfg *Config) error {
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: cfg.WatcherConfig.MinPollFrequency,
MaxBackoff: cfg.WatcherConfig.MaxPollFrequency,
})

for backoff.Ongoing() {
if _, err := os.Stat(cfg.Filename); err == nil {
return nil
} else if !os.IsNotExist(err) {
return err
}
backoff.Wait()
}

return backoff.Err()
}

// event represents a file system event detected during polling.
type event int

const (
eventNone event = iota // no event detected
eventTruncated // file was truncated (size decreased)
eventModified // file was modified (size increased or modification time changed)
eventDeleted // file was deleted, moved, or renamed
)

// blockUntilEvent blocks until it detects a file system event for the given file or the context is canceled.
// It polls the file system to detect modifications, truncations, deletions, or renames.
// The pos parameter is the current file position and is used to detect truncation events.
// Returns the detected event type and any error encountered. Returns eventNone if the context is canceled.
func blockUntilEvent(ctx context.Context, f *os.File, prevSize int64, cfg *Config) (event, error) {
// NOTE: it is important that we stat the open file here. Later we do os.Stat(cfg.Filename)
// and use os.IsSameFile to detect if file was rotated.
origFi, err := f.Stat()
if err != nil {
// If file no longer exists we treat it as a delete event.
if os.IsNotExist(err) {
return eventDeleted, nil
}
return eventNone, err
}

backoff := backoff.New(ctx, backoff.Config{
MinBackoff: cfg.WatcherConfig.MinPollFrequency,
MaxBackoff: cfg.WatcherConfig.MaxPollFrequency,
})

prevModTime := origFi.ModTime()

for backoff.Ongoing() {
deletePending, err := fileext.IsDeletePending(f)

// DeletePending is a windows state where the file has been queued
// for delete but won't actually get deleted until all handles are
// closed. It's a variation on the NotifyDeleted call below.
//
// IsDeletePending may fail in cases where the file handle becomes
// invalid, so we treat a failed call the same as a pending delete.
if err != nil || deletePending {
return eventDeleted, nil
}

fi, err := os.Stat(cfg.Filename)
if err != nil {
// Windows cannot delete a file if a handle is still open (tail keeps one open)
// so it gives access denied to anything trying to read it until all handles are released.
if os.IsNotExist(err) || (runtime.GOOS == "windows" && os.IsPermission(err)) {
// File does not exist (has been deleted).
return eventDeleted, nil
}
return eventNone, err
}

// File got moved/renamed?
if !os.SameFile(origFi, fi) {
return eventDeleted, nil
}

// File got truncated?
currentSize := fi.Size()
if prevSize > 0 && prevSize > currentSize {
return eventTruncated, nil
}

// File got bigger?
if prevSize < currentSize {
return eventModified, nil
}

// File was appended to (changed)?
if fi.ModTime() != prevModTime {
return eventModified, nil
}

// File hasn't changed so wait until next retry.
backoff.Wait()
}

return eventNone, backoff.Err()
}
189 changes: 189 additions & 0 deletions internal/component/loki/source/file/internal/tail/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package tail

import (
"context"
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component/loki/source/file/internal/tail/fileext"
)

func TestBlockUntilExists(t *testing.T) {
watcherConfig := WatcherConfig{
MinPollFrequency: 5 * time.Millisecond,
MaxPollFrequency: 5 * time.Millisecond,
}

t.Run("should block until file exists", func(t *testing.T) {
filename := filepath.Join(t.TempDir(), "eventually")

go func() {
time.Sleep(10 * time.Millisecond)
createFileWithPath(t, filename, "")
}()

err := blockUntilExists(context.Background(), &Config{
Filename: filename,
WatcherConfig: watcherConfig,
})
require.NoError(t, err)
})

t.Run("should exit when context is canceled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()

err := blockUntilExists(ctx, &Config{
Filename: filepath.Join(t.TempDir(), "never"),
WatcherConfig: watcherConfig,
})
require.ErrorIs(t, err, context.Canceled)
})
}

func TestBlockUntilEvent(t *testing.T) {
watcherConfig := WatcherConfig{
MinPollFrequency: 5 * time.Millisecond,
MaxPollFrequency: 5 * time.Millisecond,
}

t.Run("should return modified event when file is written to", func(t *testing.T) {
f := createEmptyFile(t, "startempty")
defer f.Close()

go func() {
time.Sleep(10 * time.Millisecond)
_, err := f.WriteString("updated")
require.NoError(t, err)
}()

event, err := blockUntilEvent(context.Background(), f, 0, &Config{
Filename: f.Name(),
WatcherConfig: watcherConfig,
})
require.NoError(t, err)
require.Equal(t, eventModified, event)
})

t.Run("should return modified event if mod time is updated", func(t *testing.T) {
f := createEmptyFile(t, "startempty")
defer f.Close()

go func() {
time.Sleep(10 * time.Millisecond)
require.NoError(t, os.Chtimes(f.Name(), time.Now(), time.Now()))
}()

event, err := blockUntilEvent(context.Background(), f, 0, &Config{
Filename: f.Name(),
WatcherConfig: watcherConfig,
})
require.NoError(t, err)
require.Equal(t, eventModified, event)
})

t.Run("should return deleted event if file is deleted", func(t *testing.T) {
f := createEmptyFile(t, "startempty")
require.NoError(t, f.Close())

// NOTE: important for windows that we open with correct flags.
f, err := fileext.OpenFile(f.Name())
require.NoError(t, err)
defer f.Close()

go func() {
time.Sleep(10 * time.Millisecond)
removeFile(t, f.Name())
}()

event, err := blockUntilEvent(context.Background(), f, 0, &Config{
Filename: f.Name(),
WatcherConfig: watcherConfig,
})
require.NoError(t, err)
require.Equal(t, eventDeleted, event)
})

t.Run("should return deleted event if file is deleted before", func(t *testing.T) {
f := createEmptyFile(t, "startempty")
require.NoError(t, f.Close())

// NOTE: important for windows that we open with correct flags.
f, err := fileext.OpenFile(f.Name())
require.NoError(t, err)
defer f.Close()

removeFile(t, f.Name())

event, err := blockUntilEvent(context.Background(), f, 0, &Config{
Filename: f.Name(),
WatcherConfig: watcherConfig,
})
require.NoError(t, err)
require.Equal(t, eventDeleted, event)
})

t.Run("should return truncated event", func(t *testing.T) {
f := createFileWithContent(t, "truncate", "content")
defer f.Close()

offset, err := f.Seek(0, io.SeekEnd)
require.NoError(t, err)

go func() {
time.Sleep(10 * time.Millisecond)
require.NoError(t, f.Truncate(0))
}()

event, err := blockUntilEvent(context.Background(), f, offset, &Config{
Filename: f.Name(),
WatcherConfig: watcherConfig,
})
require.NoError(t, err)
require.Equal(t, eventTruncated, event)
})

t.Run("should exit when context is canceled", func(t *testing.T) {
f := createEmptyFile(t, "startempty")
defer f.Close()

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()

event, err := blockUntilEvent(ctx, f, 0, &Config{
Filename: f.Name(),
WatcherConfig: watcherConfig,
})
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, eventNone, event)
})
}

func createEmptyFile(t *testing.T, name string) *os.File {
path := filepath.Join(t.TempDir(), name)
f, err := os.Create(path)
require.NoError(t, err)
return f
}

func createFileWithContent(t *testing.T, name, content string) *os.File {
path := createFile(t, name, content)
f, err := os.OpenFile(path, os.O_RDWR, 0)
require.NoError(t, err)
return f
}

func createFileWithPath(t *testing.T, path, content string) {
require.NoError(t, os.WriteFile(path, []byte(content), 0600))
}
41 changes: 41 additions & 0 deletions internal/component/loki/source/file/internal/tail/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tail

import (
"time"

"golang.org/x/text/encoding"
)

// Config holds configuration for tailing a file.
type Config struct {
// Filename is the path to the file to tail.
Filename string
// Offset is the byte offset in the file where tailing should start.
// If 0, tailing starts from the beginning of the file.
Offset int64

// Decoder is an optional text decoder for non-UTF-8 encoded files.
// If the file is not UTF-8, the tailer must use the correct decoder
// or the output text may be corrupted. For example, if the file is
// "UTF-16 LE" encoded, the tailer would not separate new lines properly
// and the output could appear as garbled characters.
Decoder *encoding.Decoder

// WatcherConfig controls how the file system is polled for changes.
WatcherConfig WatcherConfig
}

// WatcherConfig controls the polling behavior for detecting file system events.
type WatcherConfig struct {
// MinPollFrequency and MaxPollFrequency specify the polling frequency range
// for detecting file system events. The actual polling frequency will vary
// within this range based on backoff behavior.
MinPollFrequency, MaxPollFrequency time.Duration
}

// defaultWatcherConfig holds the default polling configuration used when
// WatcherConfig is not explicitly provided in Config.
var defaultWatcherConfig = WatcherConfig{
MinPollFrequency: 250 * time.Millisecond,
MaxPollFrequency: 250 * time.Millisecond,
}
Loading
Loading