Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 145 additions & 16 deletions input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,42 +202,156 @@ func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReade

// FileInput can read requests generated by FileOutput
type FileInput struct {
	mu             sync.Mutex // guards readers and processedFiles against the watcher goroutine
	data           chan []byte
	exit           chan bool
	path           string // file path or glob pattern; may be an s3:// URL
	readers        []*fileInputReader
	processedFiles map[string]bool // paths already handed to a reader, so the watcher skips them
	speedFactor    float64
	loop           bool
	readDepth      int
	dryRun         bool
	maxWait        time.Duration
	watchInterval  time.Duration // how often the watcher polls the pattern for new files
	watching       bool          // when true, keep running after EOF and poll for new files

	stats *expvar.Map
}

// FileInputConfig configuration for the FileInput
type FileInputConfig struct {
	Loop          bool          // replay the input again after reaching end of file
	ReadDepth     int           // number of records each reader buffers and sorts in advance
	MaxWait       time.Duration // cap on the delay between replayed requests (0 means no cap)
	DryRun        bool          // analyze the input without emitting payloads
	WatchNewFiles bool          // Whether to watch for new files matching the pattern
	WatchInterval time.Duration // Interval to check for new files
}

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
	// Legacy entry point: bundle the positional arguments into a config,
	// picking up the global watch settings, and delegate.
	return NewFileInputWithConfig(path, &FileInputConfig{
		Loop:          loop,
		ReadDepth:     readDepth,
		MaxWait:       maxWait,
		DryRun:        dryRun,
		WatchNewFiles: Settings.InputFileWatch,
		WatchInterval: Settings.InputFileWatchInterval,
	})
}

// NewFileInputWithConfig constructor for FileInput with detailed configuration.
// Starts the emit goroutine, and — when WatchNewFiles is set — a background
// watcher that picks up files created after startup.
func NewFileInputWithConfig(path string, config *FileInputConfig) (i *FileInput) {
	i = new(FileInput)
	i.data = make(chan []byte, 1000)
	i.exit = make(chan bool)
	i.path = path
	i.speedFactor = 1
	i.loop = config.Loop
	i.readDepth = config.ReadDepth
	i.stats = expvar.NewMap("file-" + path)
	i.dryRun = config.DryRun
	i.maxWait = config.MaxWait
	i.processedFiles = make(map[string]bool)
	i.watching = config.WatchNewFiles

	// Fall back to a sane default poll interval when none was configured.
	if config.WatchInterval > 0 {
		i.watchInterval = config.WatchInterval
	} else {
		i.watchInterval = 5 * time.Second
	}

	if err := i.init(); err != nil {
		return
	}

	go i.emit()

	if i.watching {
		go i.watchForNewFiles()
	}

	return
}

// watchForNewFiles periodically checks for new files matching the path pattern
// until the input is closed via the exit channel.
func (i *FileInput) watchForNewFiles() {
	t := time.NewTicker(i.watchInterval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			i.checkForNewFiles()
		case <-i.exit:
			return
		}
	}
}

// checkForNewFiles looks for new files that match the pattern and adds them to
// readers. Called from the watcher goroutine, so all shared state (readers,
// processedFiles, stats) is mutated under i.mu.
func (i *FileInput) checkForNewFiles() {
	i.mu.Lock()
	defer i.mu.Unlock()

	var matches []string

	if strings.HasPrefix(i.path, "s3://") {
		// NOTE(review): a fresh session per poll works but is wasteful;
		// consider caching it on the FileInput if polling is frequent.
		sess := session.Must(session.NewSession(awsConfig()))
		svc := s3.New(sess)

		bucket, key := parseS3Url(i.path)

		params := &s3.ListObjectsInput{
			Bucket: aws.String(bucket),
			Prefix: aws.String(key),
		}

		resp, err := svc.ListObjects(params)
		if err != nil {
			Debug(2, "[INPUT-FILE] Error while retrieving list of files from S3", i.path, err)
			return
		}

		for _, c := range resp.Contents {
			matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
		}
	} else {
		var err error
		if matches, err = filepath.Glob(i.path); err != nil {
			Debug(2, "[INPUT-FILE] Wrong file pattern", i.path, err)
			return
		}
	}

	if len(matches) == 0 {
		return
	}

	// Attach a reader for every file we have not processed yet.
	newReaders := 0
	for _, path := range matches {
		if i.processedFiles[path] {
			continue
		}

		Debug(2, fmt.Sprintf("[INPUT-FILE] Found new file: %s", path))
		reader := newFileInputReader(path, i.readDepth, i.dryRun)
		if reader != nil {
			i.readers = append(i.readers, reader)
			i.processedFiles[path] = true
			newReaders++
		}
	}

	if newReaders > 0 {
		// expvar Add is cumulative: add only the delta. Adding
		// len(i.readers) on every pass would inflate the counter.
		i.stats.Add("reader_count", int64(newReaders))
		i.stats.Add("new_files_found", 1)
	}
}

func parseS3Url(path string) (bucket, key string) {
path = path[5:] // stripping `s3://`
sep := strings.IndexByte(path, '/')
Expand Down Expand Up @@ -272,7 +386,9 @@ func (i *FileInput) init() (err error) {
}

for _, c := range resp.Contents {
matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
path := "s3://" + bucket + "/" + (*c.Key)
matches = append(matches, path)
i.processedFiles[path] = true
}
} else if matches, err = filepath.Glob(i.path); err != nil {
Debug(2, "[INPUT-FILE] Wrong file pattern", i.path, err)
Expand All @@ -288,6 +404,7 @@ func (i *FileInput) init() (err error) {

for idx, p := range matches {
i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun)
i.processedFiles[p] = true
}

i.stats.Add("reader_count", int64(len(matches)))
Expand Down Expand Up @@ -341,6 +458,7 @@ func (i *FileInput) emit() {
minWait = math.MaxInt64

i.stats.Add("negative_wait", 0)
i.stats.Add("watch_pauses", 0)

for {
select {
Expand All @@ -356,7 +474,14 @@ func (i *FileInput) emit() {
i.init()
lastTime = -1
continue
} else if i.watching {
// When watching for new files, we just wait and continue
i.stats.Add("watch_pauses", 1)
Debug(2, fmt.Sprintf("[INPUT-FILE] No active readers, waiting for new files matching pattern '%s'", i.path))
time.Sleep(i.watchInterval)
continue
} else {
// If not watching, we break out and exit
break
}
}
Expand Down Expand Up @@ -420,7 +545,11 @@ func (i *FileInput) emit() {
i.stats.Set("max_wait", time.Duration(maxWait))
i.stats.Set("min_wait", time.Duration(minWait))

Debug(2, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
if i.watching {
Debug(2, fmt.Sprintf("[INPUT-FILE] No more active readers. Will continue watching for new files matching '%s'\n", i.path))
} else {
Debug(2, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
}

if i.dryRun {
fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",
Expand Down
71 changes: 65 additions & 6 deletions input_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@ package goreplay

import (
"bytes"
cryptoRand "crypto/rand"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"math/big"
"os"
"sync"
"testing"
"time"
)

// generateSecureRandomID generates a cryptographically secure random ID for use in tests
func generateSecureRandomID(t *testing.T) int64 {
	// Draw uniformly from [0, 2^62) so the result always fits in an int64.
	n, err := cryptoRand.Int(cryptoRand.Reader, big.NewInt(1<<62))
	if err != nil {
		t.Fatalf("Failed to generate secure random number: %v", err)
	}
	return n.Int64()
}

func TestInputFileWithGET(t *testing.T) {
input := NewTestInput()
rg := NewRequestGenerator([]PluginReader{input}, func() { input.EmitGET() }, 1)
Expand Down Expand Up @@ -88,7 +98,7 @@ func TestInputFileWithGETAndPOST(t *testing.T) {
}

func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
rnd := rand.Int63()
rnd := generateSecureRandomID(t)

file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
file1.Write([]byte("1 1 1\ntest1"))
Expand Down Expand Up @@ -118,7 +128,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
}

func TestInputFileRequestsWithLatency(t *testing.T) {
rnd := rand.Int63()
rnd := generateSecureRandomID(t)

file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
defer file.Close()
Expand Down Expand Up @@ -146,7 +156,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) {
}

func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
rnd := rand.Int63()
rnd := generateSecureRandomID(t)

file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
file1.Write([]byte("1 1 1\nrequest1"))
Expand Down Expand Up @@ -189,7 +199,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
}

func TestInputFileLoop(t *testing.T) {
rnd := rand.Int63()
rnd := generateSecureRandomID(t)

file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
file.Write([]byte("1 1 1\ntest1"))
Expand All @@ -210,7 +220,7 @@ func TestInputFileLoop(t *testing.T) {
}

func TestInputFileCompressed(t *testing.T) {
rnd := rand.Int63()
rnd := generateSecureRandomID(t)

output := NewFileOutput(fmt.Sprintf("/tmp/%d_0.gz", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true})
for i := 0; i < 1000; i++ {
Expand All @@ -235,6 +245,55 @@ func TestInputFileCompressed(t *testing.T) {
os.Remove(name2)
}

// TestInputFileWatchForNewFiles verifies that a FileInput with watching
// enabled picks up and replays a file created after the input started.
func TestInputFileWatchForNewFiles(t *testing.T) {
	rnd := generateSecureRandomID(t)
	basePath := fmt.Sprintf("/tmp/%d", rnd)

	// Create first file before the input starts.
	file1, err := os.OpenFile(fmt.Sprintf("%s_1", basePath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
	if err != nil {
		t.Fatalf("Failed to create first input file: %v", err)
	}
	file1.Write([]byte("1 1 1\ntest1"))
	file1.Write([]byte(payloadSeparator))
	file1.Close()
	defer os.Remove(file1.Name())

	// Initialize input with watching enabled and short watch interval
	config := &FileInputConfig{
		Loop:          false,
		ReadDepth:     100,
		MaxWait:       0,
		DryRun:        false,
		WatchNewFiles: true,
		WatchInterval: 300 * time.Millisecond, // Faster interval for testing
	}

	input := NewFileInputWithConfig(fmt.Sprintf("%s_*", basePath), config)
	defer input.Close()

	// Read the first message
	msg1, err := input.PluginRead()
	if err != nil || string(msg1.Data) != "test1" {
		t.Error("Should read first file correctly:", err)
	}

	// Add a second file while input is running
	file2, err := os.OpenFile(fmt.Sprintf("%s_2", basePath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
	if err != nil {
		t.Fatalf("Failed to create second input file: %v", err)
	}
	file2.Write([]byte("1 1 2\ntest2"))
	file2.Write([]byte(payloadSeparator))
	file2.Close()
	defer os.Remove(file2.Name())

	// Wait for file discovery and processing (at least 2 watch intervals)
	time.Sleep(700 * time.Millisecond)

	// Should be able to read from the newly added file
	msg2, err := input.PluginRead()
	if err != nil || string(msg2.Data) != "test2" {
		t.Error("Should read newly added file correctly:", err)
	}
}

type CaptureFile struct {
msgs []*Message
file *os.File
Expand Down
18 changes: 11 additions & 7 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@ type AppSettings struct {
OutputWebSocketConfig WebSocketOutputConfig
OutputWebSocketStats bool `json:"output-ws-stats"`

InputFile []string `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
InputFileReadDepth int `json:"input-file-read-depth"`
InputFileDryRun bool `json:"input-file-dry-run"`
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
OutputFile []string `json:"output-file"`
OutputFileConfig FileOutputConfig
InputFile []string `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
InputFileReadDepth int `json:"input-file-read-depth"`
InputFileDryRun bool `json:"input-file-dry-run"`
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
InputFileWatch bool `json:"input-file-watch"`
InputFileWatchInterval time.Duration `json:"input-file-watch-interval"`
OutputFile []string `json:"output-file"`
OutputFileConfig FileOutputConfig

InputRAW []string `json:"input_raw"`
InputRAWConfig RAWInputConfig
Expand Down Expand Up @@ -167,6 +169,8 @@ func init() {
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")
flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.")
flag.DurationVar(&Settings.InputFileMaxWait, "input-file-max-wait", 0, "Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s")
flag.BoolVar(&Settings.InputFileWatch, "input-file-watch", true, "Watch for new files matching pattern. When turned on, Gor will continue running after processing all existing files, watching for new ones.")
flag.DurationVar(&Settings.InputFileWatchInterval, "input-file-watch-interval", 5*time.Second, "Interval for checking for new files. Example: --input-file-watch-interval 10s")

flag.Var(&MultiOption{&Settings.OutputFile}, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")
Expand Down