diff --git a/internal/impl/io/input_file_tail.go b/internal/impl/io/input_file_tail.go
new file mode 100644
index 0000000000..7052d01f0e
--- /dev/null
+++ b/internal/impl/io/input_file_tail.go
@@ -0,0 +1,395 @@
+package io
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strconv"
+	"time"
+
+	"github.com/Jeffail/shutdown"
+	"github.com/warpstreamlabs/bento/public/service"
+)
+
+const (
+	ftiFieldPath              = "path"
+	ftiFieldPollInterval      = "poll_interval"
+	ftiFieldMaxLineBufferSize = "line_buffer"
+	ftiFieldStartPosition     = "start_position"
+)
+
+func fileTailInputSpec() *service.ConfigSpec {
+	return service.NewConfigSpec().
+		Categories("Local").
+		Summary("Tails a file").
+		Description(`
+Continuously reads lines from a file, handling log rotation and truncation.
+
+### Metadata
+
+This input adds the following metadata fields to each message:
+
+` + "```text" + `
+- file_tail_path
+- file_tail_position
+` + "```" + `
+
+You can access these metadata fields using
+[function interpolation](/docs/configuration/interpolation#bloblang-queries).
+`).
+		Field(service.NewStringField(ftiFieldPath).
+			Description("The path of the file to consume lines from.").
+			Example("./folder/file.log")).
+		Field(service.NewDurationField(ftiFieldPollInterval).
+			Description("The time between checks for new lines.").
+			Default("1s").
+			Example("1s").
+			Example("200ms")).
+		Field(service.NewIntField(ftiFieldMaxLineBufferSize).
+			Description("Number of lines to buffer internally before backpressure is applied.").
+			Default(1000).
+			Advanced()).
+		Field(service.NewStringEnumField(ftiFieldStartPosition, "start", "end").
+			Description("Where to begin reading the file from. `start` consumes all existing lines in the file, whereas `end` only consumes lines appended after the input has started.").
+			Default("start")).
+		Field(service.NewAutoRetryNacksToggleField())
+}
+
+func init() {
+	err := service.RegisterInput(
+		"file_tail",
+		fileTailInputSpec(),
+		func(pConf *service.ParsedConfig, mgr *service.Resources) (in service.Input, err error) {
+			in, err = NewFileTailInput(pConf, mgr)
+			if err != nil {
+				return nil, err
+			}
+			return service.AutoRetryNacksToggled(pConf, in)
+		})
+	if err != nil {
+		panic(err)
+	}
+}
+
+//------------------------------------------------------------------------------
+
+type FileTailInput struct {
+	tail tail
+
+	logger  *service.Logger
+	shutSig *shutdown.Signaller
+}
+
+func NewFileTailInput(pConf *service.ParsedConfig, mgr *service.Resources) (*FileTailInput, error) {
+	path, err := pConf.FieldString(ftiFieldPath)
+	if err != nil {
+		return nil, err
+	}
+
+	pollInterval, err := pConf.FieldDuration(ftiFieldPollInterval)
+	if err != nil {
+		return nil, err
+	}
+
+	lineBufferSize, err := pConf.FieldInt(ftiFieldMaxLineBufferSize)
+	if err != nil {
+		return nil, err
+	}
+
+	startPosition, err := pConf.FieldString(ftiFieldStartPosition)
+	if err != nil {
+		return nil, err
+	}
+
+	tailOpts := []tailOpt{
+		withPollInterval(pollInterval),
+		withLineChanBufferSize(lineBufferSize),
+		withStartPosition(startPosition),
+		withLogger(mgr.Logger()),
+	}
+
+	tail, err := newTail(path, tailOpts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &FileTailInput{
+		tail:    tail,
+		logger:  mgr.Logger(),
+		shutSig: shutdown.NewSignaller(),
+	}, nil
+}
+
+func (fti *FileTailInput) Connect(ctx context.Context) error {
+	if fti.shutSig.IsHardStopSignalled() {
+		return nil
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	fti.tail.cancel = cancel
+
+	go fti.tail.watch(ctx)
+
+	return nil
+}
+
+func (fti *FileTailInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
+	select {
+	case err, ok := <-fti.tail.errChan:
+		if !ok {
+			return nil, nil, service.ErrNotConnected
+		}
+		fti.logger.Error(err.Error())
+		return nil, nil, service.ErrNotConnected
+
+	case tl, ok := <-fti.tail.lineChan:
+		if !ok {
+			return nil, nil, service.ErrNotConnected
+		}
+
+		msg := service.NewMessage(tl.line)
+		msg.MetaSet("file_tail_path", fti.tail.path)
+		msg.MetaSet("file_tail_position", strconv.FormatInt(tl.position, 10))
+
+		return msg, func(ctx context.Context, res error) error {
+			return nil
+		}, nil
+
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+
+	case <-fti.shutSig.HardStopChan():
+		return nil, nil, nil
+	}
+}
+
+func (fti *FileTailInput) Close(ctx context.Context) error {
+	fti.shutSig.TriggerHardStop()
+	fti.tail.cancel()
+
+	<-fti.tail.doneChan
+
+	close(fti.tail.errChan)
+	close(fti.tail.lineChan)
+
+	fti.tail.file.Close()
+	return nil
+}
+
+//------------------------------------------------------------------------------
+
+type tail struct {
+	path         string
+	pollInterval time.Duration
+
+	file     *os.File
+	fileInfo os.FileInfo
+
+	logger *service.Logger
+
+	reader *bufio.Reader
+
+	lineChan chan tailLine
+	errChan  chan error
+	doneChan chan struct{}
+
+	cancel context.CancelFunc
+}
+
+type tailLine struct {
+	line     []byte
+	position int64
+}
+
+type tailOpt func(*tail) error
+
+func withPollInterval(pollInterval time.Duration) tailOpt {
+	return func(t *tail) error {
+		t.pollInterval = pollInterval
+		return nil
+	}
+}
+
+func withLineChanBufferSize(bs int) tailOpt {
+	return func(t *tail) error {
+		t.lineChan = make(chan tailLine, bs)
+		return nil
+	}
+}
+
+func withStartPosition(sp string) tailOpt {
+	return func(t *tail) error {
+		if sp == "end" {
+			_, err := t.file.Seek(0, io.SeekEnd)
+			return err
+		}
+		return nil
+	}
+}
+
+func withLogger(logger *service.Logger) tailOpt {
+	return func(t *tail) error {
+		t.logger = logger
+		return nil
+	}
+}
+
+func newTail(path string, opts ...tailOpt) (tail, error) {
+	file, err := os.Open(path)
+	if err != nil {
+		return tail{}, err
+	}
+
+	fileInfo, err := file.Stat()
+	if err != nil {
+		file.Close()
+		return tail{}, err
+	}
+
+	reader := bufio.NewReader(file)
+
+	t := &tail{
+		path:         path,
+		pollInterval: time.Second,
+
+		file:     file,
+		fileInfo: fileInfo,
+
+		reader:   reader,
+		lineChan: make(chan tailLine, 1000),
+		errChan:  make(chan error),
+		doneChan: make(chan struct{}),
+	}
+
+	for _, o := range opts {
+		err := o(t)
+		if err != nil {
+			file.Close()
+			return tail{}, err
+		}
+	}
+
+	return *t, nil
+}
+
+func (t *tail) watch(ctx context.Context) {
+	ticker := time.NewTicker(t.pollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			t.doneChan <- struct{}{}
+			return
+		default:
+		}
+
+		line, err := t.reader.ReadBytes('\n')
+		line = bytes.TrimRight(line, "\r\n")
+
+		if len(line) > 0 {
+			pos, err := t.file.Seek(0, io.SeekCurrent)
+			if err != nil {
+				t.errChan <- err
+				return
+			}
+
+			tl := tailLine{
+				line:     line,
+				position: pos,
+			}
+
+			select {
+			case t.lineChan <- tl:
+			case <-ctx.Done():
+				t.doneChan <- struct{}{}
+				return
+			}
+		}
+
+		if err != nil {
+			if err == io.EOF {
+				select {
+				case <-ticker.C:
+				case <-ctx.Done():
+					t.doneChan <- struct{}{}
+					return
+				}
+
+				if err := t.reopenIfMovedOrTruncated(); err != nil {
+					t.errChan <- err
+					return
+				}
+			} else {
+				t.errChan <- fmt.Errorf("reader: %w", err)
+				return
+			}
+		}
+	}
+}
+
+func (t *tail) reopenIfMovedOrTruncated() error {
+	tempInfo, err := os.Stat(t.path)
+	if err != nil {
+		return fmt.Errorf("stat file: %w", err)
+	}
+
+	pos, err := t.file.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return fmt.Errorf("seek file: %w", err)
+	}
+
+	var truncation bool
+	var moved bool
+
+	if !os.SameFile(t.fileInfo, tempInfo) {
+		if t.logger != nil {
+			t.logger.Infof("Handling rotation for %v", t.path)
+		}
+		moved = true
+	}
+
+	if pos > tempInfo.Size() && !moved {
+		if t.logger != nil {
+			t.logger.Infof("Handling truncation for %v", t.path)
+		}
+		truncation = true
+	}
+
+	if !truncation && !moved {
+		return nil
+	}
+
+	err = t.handleMoveOrTruncation()
+	if err != nil {
+		return fmt.Errorf("handle rotation: %w", err)
+	}
+
+	return nil
+}
+
+func (t *tail) handleMoveOrTruncation() error {
+	file, err := os.Open(t.path)
+	if err != nil {
+		return err
+	}
+
+	fileInfo, err := os.Stat(t.path)
+	if err != nil {
+		return err
+	}
+
+	t.file.Close()
+
+	t.file = file
+	t.fileInfo = fileInfo
+	t.reader = bufio.NewReader(file)
+
+	return nil
+}
diff --git a/internal/impl/io/input_file_tail_test.go b/internal/impl/io/input_file_tail_test.go
new file mode 100644
index 0000000000..031c4b798d
--- /dev/null
+++ b/internal/impl/io/input_file_tail_test.go
@@ -0,0 +1,297 @@
+package io
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/warpstreamlabs/bento/internal/component/testutil"
+	"github.com/warpstreamlabs/bento/internal/manager/mock"
+)
+
+func TestFileTail_Basic(t *testing.T) {
+	fullPath := createFile(t, "Hello Alice")
+
+	inputConf, err := testutil.InputFromYAML(fmt.Sprintf(`
+file_tail:
+  path: %v
+`, fullPath))
+	require.NoError(t, err)
+
+	s, err := mock.NewManager().NewInput(inputConf)
+	require.NoError(t, err)
+
+	var receivedMsgs []string
+	msgArrivedChan := make(chan struct{})
+
+	var receivedPositions []string
+
+	go func() {
+		for msg := range s.TransactionChan() {
+			bytes := msg.Payload.Get(0).AsBytes()
+			position := msg.Payload.Get(0).MetaGetStr("file_tail_position")
+
+			receivedPositions = append(receivedPositions, position)
+
+			receivedMsgs = append(receivedMsgs, string(bytes))
+
+			err := msg.Ack(context.Background(), nil)
+			require.NoError(t, err)
+			msgArrivedChan <- struct{}{}
+		}
+	}()
+
+	<-msgArrivedChan
+
+	appendLine(t, fullPath, "Hello Bob")
+
+	<-msgArrivedChan
+
+	s.TriggerStopConsuming()
+
+	err = s.WaitForClose(context.Background())
+	require.NoError(t, err)
+
+	expectedMsgs := []string{"Hello Alice", "Hello Bob"}
+	expectedPositions := []string{"12", "22"}
+
+	assert.Equal(t, expectedPositions, receivedPositions)
+	assert.Equal(t, expectedMsgs, receivedMsgs)
+}
+
+func TestFileTail_StartPositionEnd(t *testing.T) {
+	fullPath := createFile(t, "Hello Alice")
+
+	inputConf, err := testutil.InputFromYAML(fmt.Sprintf(`
+file_tail:
+  path: %v
+  start_position: end
+`, fullPath))
+	require.NoError(t, err)
+
+	s, err := mock.NewManager().NewInput(inputConf)
+	require.NoError(t, err)
+
+	var receivedMsgs []string
+	msgArrivedChan := make(chan struct{})
+
+	var receivedPositions []string
+
+	go func() {
+		for msg := range s.TransactionChan() {
+			bytes := msg.Payload.Get(0).AsBytes()
+			position := msg.Payload.Get(0).MetaGetStr("file_tail_position")
+
+			receivedPositions = append(receivedPositions, position)
+
+			receivedMsgs = append(receivedMsgs, string(bytes))
+
+			err := msg.Ack(context.Background(), nil)
+			require.NoError(t, err)
+			msgArrivedChan <- struct{}{}
+		}
+	}()
+
+	appendLine(t, fullPath, "Hello Bob")
+
+	<-msgArrivedChan
+
+	s.TriggerStopConsuming()
+
+	err = s.WaitForClose(context.Background())
+	require.NoError(t, err)
+
+	expectedMsgs := []string{"Hello Bob"}
+	expectedPositions := []string{"22"}
+
+	assert.Equal(t, expectedPositions, receivedPositions)
+	assert.Equal(t, expectedMsgs, receivedMsgs)
+}
+
+func TestFileTail_FileRotation(t *testing.T) {
+	fullPath := createFile(t, "Hello Alice")
+
+	inputConf, err := testutil.InputFromYAML(fmt.Sprintf(`
+file_tail:
+  path: %v
+`, fullPath))
+	require.NoError(t, err)
+
+	s, err := mock.NewManager().NewInput(inputConf)
+	require.NoError(t, err)
+
+	var receivedMsgs []string
+	msgArrivedChan := make(chan struct{})
+
+	go func() {
+		for msg := range s.TransactionChan() {
+			bytes := msg.Payload.Get(0).AsBytes()
+
+			receivedMsgs = append(receivedMsgs, string(bytes))
+
+			err := msg.Ack(context.Background(), nil)
+			require.NoError(t, err)
+			msgArrivedChan <- struct{}{}
+		}
+	}()
+
+	<-msgArrivedChan
+
+	err = os.Rename(fullPath, filepath.Join(filepath.Dir(fullPath), "log1.txt"))
+	require.NoError(t, err)
+
+	// create a new file at the original path, containing initial data
+	err = os.WriteFile(fullPath, []byte("Hello Bob"), 0o644)
+	require.NoError(t, err)
+
+	<-msgArrivedChan
+
+	s.TriggerStopConsuming()
+
+	expectedMsgs := []string{"Hello Alice", "Hello Bob"}
+
+	assert.Equal(t, expectedMsgs, receivedMsgs)
+}
+
+func TestFileTail_FileTruncation(t *testing.T) {
+	fullPath := createFile(t, "Hello Alice")
+
+	inputConf, err := testutil.InputFromYAML(fmt.Sprintf(`
+file_tail:
+  path: %v
+`, fullPath))
+	require.NoError(t, err)
+
+	s, err := mock.NewManager().NewInput(inputConf)
+	require.NoError(t, err)
+
+	var receivedMsgs []string
+	msgArrivedChan := make(chan struct{})
+
+	go func() {
+		for msg := range s.TransactionChan() {
+			bytes := msg.Payload.Get(0).AsBytes()
+
+			receivedMsgs = append(receivedMsgs, string(bytes))
+
+			err := msg.Ack(context.Background(), nil)
+			require.NoError(t, err)
+			msgArrivedChan <- struct{}{}
+		}
+	}()
+
+	<-msgArrivedChan
+
+	err = os.Truncate(fullPath, 0)
+	require.NoError(t, err)
+
+	appendLine(t, fullPath, "Hello Bob")
+
+	<-msgArrivedChan
+
+	s.TriggerStopConsuming()
+
+	expectedMsgs := []string{"Hello Alice", "Hello Bob"}
+
+	assert.Equal(t, expectedMsgs, receivedMsgs)
+}
+
+func TestFileTail_Shutdown(t *testing.T) {
+	fullPath := createFile(t, "Hello World")
+
+	inputConf, err := testutil.InputFromYAML(fmt.Sprintf(`
+file_tail:
+  path: %v
+`, fullPath))
+	require.NoError(t, err)
+
+	s, err := mock.NewManager().NewInput(inputConf)
+	require.NoError(t, err)
+
+	i := 0
+	go func() {
+		for msg := range s.TransactionChan() {
+			_ = msg.Payload.Get(0).AsBytes()
+
+			err := msg.Ack(context.Background(), nil)
+			require.NoError(t, err)
+			i++
+		}
+	}()
+
+	// continuously append new lines to the file in the background
+	f, err := os.OpenFile(fullPath, os.O_APPEND|os.O_WRONLY, 0o644)
+	require.NoError(t, err)
+	defer f.Close()
+
+	terminate := make(chan struct{})
+	defer func() {
+		terminate <- struct{}{}
+	}()
+
+	go func() {
+		for {
+			select {
+			case <-terminate:
+				return
+			default:
+				_, _ = f.WriteString("Hello World\n")
+			}
+		}
+	}()
+
+	time.Sleep(time.Second * 2)
+
+	s.TriggerStopConsuming()
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	defer cancel()
+	assert.NoError(t, s.WaitForClose(ctx))
+}
+
+func TestFileTail_ErrorHandling(t *testing.T) {
+	fullPath := createFile(t, "Hello Alice")
+
+	tail, err := newTail(fullPath)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	go tail.watch(ctx)
+
+	<-tail.lineChan
+
+	err = os.Remove(fullPath)
+	require.NoError(t, err)
+
+	tErr := <-tail.errChan
+
+	assert.Contains(t, tErr.Error(), "no such file or directory")
+}
+
+func createFile(t *testing.T, content string) (fullPath string) {
+	t.Helper()
+
+	tmpDir := t.TempDir()
+
+	fullPath = filepath.Join(tmpDir, "log.txt")
+
+	err := os.WriteFile(fullPath, []byte(content+"\n"), 0o644)
+	require.NoError(t, err)
+
+	return fullPath
+}
+
+func appendLine(t *testing.T, fullPath string, content string) {
+	t.Helper()
+
+	f, err := os.OpenFile(fullPath, os.O_APPEND|os.O_WRONLY, 0o644)
+	require.NoError(t, err)
+	defer f.Close()
+
+	_, err = f.WriteString(content + "\n")
+	require.NoError(t, err)
+}
diff --git a/website/docs/components/inputs/file_tail.md b/website/docs/components/inputs/file_tail.md
new file mode 100644
index 0000000000..af0d25fb98
--- /dev/null
+++ b/website/docs/components/inputs/file_tail.md
@@ -0,0 +1,131 @@
+---
+title: file_tail
+slug: file_tail
+type: input
+status: experimental
+categories: ["Local"]
+---
+
+<!--
+     THIS FILE IS AUTOGENERATED!
+
+     To make changes please edit the corresponding source file under internal/impl/<provider>.
+-->
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+:::caution EXPERIMENTAL
+This component is experimental and therefore subject to change or removal outside of major version releases.
+:::
+Tails a file
+
+<Tabs defaultValue="common" values={[
+  { label: 'Common', value: 'common', },
+  { label: 'Advanced', value: 'advanced', },
+]}>
+
+<TabItem value="common">
+
+```yml
+# Common config fields, showing default values
+input:
+  label: ""
+  file_tail:
+    path: ./folder/file.log # No default (required)
+    poll_interval: 1s
+    start_position: start
+    auto_replay_nacks: true
+```
+
+</TabItem>
+<TabItem value="advanced">
+
+```yml
+# All config fields, showing default values
+input:
+  label: ""
+  file_tail:
+    path: ./folder/file.log # No default (required)
+    poll_interval: 1s
+    line_buffer: 1000
+    start_position: start
+    auto_replay_nacks: true
+```
+
+</TabItem>
+</Tabs>
+
+Continuously reads lines from a file, handling log rotation and truncation.
+
+### Metadata
+
+This input adds the following metadata fields to each message:
+
+```text
+- file_tail_path
+- file_tail_position
+```
+
+You can access these metadata fields using
+[function interpolation](/docs/configuration/interpolation#bloblang-queries).
+
+
+## Fields
+
+### `path`
+
+The path of the file to consume lines from.
+
+
+Type: `string`
+
+```yml
+# Examples
+
+path: ./folder/file.log
+```
+
+### `poll_interval`
+
+The time between checks for new lines.
+
+
+Type: `string`
+Default: `"1s"`
+
+```yml
+# Examples
+
+poll_interval: 1s
+
+poll_interval: 200ms
+```
+
+### `line_buffer`
+
+Number of lines to buffer internally before backpressure is applied.
+
+
+Type: `int`
+Default: `1000`
+
+### `start_position`
+
+Where to begin reading the file from. `start` consumes all existing lines in the file, whereas `end` only consumes lines appended after the input has started.
+
+
+Type: `string`
+Default: `"start"`
+Options: `start`, `end`.
+
+### `auto_replay_nacks`
+
+Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
+
+
+Type: `bool`
+Default: `true`
+
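Below is a minimal sketch of how the new input might be wired up, assuming a hypothetical log file at `/var/log/app.log`; the `mapping` processor is only there to show the documented metadata fields being copied into the payload via Bloblang's `@` metadata syntax:

```yml
input:
  file_tail:
    path: /var/log/app.log # hypothetical path
    poll_interval: 500ms
    start_position: end # tail -f style: only consume lines appended after startup
  processors:
    - mapping: |
        # build a structured payload from the raw line and its metadata
        root.line = content().string()
        root.path = @file_tail_path
        root.position = @file_tail_position.number()
```

With the default `start_position: start` the existing contents of the file are consumed first, and `line_buffer` (default 1000) bounds how many lines are held internally before backpressure applies.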