Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/acquisition/modules/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Configuration struct {
PollWithoutInotify *bool `yaml:"poll_without_inotify"`
DiscoveryPollEnable bool `yaml:"discovery_poll_enable"`
DiscoveryPollInterval time.Duration `yaml:"discovery_poll_interval"`
TailMode string `yaml:"tail_mode"` // "default" or "stat" (defaults to "default" if empty)
StatPollInterval time.Duration `yaml:"stat_poll_interval"` // stat poll interval used when tail_mode=stat (default 1s, 0=1s, -1=manual)
configuration.DataSourceCommonCfg `yaml:",inline"`
}

Expand Down
34 changes: 18 additions & 16 deletions pkg/acquisition/modules/file/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"time"

"github.com/fsnotify/fsnotify"
"github.com/nxadm/tail"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"

"github.com/crowdsecurity/go-cs-lib/trace"

"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file/tailwrapper"
"github.com/crowdsecurity/crowdsec/pkg/fsutil"
"github.com/crowdsecurity/crowdsec/pkg/metrics"
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *Source) monitorNewFiles(out chan pipeline.Event, t *tomb.Tomb) error {
// Setup polling if enabled
var (
tickerChan <-chan time.Time
ticker *time.Ticker
ticker *time.Ticker
)

if s.config.DiscoveryPollEnable {
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *Source) setupTailForFile(file string, out chan pipeline.Event, seekEnd
}

// Create the tailer with appropriate configuration
seekInfo := &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}
seekInfo := &tailwrapper.SeekInfo{Offset: 0, Whence: io.SeekEnd}
if s.config.Mode == configuration.CAT_MODE {
seekInfo.Whence = io.SeekStart
}
Expand All @@ -269,12 +269,14 @@ func (s *Source) setupTailForFile(file string, out chan pipeline.Event, seekEnd

logger.Infof("Starting tail (offset: %d, whence: %d)", seekInfo.Offset, seekInfo.Whence)

tail, err := tail.TailFile(file, tail.Config{
ReOpen: true,
Follow: true,
Poll: pollFile,
Location: seekInfo,
Logger: log.NewEntry(log.StandardLogger()),
tail, err := tailwrapper.TailFile(file, tailwrapper.Config{
ReOpen: true,
Follow: true,
Poll: pollFile,
Location: seekInfo,
Logger: log.NewEntry(log.StandardLogger()),
TailMode: s.config.TailMode,
StatPollInterval: s.config.StatPollInterval,
})
if err != nil {
return fmt.Errorf("could not start tailing file %s : %w", file, err)
Expand All @@ -292,8 +294,8 @@ func (s *Source) setupTailForFile(file string, out chan pipeline.Event, seekEnd
return nil
}

func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail) error {
logger := s.logger.WithField("tail", tail.Filename)
func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail tailwrapper.Tailer) error {
logger := s.logger.WithField("tail", tail.Filename())
logger.Debug("-> start tailing")

for {
Expand All @@ -320,11 +322,11 @@ func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail
// Just remove the dead tailer from our map and return
// monitorNewFiles will pick up the file again if it's recreated
s.tailMapMutex.Lock()
delete(s.tails, tail.Filename)
delete(s.tails, tail.Filename())
s.tailMapMutex.Unlock()

return nil
case line := <-tail.Lines:
case line := <-tail.Lines():
if line == nil {
logger.Warning("tail is empty")
continue
Expand All @@ -340,12 +342,12 @@ func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail
}

if s.metricsLevel != metrics.AcquisitionMetricsLevelNone {
metrics.FileDatasourceLinesRead.With(prometheus.Labels{"source": tail.Filename, "datasource_type": "file", "acquis_type": s.config.Labels["type"]}).Inc()
metrics.FileDatasourceLinesRead.With(prometheus.Labels{"source": tail.Filename(), "datasource_type": "file", "acquis_type": s.config.Labels["type"]}).Inc()
}

src := tail.Filename
src := tail.Filename()
if s.metricsLevel == metrics.AcquisitionMetricsLevelAggregated {
src = filepath.Base(tail.Filename)
src = filepath.Base(tail.Filename())
}

l := pipeline.Line{
Expand Down
268 changes: 268 additions & 0 deletions pkg/acquisition/modules/file/tail_modes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
package fileacquisition_test

import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/tomb.v2"

fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file"
"github.com/crowdsecurity/crowdsec/pkg/metrics"
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
)

// Test matrix for both tail implementations. Every TestTailModes_* test
// iterates over these entries, appending the config snippet to the base
// datasource YAML so the same scenario runs against both the default
// (nxadm/tail) tailer and the stat-polling tailer.
var tailModes = []struct {
	name   string
	config string // tail mode configuration snippet to append
}{
	{
		name:   "default",
		config: "", // Default, no extra config
	},
	{
		name: "stat",
		// 100ms keeps the polling tailer responsive enough for the
		// short sleeps used in these tests.
		config: "\ntail_mode: stat\nstat_poll_interval: 100ms",
	},
}

// TestTailModes_BasicTailing verifies that both tail implementations
// (default/inotify-based and stat-based polling) pick up a file and
// deliver lines appended after tailing has started.
func TestTailModes_BasicTailing(t *testing.T) {
	ctx := t.Context()

	for _, mode := range tailModes {
		t.Run(mode.name, func(t *testing.T) {
			tmpDir := t.TempDir()
			testFile := filepath.Join(tmpDir, "test.log")

			// Seed the file before the tailer starts.
			err := os.WriteFile(testFile, []byte("line1\n"), 0o644)
			require.NoError(t, err)

			config := fmt.Sprintf(`
mode: tail
filenames:
 - %s%s
`, testFile, mode.config)

			subLogger := log.WithField("type", "file")

			f := fileacquisition.Source{}
			err = f.Configure(ctx, []byte(config), subLogger, metrics.AcquisitionMetricsLevelNone)
			require.NoError(t, err)

			out := make(chan pipeline.Event, 10)
			// Named tb to avoid shadowing the tomb package.
			tb := tomb.Tomb{}

			err = f.StreamingAcquisition(ctx, out, &tb)
			require.NoError(t, err)

			// Wait for tailing to start.
			time.Sleep(300 * time.Millisecond)

			// Verify file is being tailed.
			assert.True(t, f.IsTailing(testFile), "File should be tailed")

			// Append new lines. NOTE: os.WriteFile(..., os.ModeAppend) does
			// NOT append — the perm argument only sets mode bits on creation
			// and WriteFile truncates an existing file. Use O_APPEND so this
			// genuinely tests appends rather than a truncate-and-rewrite.
			fh, err := os.OpenFile(testFile, os.O_APPEND|os.O_WRONLY, 0o644)
			require.NoError(t, err)
			_, err = fh.WriteString("line2\nline3\n")
			require.NoError(t, err)
			require.NoError(t, fh.Close())

			// Wait for lines to be read (stat mode needs time to poll).
			time.Sleep(500 * time.Millisecond)

			// Drain whatever events have arrived so far.
			var lines []string
		drain:
			for {
				select {
				case evt := <-out:
					lines = append(lines, evt.Line.Raw)
				default:
					break drain
				}
			}

			// Cleanup.
			tb.Kill(nil)
			_ = tb.Wait()

			// Should have read at least one new line (timing-dependent on Windows).
			assert.GreaterOrEqual(t, len(lines), 1, "Should have read at least 1 line")

			// At least one of the appended lines should be present.
			hasNewLine := false
			for _, line := range lines {
				if line == "line2" || line == "line3" {
					hasNewLine = true
					break
				}
			}
			assert.True(t, hasNewLine, "Should have read at least one new line (line2 or line3)")
		})
	}
}

// TestTailModes_Truncation verifies that both tail implementations detect
// a truncation (simulated log rotation) and continue reading the content
// written afterwards.
func TestTailModes_Truncation(t *testing.T) {
	ctx := t.Context()

	for _, mode := range tailModes {
		t.Run(mode.name, func(t *testing.T) {
			tmpDir := t.TempDir()
			testFile := filepath.Join(tmpDir, "test.log")

			// Seed the file with pre-rotation content.
			err := os.WriteFile(testFile, []byte("old1\nold2\nold3\n"), 0o644)
			require.NoError(t, err)

			config := fmt.Sprintf(`
mode: tail
filenames:
 - %s%s
`, testFile, mode.config)

			subLogger := log.WithField("type", "file")

			f := fileacquisition.Source{}
			err = f.Configure(ctx, []byte(config), subLogger, metrics.AcquisitionMetricsLevelNone)
			require.NoError(t, err)

			out := make(chan pipeline.Event, 20)
			// Named tb to avoid shadowing the tomb package.
			tb := tomb.Tomb{}

			err = f.StreamingAcquisition(ctx, out, &tb)
			require.NoError(t, err)

			// Wait for tailing to start.
			time.Sleep(200 * time.Millisecond)

			// Truncate file (simulate rotation): os.WriteFile truncates an
			// existing file before writing, which is exactly what we want here.
			err = os.WriteFile(testFile, []byte("new1\n"), 0o644)
			require.NoError(t, err)

			// Wait for truncation detection.
			time.Sleep(400 * time.Millisecond)

			// Append more lines. os.WriteFile(..., os.ModeAppend) would
			// truncate a second time (perm is ignored for existing files);
			// O_APPEND performs a genuine append.
			fh, err := os.OpenFile(testFile, os.O_APPEND|os.O_WRONLY, 0o644)
			require.NoError(t, err)
			_, err = fh.WriteString("new2\nnew3\n")
			require.NoError(t, err)
			require.NoError(t, fh.Close())

			// Wait for new lines.
			time.Sleep(400 * time.Millisecond)

			// Drain the events received so far.
			var lines []string
		drain:
			for {
				select {
				case evt := <-out:
					lines = append(lines, evt.Line.Raw)
				default:
					break drain
				}
			}

			// Cleanup.
			tb.Kill(nil)
			_ = tb.Wait()

			// Should have detected truncation and read post-rotation content.
			hasNew := false
			for _, line := range lines {
				if line == "new1" || line == "new2" || line == "new3" {
					hasNew = true
					break
				}
			}
			assert.True(t, hasNew, "Should have read new content after truncation")
		})
	}
}

// TestTailModes_ConfigurationApplied verifies that the tail_mode setting
// selects a working implementation (i.e. the option is honored rather than
// silently ignored): every accepted value must result in the file being
// tailed. The behavioral difference between implementations is covered by
// the tailwrapper package tests.
func TestTailModes_ConfigurationApplied(t *testing.T) {
	ctx := t.Context()
	tmpDir := t.TempDir()
	testFile := filepath.Join(tmpDir, "test.log")

	err := os.WriteFile(testFile, []byte("line1\n"), 0o644)
	require.NoError(t, err)

	testCases := []struct {
		name           string
		config         string
		expectStatMode bool
	}{
		{
			name: "default_is_native",
			config: fmt.Sprintf(`
mode: tail
filenames:
 - %s
`, testFile),
			expectStatMode: false,
		},
		{
			name: "explicit_default",
			config: fmt.Sprintf(`
mode: tail
filenames:
 - %s
tail_mode: default
`, testFile),
			expectStatMode: false,
		},
		{
			name: "explicit_stat",
			config: fmt.Sprintf(`
mode: tail
filenames:
 - %s
tail_mode: stat
stat_poll_interval: 100ms
`, testFile),
			expectStatMode: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			subLogger := log.WithField("type", "file")

			f := fileacquisition.Source{}
			err := f.Configure(ctx, []byte(tc.config), subLogger, metrics.AcquisitionMetricsLevelNone)
			require.NoError(t, err)

			out := make(chan pipeline.Event, 10)
			// Named tb to avoid shadowing the tomb package.
			tb := tomb.Tomb{}

			err = f.StreamingAcquisition(ctx, out, &tb)
			require.NoError(t, err)

			// Wait for tailing to start.
			time.Sleep(200 * time.Millisecond)

			// Append a line to trigger reading. os.WriteFile(..., os.ModeAppend)
			// does not append — it truncates and rewrites; use O_APPEND for a
			// genuine append.
			fh, err := os.OpenFile(testFile, os.O_APPEND|os.O_WRONLY, 0o644)
			require.NoError(t, err)
			_, err = fh.WriteString("line2\n")
			require.NoError(t, err)
			require.NoError(t, fh.Close())

			// Wait for line to be read.
			time.Sleep(300 * time.Millisecond)

			// Verify file is being tailed (both modes should work).
			assert.True(t, f.IsTailing(testFile), "File should be tailed")

			// Cleanup.
			tb.Kill(nil)
			_ = tb.Wait()

			t.Logf("Successfully tailed file with mode: %s (expectStatMode=%v)", tc.name, tc.expectStatMode)
		})
	}
}
24 changes: 24 additions & 0 deletions pkg/acquisition/modules/file/tailwrapper/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tailwrapper

import (
"fmt"
)

// TailFile creates a Tailer for filename according to config.TailMode:
// "stat" selects the stat-polling implementation, while "default" (or an
// empty value) selects the native nxadm/tail adapter. Any other mode is
// rejected with an error.
func TailFile(filename string, config Config) (Tailer, error) {
	switch config.TailMode {
	case "", "default":
		// Empty means the user did not set tail_mode; fall back to the
		// native implementation.
		return newNxadmTail(filename, config)
	case "stat":
		return newStatTail(filename, config)
	default:
		return nil, fmt.Errorf("unknown tail mode: %s (supported: default, stat)", config.TailMode)
	}
}
Loading