Skip to content

Commit f431ef0

Browse files
committed
WiP flakeguard DX integration
1 parent 0fe1e95 commit f431ef0

File tree

8 files changed

+424
-321
lines changed

8 files changed

+424
-321
lines changed

tools/flakeguard/cmd/aggregate_results.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ var AggregateResultsCmd = &cobra.Command{
3434
splunkURL, _ := cmd.Flags().GetString("splunk-url")
3535
splunkToken, _ := cmd.Flags().GetString("splunk-token")
3636
splunkEvent, _ := cmd.Flags().GetString("splunk-event")
37+
dxURL, _ := cmd.Flags().GetString("dx-url")
38+
dxToken, _ := cmd.Flags().GetString("dx-token")
3739

3840
initialDirSize, err := getDirSize(resultsPath)
3941
if err != nil {
@@ -58,6 +60,7 @@ var AggregateResultsCmd = &cobra.Command{
5860
resultsPath,
5961
reports.WithReportID(reportID),
6062
reports.WithSplunk(splunkURL, splunkToken, splunkEvent),
63+
reports.WithDX(dxURL, dxToken),
6164
reports.WithBranchName(branchName),
6265
reports.WithBaseSha(baseSHA),
6366
reports.WithHeadSha(headSHA),
@@ -193,6 +196,8 @@ func init() {
193196
AggregateResultsCmd.Flags().String("splunk-url", "", "Optional url to simultaneously send the test results to splunk")
194197
AggregateResultsCmd.Flags().String("splunk-token", "", "Optional Splunk HEC token to simultaneously send the test results to splunk")
195198
AggregateResultsCmd.Flags().String("splunk-event", "manual", "Optional Splunk event to send as the triggering event for the test results")
199+
AggregateResultsCmd.Flags().String("dx-url", "", "Optional url to simultaneously send the test results to DX")
200+
AggregateResultsCmd.Flags().String("dx-token", "", "Optional token to simultaneously send the test results to DX")
196201

197202
if err := AggregateResultsCmd.MarkFlagRequired("results-path"); err != nil {
198203
log.Fatal().Err(err).Msg("Error marking flag as required")

tools/flakeguard/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/spf13/cobra v1.8.1
1212
github.com/stretchr/testify v1.9.0
1313
golang.org/x/oauth2 v0.24.0
14+
golang.org/x/sync v0.9.0
1415
golang.org/x/text v0.20.0
1516
)
1617

tools/flakeguard/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
4343
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
4444
golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE=
4545
golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
46+
golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ=
47+
golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
4648
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4749
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4850
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

tools/flakeguard/reports/data.go

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (testReport *TestReport) GenerateSummaryData() {
9090
// TestResult contains the results and outputs of a single test
9191
type TestResult struct {
9292
// ReportID is the ID of the report this test result belongs to
93-
// used mostly for Splunk logging
93+
// used mostly for Splunk analysis
9494
ReportID string `json:"report_id"`
9595
TestName string `json:"test_name"`
9696
TestPackage string `json:"test_package"`
@@ -146,50 +146,6 @@ type SummaryData struct {
146146
PassPercent string `json:"pass_percent"`
147147
}
148148

149-
// SplunkType represents what type of data is being sent to Splunk, e.g. a report or a result.
150-
// This is a custom field to help us distinguish what kind of data we're sending.
151-
type SplunkType string
152-
153-
const (
154-
Report SplunkType = "report"
155-
Result SplunkType = "result"
156-
157-
// https://docs.splunk.com/Splexicon:Sourcetype
158-
SplunkSourceType = "flakeguard_json"
159-
// https://docs.splunk.com/Splexicon:Index
160-
SplunkIndex = "github_flakeguard_runs"
161-
)
162-
163-
// SplunkTestReport is the full wrapper structure sent to Splunk for the full test report (sans results)
164-
type SplunkTestReport struct {
165-
Event SplunkTestReportEvent `json:"event"` // https://docs.splunk.com/Splexicon:Event
166-
SourceType string `json:"sourcetype"` // https://docs.splunk.com/Splexicon:Sourcetype
167-
Index string `json:"index"` // https://docs.splunk.com/Splexicon:Index
168-
}
169-
170-
// SplunkTestReportEvent contains the actual meat of the Splunk test report event
171-
type SplunkTestReportEvent struct {
172-
Event string `json:"event"`
173-
Type SplunkType `json:"type"`
174-
Data TestReport `json:"data"`
175-
// Incomplete indicates that there were issues uploading test results and the report is incomplete
176-
Incomplete bool `json:"incomplete"`
177-
}
178-
179-
// SplunkTestResult is the full wrapper structure sent to Splunk for a single test result
180-
type SplunkTestResult struct {
181-
Event SplunkTestResultEvent `json:"event"` // https://docs.splunk.com/Splexicon:Event
182-
SourceType string `json:"sourcetype"` // https://docs.splunk.com/Splexicon:Sourcetype
183-
Index string `json:"index"` // https://docs.splunk.com/Splexicon:Index
184-
}
185-
186-
// SplunkTestResultEvent contains the actual meat of the Splunk test result event
187-
type SplunkTestResultEvent struct {
188-
Event string `json:"event"`
189-
Type SplunkType `json:"type"`
190-
Data TestResult `json:"data"`
191-
}
192-
193149
// Data Processing Functions
194150

195151
func FilterResults(report *TestReport, maxPassRatio float64) *TestReport {

tools/flakeguard/reports/io.go

Lines changed: 23 additions & 222 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,15 @@ package reports
22

33
import (
44
"bufio"
5-
"bytes"
6-
"encoding/binary"
75
"encoding/json"
8-
"errors"
96
"fmt"
107
"io"
118
"os"
129
"path/filepath"
13-
"strings"
14-
"sync"
15-
"time"
1610

17-
"github.com/go-resty/resty/v2"
1811
"github.com/google/uuid"
1912
"github.com/rs/zerolog/log"
13+
"golang.org/x/sync/errgroup"
2014
)
2115

2216
// FileSystem interface and implementations
@@ -47,6 +41,8 @@ type aggregateOptions struct {
4741
splunkURL string
4842
splunkToken string
4943
splunkEvent string
44+
dxURL string
45+
dxToken string
5046
branchName string
5147
baseSha string
5248
headSha string
@@ -66,6 +62,7 @@ func WithReportID(reportID string) AggregateOption {
6662
}
6763

6864
// WithSplunk also sends the aggregation to a Splunk instance as events.
65+
// https://www.splunk.com/
6966
func WithSplunk(url, token string, event string) AggregateOption {
7067
return func(opts *aggregateOptions) {
7168
opts.splunkURL = url
@@ -74,6 +71,15 @@ func WithSplunk(url, token string, event string) AggregateOption {
7471
}
7572
}
7673

74+
// WithDX also sends the aggregation to DX.
75+
// https://getdx.com/
76+
func WithDX(url, token string) AggregateOption {
77+
return func(opts *aggregateOptions) {
78+
opts.dxURL = url
79+
opts.dxToken = token
80+
}
81+
}
82+
7783
func WithBranchName(branchName string) AggregateOption {
7884
return func(opts *aggregateOptions) {
7985
opts.branchName = branchName
@@ -444,7 +450,6 @@ func aggregate(reportChan <-chan *TestReport, errChan <-chan error, opts *aggreg
444450
testMap = make(map[string]TestResult)
445451
excludedTests = map[string]struct{}{}
446452
selectedTests = map[string]struct{}{}
447-
sendToSplunk = opts.splunkURL != ""
448453
)
449454

450455
for report := range reportChan {
@@ -497,222 +502,18 @@ func aggregate(reportChan <-chan *TestReport, errChan <-chan error, opts *aggreg
497502
fullReport.Results = aggregatedResults
498503
fullReport.GenerateSummaryData()
499504

500-
if sendToSplunk {
501-
err = sendDataToSplunk(opts, *fullReport)
502-
if err != nil {
503-
return fullReport, fmt.Errorf("error sending data to Splunk: %w", err)
504-
}
505-
}
506-
return fullReport, err
507-
}
508-
509-
// sendDataToSplunk sends a truncated TestReport and each individual TestResults to Splunk as events
510-
func sendDataToSplunk(opts *aggregateOptions, report TestReport) error {
511-
start := time.Now()
512-
results := report.Results
513-
report.Results = nil // Don't send results to Splunk, doing that individually
514-
// Dry-run mode for example runs
515-
isExampleRun := strings.Contains(opts.splunkURL, "splunk.example.com")
516-
517-
client := resty.New().
518-
SetBaseURL(opts.splunkURL).
519-
SetAuthScheme("Splunk").
520-
SetAuthToken(opts.splunkToken).
521-
SetHeader("Content-Type", "application/json").
522-
SetLogger(ZerologRestyLogger{})
523-
524-
log.Debug().Str("report id", report.ID).Int("results", len(results)).Msg("Sending aggregated data to Splunk")
525-
526-
const (
527-
resultsBatchSize = 10
528-
splunkSizeLimitBytes = 100_000_000 // 100MB. Actual limit is over 800MB, but that's excessive
529-
exampleSplunkReportFileName = "example_results/example_splunk_report.json"
530-
exampleSplunkResultsFileName = "example_results/example_splunk_results_batch_%d.json"
531-
)
532-
533-
var (
534-
splunkErrs = []error{}
535-
resultsBatch = []SplunkTestResult{}
536-
successfulResultsSent = 0
537-
batchNum = 1
538-
)
539-
540-
for resultCount, result := range results {
541-
// No need to send log outputs to Splunk
542-
result.FailedOutputs = nil
543-
result.PassedOutputs = nil
544-
result.PackageOutputs = nil
545-
546-
resultsBatch = append(resultsBatch, SplunkTestResult{
547-
Event: SplunkTestResultEvent{
548-
Event: opts.splunkEvent,
549-
Type: Result,
550-
Data: result,
551-
},
552-
SourceType: SplunkSourceType,
553-
Index: SplunkIndex,
554-
})
555-
556-
if len(resultsBatch) >= resultsBatchSize ||
557-
resultCount == len(results)-1 ||
558-
binary.Size(resultsBatch) >= splunkSizeLimitBytes {
559-
560-
batchData, testNames, err := batchSplunkResults(resultsBatch)
561-
if err != nil {
562-
return fmt.Errorf("error batching results: %w", err)
563-
}
564-
565-
if isExampleRun {
566-
exampleSplunkResultsFileName := fmt.Sprintf(exampleSplunkResultsFileName, batchNum)
567-
exampleSplunkResultsFile, err := os.Create(exampleSplunkResultsFileName)
568-
if err != nil {
569-
return fmt.Errorf("error creating example Splunk results file: %w", err)
570-
}
571-
for _, result := range resultsBatch {
572-
jsonResult, err := json.Marshal(result)
573-
if err != nil {
574-
return fmt.Errorf("error marshaling result for '%s' to json: %w", result.Event.Data.TestName, err)
575-
}
576-
_, err = exampleSplunkResultsFile.Write(jsonResult)
577-
if err != nil {
578-
return fmt.Errorf("error writing result for '%s' to file: %w", result.Event.Data.TestName, err)
579-
}
580-
}
581-
err = exampleSplunkResultsFile.Close()
582-
if err != nil {
583-
return fmt.Errorf("error closing example Splunk results file: %w", err)
584-
}
585-
} else {
586-
resp, err := client.R().SetBody(batchData.String()).Post("")
587-
if err != nil {
588-
splunkErrs = append(splunkErrs,
589-
fmt.Errorf("error sending results for [%s] to Splunk: %w", strings.Join(testNames, ", "), err),
590-
)
591-
}
592-
if resp.IsError() {
593-
splunkErrs = append(splunkErrs,
594-
fmt.Errorf("error sending result for [%s] to Splunk: %s", strings.Join(testNames, ", "), resp.String()),
595-
)
596-
}
597-
if err == nil && !resp.IsError() {
598-
successfulResultsSent += len(resultsBatch)
599-
}
600-
}
601-
resultsBatch = []SplunkTestResult{}
602-
batchNum++
603-
}
604-
}
605-
606-
if isExampleRun {
607-
log.Info().Msg("Example Run. See 'example_results/splunk_results' for the results that would be sent to splunk")
505+
var eg errgroup.Group
506+
eg.Go(func() error {
507+
return sendDataToSplunk(opts, *fullReport)
508+
})
509+
eg.Go(func() error {
510+
return sendDataToDX(opts, *fullReport)
511+
})
512+
if err := eg.Wait(); err != nil {
513+
log.Error().Err(err).Msg("Error sending data to 3rd party services")
608514
}
609515

610-
reportData := SplunkTestReport{
611-
Event: SplunkTestReportEvent{
612-
Event: opts.splunkEvent,
613-
Type: Report,
614-
Data: report,
615-
Incomplete: len(splunkErrs) > 0,
616-
},
617-
SourceType: SplunkSourceType,
618-
Index: SplunkIndex,
619-
}
620-
621-
if isExampleRun {
622-
exampleSplunkReportFile, err := os.Create(exampleSplunkReportFileName)
623-
if err != nil {
624-
return fmt.Errorf("error creating example Splunk report file: %w", err)
625-
}
626-
jsonReport, err := json.Marshal(reportData)
627-
if err != nil {
628-
return fmt.Errorf("error marshaling report: %w", err)
629-
}
630-
_, err = exampleSplunkReportFile.Write(jsonReport)
631-
if err != nil {
632-
return fmt.Errorf("error writing report: %w", err)
633-
}
634-
log.Info().Msgf("Example Run. See '%s' for the results that would be sent to splunk", exampleSplunkReportFileName)
635-
} else {
636-
resp, err := client.R().SetBody(reportData).Post("")
637-
if err != nil {
638-
splunkErrs = append(splunkErrs, fmt.Errorf("error sending report '%s' to Splunk: %w", report.ID, err))
639-
}
640-
if resp.IsError() {
641-
splunkErrs = append(splunkErrs, fmt.Errorf("error sending report '%s' to Splunk: %s", report.ID, resp.String()))
642-
}
643-
}
644-
645-
if len(splunkErrs) > 0 {
646-
log.Error().
647-
Int("successfully sent", successfulResultsSent).
648-
Int("total results", len(results)).
649-
Errs("errors", splunkErrs).
650-
Str("report id", report.ID).
651-
Str("duration", time.Since(start).String()).
652-
Msg("Errors occurred while sending test results to Splunk")
653-
} else {
654-
log.Debug().
655-
Int("successfully sent", successfulResultsSent).
656-
Int("total results", len(results)).
657-
Int("result batches", batchNum).
658-
Str("duration", time.Since(start).String()).
659-
Str("report id", report.ID).
660-
Msg("All results sent successfully to Splunk")
661-
}
662-
663-
return errors.Join(splunkErrs...)
664-
}
665-
666-
// batchSplunkResults creates a batch of TestResult objects as individual JSON objects
667-
// Splunk doesn't accept JSON arrays, they want individual events as single JSON objects
668-
// https://docs.splunk.com/Documentation/Splunk/9.4.0/Data/FormateventsforHTTPEventCollector
669-
func batchSplunkResults(results []SplunkTestResult) (batchData bytes.Buffer, resultTestNames []string, err error) {
670-
for _, result := range results {
671-
data, err := json.Marshal(result)
672-
if err != nil {
673-
return batchData, nil, fmt.Errorf("error marshaling result for '%s': %w", result.Event.Data.TestName, err)
674-
}
675-
if _, err := batchData.Write(data); err != nil {
676-
return batchData, nil, fmt.Errorf("error writing result for '%s': %w", result.Event.Data.TestName, err)
677-
}
678-
if _, err := batchData.WriteRune('\n'); err != nil {
679-
return batchData, nil, fmt.Errorf("error writing newline for '%s': %w", result.Event.Data.TestName, err)
680-
}
681-
resultTestNames = append(resultTestNames, result.Event.Data.TestName)
682-
}
683-
return batchData, resultTestNames, nil
684-
}
685-
686-
// unBatchSplunkResults un-batches a batch of TestResult objects into a slice of TestResult objects
687-
func unBatchSplunkResults(batch []byte) ([]*SplunkTestResult, error) {
688-
results := make([]*SplunkTestResult, 0, bytes.Count(batch, []byte{'\n'}))
689-
scanner := bufio.NewScanner(bytes.NewReader(batch))
690-
691-
maxCapacity := 1024 * 1024 // 1 MB
692-
buf := make([]byte, maxCapacity)
693-
scanner.Buffer(buf, maxCapacity)
694-
695-
var pool sync.Pool
696-
pool.New = func() interface{} { return new(SplunkTestResult) }
697-
698-
for scanner.Scan() {
699-
line := scanner.Bytes()
700-
if len(bytes.TrimSpace(line)) == 0 {
701-
continue // Skip empty lines
702-
}
703-
704-
result := pool.Get().(*SplunkTestResult)
705-
if err := json.Unmarshal(line, result); err != nil {
706-
return results, fmt.Errorf("error unmarshaling result: %w", err)
707-
}
708-
results = append(results, result)
709-
}
710-
711-
if err := scanner.Err(); err != nil {
712-
return results, fmt.Errorf("error scanning: %w", err)
713-
}
714-
715-
return results, nil
516+
return fullReport, err
716517
}
717518

718519
// aggregateReports aggregates multiple TestReport objects into a single TestReport

0 commit comments

Comments
 (0)