Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion tools/flakeguard/cmd/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ var AggregateResultsCmd = &cobra.Command{
splunkToken, _ := cmd.Flags().GetString("splunk-token")
splunkEvent, _ := cmd.Flags().GetString("splunk-event")

initialDirSize, err := getDirSize(resultsPath)
if err != nil {
log.Error().Err(err).Str("path", resultsPath).Msg("Error getting initial directory size")
// intentionally don't exit here, as we can still proceed with the aggregation
}

// Ensure the output directory exists
if err := fs.MkdirAll(outputDir, 0755); err != nil {
log.Error().Err(err).Str("path", outputDir).Msg("Error creating output directory")
Expand Down Expand Up @@ -160,7 +166,14 @@ var AggregateResultsCmd = &cobra.Command{
log.Error().Stack().Err(err).Msg("Error saving aggregated test report")
os.Exit(ErrorExitCode)
}
log.Info().Str("report", aggregatedReportPath).Msg("Aggregation complete")

finalDirSize, err := getDirSize(resultsPath)
if err != nil {
log.Error().Err(err).Str("path", resultsPath).Msg("Error getting final directory size")
// intentionally don't exit here, as we can still proceed with the aggregation
}
diskSpaceUsed := byteCountSI(finalDirSize - initialDirSize)
log.Info().Str("disk space used", diskSpaceUsed).Str("report", aggregatedReportPath).Msg("Aggregation complete")
},
}

Expand All @@ -185,3 +198,33 @@ func init() {
log.Fatal().Err(err).Msg("Error marking flag as required")
}
}

// getDirSize returns the size of a directory in bytes
// helpful for tracking how much data is being produced on disk
func getDirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return nil
})
return size, err
}

// byteCountSI returns a human-readable byte count (decimal SI units)
func byteCountSI(b int64) string {
const unit = 1000
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "kMGTPE"[exp])
}
14 changes: 13 additions & 1 deletion tools/flakeguard/cmd/generate_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ var GenerateReportCmd = &cobra.Command{
githubRunID, _ := cmd.Flags().GetInt64("github-run-id")
artifactName, _ := cmd.Flags().GetString("failed-tests-artifact-name")

initialDirSize, err := getDirSize(outputDir)
if err != nil {
log.Error().Err(err).Str("path", outputDir).Msg("Error getting initial directory size")
// intentionally don't exit here, as we can still proceed with the generation
}

// Get the GitHub token from environment variable
githubToken := os.Getenv("GITHUB_TOKEN")
if githubToken == "" {
Expand Down Expand Up @@ -160,7 +166,13 @@ var GenerateReportCmd = &cobra.Command{
log.Info().Msg("PR comment markdown generated successfully")
}

log.Info().Str("output", outputDir).Msg("Reports generated successfully")
finalDirSize, err := getDirSize(outputDir)
if err != nil {
log.Error().Err(err).Str("path", outputDir).Msg("Error getting initial directory size")
// intentionally don't exit here, as we can still proceed with the generation
}
diskSpaceUsed := byteCountSI(finalDirSize - initialDirSize)
log.Info().Str("disk space used", diskSpaceUsed).Str("output", outputDir).Msg("Reports generated successfully")
},
}

Expand Down
19 changes: 17 additions & 2 deletions tools/flakeguard/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"os/exec"
"path/filepath"

"github.com/rs/zerolog/log"
"github.com/smartcontractkit/chainlink-testing-framework/tools/flakeguard/reports"
Expand Down Expand Up @@ -40,6 +41,13 @@ var RunTestsCmd = &cobra.Command{
shuffleSeed, _ := cmd.Flags().GetString("shuffle-seed")
omitOutputsOnSuccess, _ := cmd.Flags().GetBool("omit-test-outputs-on-success")

outputDir := filepath.Dir(outputPath)
initialDirSize, err := getDirSize(outputDir)
if err != nil {
log.Error().Err(err).Str("path", outputDir).Msg("Error getting initial directory size")
// intentionally don't exit here, as we can still proceed with the run
}

if maxPassRatio < 0 || maxPassRatio > 1 {
log.Error().Float64("max pass ratio", maxPassRatio).Msg("Error: max pass ratio must be between 0 and 1")
os.Exit(ErrorExitCode)
Expand Down Expand Up @@ -113,10 +121,17 @@ var RunTestsCmd = &cobra.Command{
return !tr.Skipped && tr.PassRatio < maxPassRatio
})

finalDirSize, err := getDirSize(outputDir)
if err != nil {
log.Error().Err(err).Str("path", outputDir).Msg("Error getting initial directory size")
// intentionally don't exit here, as we can still proceed with the run
}
diskSpaceUsed := byteCountSI(finalDirSize - initialDirSize)

if len(flakyTests) > 0 {
log.Info().Int("count", len(flakyTests)).Str("pass ratio threshold", fmt.Sprintf("%.2f%%", maxPassRatio*100)).Msg("Found flaky tests")
log.Info().Str("disk space used", diskSpaceUsed).Int("count", len(flakyTests)).Str("pass ratio threshold", fmt.Sprintf("%.2f%%", maxPassRatio*100)).Msg("Found flaky tests")
} else {
log.Info().Msg("No flaky tests found")
log.Info().Str("disk space used", diskSpaceUsed).Msg("No flaky tests found")
}

fmt.Printf("\nFlakeguard Summary\n")
Expand Down
81 changes: 54 additions & 27 deletions tools/flakeguard/reports/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package reports
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/go-resty/resty/v2"
Expand Down Expand Up @@ -519,31 +521,28 @@ func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
SetHeader("Content-Type", "application/json").
SetLogger(ZerologRestyLogger{})

log.Debug().Str("report id", report.ID).Int("results", len(report.Results)).Msg("Sending aggregated data to Splunk")
log.Debug().Str("report id", report.ID).Int("results", len(results)).Msg("Sending aggregated data to Splunk")

const (
resultsBatchSize = 10
splunkSizeLimitBytes = 100_000_000 // 100MB. Actual limit is over 800MB, but that's excessive
exampleSplunkReportFileName = "example_results/example_splunk_report.json"
exampleSplunkResultsFileName = "example_results/example_splunk_results_batch_%d.json"
)

var (
splunkErrs = []error{}
resultsBatchSize = 10
resultsBatch = []SplunkTestResult{}
successfulResultsSent = 0
batchNum = 1
)

var (
exampleSplunkResultsFileName = "example_results/example_splunk_results.json"
exampleSplunkResultsFile *os.File
exampleSplunkReportFileName = "example_results/example_splunk_report.json"
exampleSplunkReportFile *os.File
err error
)

if isExampleRun {
exampleSplunkResultsFile, err = os.Create(exampleSplunkResultsFileName)
if err != nil {
return fmt.Errorf("error creating example Splunk results file: %w", err)
}
defer exampleSplunkResultsFile.Close()
}
for resultCount, result := range results {
// No need to send log outputs to Splunk
result.FailedOutputs = nil
result.PassedOutputs = nil
result.PackageOutputs = nil

resultsBatch = append(resultsBatch, SplunkTestResult{
Event: SplunkTestResultEvent{
Event: opts.splunkEvent,
Expand All @@ -554,13 +553,21 @@ func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
Index: SplunkIndex,
})

if len(resultsBatch) >= resultsBatchSize || resultCount == len(results)-1 {
if len(resultsBatch) >= resultsBatchSize ||
resultCount == len(results)-1 ||
binary.Size(resultsBatch) >= splunkSizeLimitBytes {

batchData, testNames, err := batchSplunkResults(resultsBatch)
if err != nil {
return fmt.Errorf("error batching results: %w", err)
}

if isExampleRun {
exampleSplunkResultsFileName := fmt.Sprintf(exampleSplunkResultsFileName, batchNum)
exampleSplunkResultsFile, err := os.Create(exampleSplunkResultsFileName)
if err != nil {
return fmt.Errorf("error creating example Splunk results file: %w", err)
}
for _, result := range resultsBatch {
jsonResult, err := json.Marshal(result)
if err != nil {
Expand All @@ -571,6 +578,10 @@ func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
return fmt.Errorf("error writing result for '%s' to file: %w", result.Event.Data.TestName, err)
}
}
err = exampleSplunkResultsFile.Close()
if err != nil {
return fmt.Errorf("error closing example Splunk results file: %w", err)
}
} else {
resp, err := client.R().SetBody(batchData.String()).Post("")
if err != nil {
Expand All @@ -588,11 +599,12 @@ func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
}
}
resultsBatch = []SplunkTestResult{}
batchNum++
}
}

if isExampleRun {
log.Info().Msgf("Example Run. See '%s' for the results that would be sent to splunk", exampleSplunkResultsFileName)
log.Info().Msg("Example Run. See 'example_results/splunk_results' for the results that would be sent to splunk")
}

reportData := SplunkTestReport{
Expand All @@ -607,7 +619,7 @@ func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
}

if isExampleRun {
exampleSplunkReportFile, err = os.Create(exampleSplunkReportFileName)
exampleSplunkReportFile, err := os.Create(exampleSplunkReportFileName)
if err != nil {
return fmt.Errorf("error creating example Splunk report file: %w", err)
}
Expand Down Expand Up @@ -642,6 +654,7 @@ func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
log.Debug().
Int("successfully sent", successfulResultsSent).
Int("total results", len(results)).
Int("result batches", batchNum).
Str("duration", time.Since(start).String()).
Str("report id", report.ID).
Msg("All results sent successfully to Splunk")
Expand Down Expand Up @@ -672,20 +685,34 @@ func batchSplunkResults(results []SplunkTestResult) (batchData bytes.Buffer, res

// unBatchSplunkResults un-batches a batch of TestResult objects into a slice of TestResult objects
func unBatchSplunkResults(batch []byte) ([]*SplunkTestResult, error) {
var results []*SplunkTestResult
scanner := bufio.NewScanner(bufio.NewReader(bytes.NewReader(batch)))
results := make([]*SplunkTestResult, 0, bytes.Count(batch, []byte{'\n'}))
scanner := bufio.NewScanner(bytes.NewReader(batch))

maxCapacity := 1024 * 1024 // 1 MB
buf := make([]byte, maxCapacity)
scanner.Buffer(buf, maxCapacity)

var pool sync.Pool
pool.New = func() interface{} { return new(SplunkTestResult) }

for scanner.Scan() {
line := scanner.Text()
if strings.TrimSpace(line) == "" {
line := scanner.Bytes()
if len(bytes.TrimSpace(line)) == 0 {
continue // Skip empty lines
}
var result *SplunkTestResult
if err := json.Unmarshal([]byte(line), &result); err != nil {

result := pool.Get().(*SplunkTestResult)
if err := json.Unmarshal(line, result); err != nil {
return results, fmt.Errorf("error unmarshaling result: %w", err)
}
results = append(results, result)
}
return results, scanner.Err()

if err := scanner.Err(); err != nil {
return results, fmt.Errorf("error scanning: %w", err)
}

return results, nil
}

// aggregateReports aggregates multiple TestReport objects into a single TestReport
Expand Down
12 changes: 4 additions & 8 deletions tools/flakeguard/reports/io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ const (
splunkToken = "test-token"
splunkEvent = "test"
reportID = "123"
totalTestRuns = 255
totalTestRuns = 270
testRunCount = 15
uniqueTests = 18
uniqueTests = 19
)

func TestAggregateResultFilesSplunk(t *testing.T) {
Expand Down Expand Up @@ -140,9 +140,7 @@ func BenchmarkAggregateResultFiles(b *testing.B) {
zerolog.SetGlobalLevel(zerolog.Disabled)
for i := 0; i < b.N; i++ {
_, err := LoadAndAggregate("./testdata", WithReportID(reportID))
if err != nil {
b.Fatalf("LoadAndAggregate failed: %v", err)
}
require.NoError(b, err, "LoadAndAggregate failed")
}
}

Expand All @@ -156,8 +154,6 @@ func BenchmarkAggregateResultFilesSplunk(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := LoadAndAggregate("./testdata", WithReportID(reportID), WithSplunk(srv.URL, splunkToken, "test"))
if err != nil {
b.Fatalf("LoadAndAggregate failed: %v", err)
}
require.NoError(b, err, "LoadAndAggregate failed")
}
}
Loading
Loading