diff --git a/Makefile b/Makefile index 9eada7d..c12224b 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,8 @@ endif # Tool binaries GOLANGCI_LINT := $(GOBIN)/golangci-lint +YQ := $(GOBIN)/yq +PROMTOOL := $(GOBIN)/promtool # Include integration testing targets include test.mk @@ -33,6 +35,7 @@ lint: $(GOLANGCI_LINT) $(GOLANGCI_LINT): ./hack/install-golangci-lint.sh + # ---------------- # Test # ---------------- @@ -98,3 +101,4 @@ undeploy: ## precommit> run linting and unit tests .PHONY: precommit precommit: lint test + diff --git a/cmd/simulate/simulate.go b/cmd/simulate/simulate.go index c049cee..0c02d26 100644 --- a/cmd/simulate/simulate.go +++ b/cmd/simulate/simulate.go @@ -23,24 +23,26 @@ import ( func must(err error) { if err != nil { - panic(err) + panic(err) } } var outputFile = "cluster-health-analyzer-openmetrics.txt" var scenarioFile string +var alertsOnly bool var SimulateCmd = &cobra.Command{ Use: "simulate", Short: "Generate simulated data in openmetrics format", Run: func(cmd *cobra.Command, args []string) { - simulate(outputFile, scenarioFile) + simulate(outputFile, scenarioFile, alertsOnly) }, } func init() { SimulateCmd.Flags().StringVarP(&outputFile, "output", "o", outputFile, "output file") SimulateCmd.Flags().StringVarP(&scenarioFile, "scenario", "s", "", "CSV file with the scenario to simulate") + SimulateCmd.Flags().BoolVar(&alertsOnly, "alerts-only", false, "Only output ALERTS metrics (for integration testing)") } var defaultRelativeIntervals = []utils.RelativeInterval{ @@ -354,8 +356,17 @@ func parseIntervalsFromCSV(file io.Reader) ([]utils.RelativeInterval, error) { return intervals, nil } +// endTimeBuffer adds extra time to ensure alerts are still "firing" when queried. +// Without this buffer, alerts end exactly at time.Now() and immediately start +// becoming stale in Prometheus (5-minute staleness window). This caused integration +// tests to only see ~80% of alerts because the processor runs after a delay and +// queries at a later time when some alerts have already gone stale. +const endTimeBuffer = 10 * time.Minute + func buildAlertIntervals(scenarioFile string) ([]processor.Interval, error) { - end := model.TimeFromUnixNano(time.Now().UnixNano()) + // Add buffer to end time to ensure alerts are still active when the processor runs. + // This compensates for delays in test setup and processor polling intervals. + end := model.TimeFromUnixNano(time.Now().Add(endTimeBuffer).UnixNano()) intervals := defaultRelativeIntervals if scenarioFile != "" { csvIntervals, err := readIntervalsFromCSV(scenarioFile) @@ -404,7 +415,7 @@ func fmtInterval( return nil } -func simulate(outputFile, scenarioFile string) { +func simulate(outputFile, scenarioFile string, alertsOnly bool) { // Build sample intervals. intervals, err := buildAlertIntervals(scenarioFile) must(err) @@ -422,25 +433,6 @@ func simulate(outputFile, scenarioFile string) { } } - startToIntervals := make(map[model.Time][]processor.Interval) - - // Group them by time. 
- for _, i := range intervals { - startToIntervals[i.Start] = append(startToIntervals[i.Start], i) - } - - // Prepare the changeset - changes := make(processor.ChangeSet, len(startToIntervals)) - for t, intervals := range startToIntervals { - changes = append(changes, processor.Change{ - Timestamp: t, - Intervals: intervals, - }) - } - sort.Slice(changes, func(i, j int) bool { - return changes[i].Timestamp.Before(changes[j].Timestamp) - }) - f, err := os.Create(outputFile) must(err) defer f.Close() // nolint:errcheck @@ -456,6 +448,15 @@ func simulate(outputFile, scenarioFile string) { must(err) } + // When alertsOnly is set, skip the processed metrics - they should be + // computed by the cluster-health-analyzer being tested. + if alertsOnly { + _, err = fmt.Fprint(w, "# EOF") + must(err) + slog.Info("Openmetrics file saved (alerts only)", "output", outputFile) + return + } + // Output cluster_health_components fprintln(w, "# HELP cluster_health_components Cluster health components ranking") fprintln(w, "# TYPE cluster_health_components gauge") @@ -468,6 +469,25 @@ func simulate(outputFile, scenarioFile string) { must(err) } + startToIntervals := make(map[model.Time][]processor.Interval) + + // Group them by time. + for _, i := range intervals { + startToIntervals[i.Start] = append(startToIntervals[i.Start], i) + } + + // Prepare the changeset + changes := make(processor.ChangeSet, len(startToIntervals)) + for t, intervals := range startToIntervals { + changes = append(changes, processor.Change{ + Timestamp: t, + Intervals: intervals, + }) + } + sort.Slice(changes, func(i, j int) bool { + return changes[i].Timestamp.Before(changes[j].Timestamp) + }) + gc := &processor.GroupsCollection{} var groupedIntervalsSet []processor.GroupedInterval @@ -476,7 +496,7 @@ func simulate(outputFile, scenarioFile string) { groupedIntervalsSet = append(groupedIntervalsSet, groupedIntervals...) } - // Output cluster_health_components:map + // Output cluster_health_components_map fprintln(w, "# HELP cluster_health_components_map Cluster health components mapping") fprintln(w, "# TYPE cluster_health_components_map gauge") @@ -489,8 +509,6 @@ func simulate(outputFile, scenarioFile string) { err := fmtInterval(w, "cluster_health_components_map", healthMap.Labels(), gi.Start, gi.End, step, float64(healthMap.Health)) must(err) } - _, err = fmt.Fprint(w, "# EOF") - must(err) groups := make(map[string][]processor.GroupedInterval) for _, gi := range groupedIntervalsSet { @@ -516,6 +534,9 @@ func simulate(outputFile, scenarioFile string) { slog.Info("Generated incidents", "num", len(groups)) + _, err = fmt.Fprint(w, "# EOF") + must(err) + slog.Info("Openmetrics file saved", "output", outputFile) } diff --git a/test.mk b/test.mk index e03a8bc..5a19b56 100644 --- a/test.mk +++ b/test.mk @@ -52,7 +52,7 @@ undeploy-integration: ## test-integration> run integration tests (assumes deployment exists) .PHONY: test-integration test-integration: - $(GINKGO) -v ./test/integration/... + $(GINKGO) -v --label-filter="!stress&&!stress-simulate" ./test/integration/... ## help-integration> show integration testing workflow and related targets .PHONY: help-integration diff --git a/test/integration/fixtures/labels.go b/test/integration/fixtures/labels.go new file mode 100644 index 0000000..29037c3 --- /dev/null +++ b/test/integration/fixtures/labels.go @@ -0,0 +1,7 @@ +package fixtures + +// Common test labels used for resource identification and cleanup. 
+const ( + StressTestLabel = "test-type=stress" + CrashLoopTestLabel = "test-type=crashloop" +) diff --git a/test/integration/fixtures/templates.go b/test/integration/fixtures/templates.go index e201e04..1876917 100644 --- a/test/integration/fixtures/templates.go +++ b/test/integration/fixtures/templates.go @@ -9,45 +9,50 @@ import ( //go:embed testdata/*.yaml var testDataFS embed.FS +// Template paths relative to testdata directory. const ( - DeploymentFile = "testdata/crashing_deployment.yaml" - RuleFile = "testdata/crashloop_prometheusrule.yaml" + DeploymentTemplate = "testdata/crashing_deployment.yaml" + PrometheusRuleTemplate = "testdata/crashloop_prometheusrule.yaml" ) +// DeploymentReplacements returns the standard replacements for a deployment template. +func DeploymentReplacements(name, namespace, testType string) map[string]string { + return map[string]string{ + "{{NAME}}": name, + "{{NAMESPACE}}": namespace, + "{{TEST_TYPE}}": testType, + } +} + +// RuleReplacements returns the standard replacements for a PrometheusRule template. +func RuleReplacements(ruleName, alertName, podPrefix, testType string) map[string]string { + return map[string]string{ + "{{RULE_NAME}}": ruleName, + "{{ALERT_NAME}}": alertName, + "{{POD_PREFIX}}": podPrefix, + "{{TEST_TYPE}}": testType, + } +} + +// PodSelector returns the label selector for pods created by a deployment. +func PodSelector(deploymentName string) string { + return "app=" + deploymentName +} + // RenderDeployment loads the embedded deployment template and renders it with the given parameters. func RenderDeployment(name, namespace, testType string) (string, error) { - content, err := testDataFS.ReadFile(DeploymentFile) + content, err := testDataFS.ReadFile(DeploymentTemplate) if err != nil { return "", err } - - data := map[string]string{ - "Name": name, - "Namespace": namespace, - "TestType": testType, - } - - return framework.Render("deployment", content, data) + return framework.RenderTemplate(string(content), DeploymentReplacements(name, namespace, testType)), nil } // RenderRule loads the embedded PrometheusRule template and renders it with the given parameters. func RenderRule(ruleName, alertName, podPrefix, testType string) (string, error) { - content, err := testDataFS.ReadFile(RuleFile) + content, err := testDataFS.ReadFile(PrometheusRuleTemplate) if err != nil { return "", err } - - data := map[string]string{ - "RuleName": ruleName, - "AlertName": alertName, - "PodPrefix": podPrefix, - "TestType": testType, - } - - return framework.Render("rule", content, data) -} - -// PodSelector returns the label selector for pods created by a deployment. -func PodSelector(deploymentName string) string { - return "app=" + deploymentName + return framework.RenderTemplate(string(content), RuleReplacements(ruleName, alertName, podPrefix, testType)), nil } diff --git a/test/integration/framework/cluster.go b/test/integration/framework/cluster.go index 7081c70..e9d1489 100644 --- a/test/integration/framework/cluster.go +++ b/test/integration/framework/cluster.go @@ -41,6 +41,7 @@ func (c *Cluster) DeleteByLabel(ctx context.Context, resourceType, labelSelector return c.run(ctx, "delete", resourceType, "-l", labelSelector, "--ignore-not-found") } +// IsGone checks if a resource no longer exists. // Use this with Eventually to wait for deletion to complete. 
func (c *Cluster) IsGone(ctx context.Context, resourceType, name string) (bool, error) { err := c.run(ctx, "get", resourceType, name) diff --git a/test/integration/framework/matchers.go b/test/integration/framework/matchers.go index 444df1f..86e4338 100644 --- a/test/integration/framework/matchers.go +++ b/test/integration/framework/matchers.go @@ -77,7 +77,7 @@ type validIncidentMatcher struct { missing []string empty []string invalid []string - incident Incident + incident *Incident } var requiredIncidentLabels = []string{ @@ -90,9 +90,9 @@ var requiredIncidentLabels = []string{ } func (m *validIncidentMatcher) Match(actual interface{}) (bool, error) { - incident, ok := actual.(Incident) + incident, ok := actual.(*Incident) if !ok { - return false, fmt.Errorf("BeValidIncident expects Incident, got %T", actual) + return false, fmt.Errorf("BeValidIncident expects *Incident, got %T", actual) } if incident == nil { @@ -105,7 +105,7 @@ func (m *validIncidentMatcher) Match(actual interface{}) (bool, error) { m.invalid = nil for _, label := range requiredIncidentLabels { - value, exists := incident[label] + value, exists := incident.Labels[label] if !exists { m.missing = append(m.missing, label) } else if value == "" { @@ -114,14 +114,14 @@ func (m *validIncidentMatcher) Match(actual interface{}) (bool, error) { } // Validate layer is a known OpenShift layer - if layer, exists := incident["layer"]; exists && layer != "" { + if layer, exists := incident.Labels["layer"]; exists && layer != "" { if _, ok := ValidLayers[layer]; !ok { m.invalid = append(m.invalid, fmt.Sprintf("layer=%q", layer)) } } // Validate component is a known OpenShift component - if component, exists := incident["component"]; exists && component != "" { + if component, exists := incident.Labels["component"]; exists && component != "" { if _, ok := ValidComponents[component]; !ok { m.invalid = append(m.invalid, fmt.Sprintf("component=%q", component)) } @@ -147,19 +147,19 @@ func (m *validIncidentMatcher) FailureMessage(actual interface{}) string { parts = append(parts, fmt.Sprintf("invalid values: %v", m.invalid)) } - alertname := m.incident["src_alertname"] + alertname := m.incident.Labels["src_alertname"] if alertname == "" { alertname = "(unknown)" } return fmt.Sprintf("Expected incident %s to be valid, but: %s\nAll labels: %v", - alertname, strings.Join(parts, "; "), m.incident) + alertname, strings.Join(parts, "; "), m.incident.Labels) } func (m *validIncidentMatcher) NegatedFailureMessage(actual interface{}) string { alertname := "(unknown)" if m.incident != nil { - if name := m.incident["src_alertname"]; name != "" { + if name := m.incident.Labels["src_alertname"]; name != "" { alertname = name } } diff --git a/test/integration/framework/prometheus.go b/test/integration/framework/prometheus.go index e6539a7..e1fd463 100644 --- a/test/integration/framework/prometheus.go +++ b/test/integration/framework/prometheus.go @@ -33,13 +33,15 @@ func NewPrometheusClient(prometheusURL, token string) (*PrometheusClient, error) return &PrometheusClient{loader: loader}, nil } -// Alert represents a Prometheus alert as a simple label map. -type Alert map[string]string +// Alert represents a Prometheus alert. +type Alert struct { + Labels map[string]string +} // GetAlerts returns all firing alerts matching the alertname pattern. // Pattern can be exact ("MyAlert") or regex ("MyAlert.*"). // If queryTime is zero, uses time.Now(). 
-func (p *PrometheusClient) GetAlerts(ctx context.Context, alertnamePattern string, queryTime time.Time) ([]Alert, error) { +func (p *PrometheusClient) GetAlerts(ctx context.Context, alertnamePattern string, queryTime time.Time) ([]*Alert, error) { if queryTime.IsZero() { queryTime = time.Now() } @@ -49,20 +51,22 @@ func (p *PrometheusClient) GetAlerts(ctx context.Context, alertnamePattern strin return nil, fmt.Errorf("failed to query alerts: %w", err) } - alerts := make([]Alert, 0, len(results)) + alerts := make([]*Alert, 0, len(results)) for _, ls := range results { - alerts = append(alerts, Alert(labelSetToMap(ls))) + alerts = append(alerts, &Alert{Labels: labelSetToMap(ls)}) } return alerts, nil } -// Incident represents a processed incident from cluster_health_components_map as a simple label map. -type Incident map[string]string +// Incident represents a processed incident from cluster_health_components_map. +type Incident struct { + Labels map[string]string +} // GetIncidents returns all incidents matching the alertname pattern. // Pattern can be exact ("MyAlert") or regex ("MyAlert.*"). // If queryTime is zero, uses time.Now(). -func (p *PrometheusClient) GetIncidents(ctx context.Context, alertnamePattern string, queryTime time.Time) ([]Incident, error) { +func (p *PrometheusClient) GetIncidents(ctx context.Context, alertnamePattern string, queryTime time.Time) ([]*Incident, error) { if queryTime.IsZero() { queryTime = time.Now() } @@ -72,9 +76,9 @@ func (p *PrometheusClient) GetIncidents(ctx context.Context, alertnamePattern st return nil, fmt.Errorf("failed to query incidents: %w", err) } - incidents := make([]Incident, 0, len(results)) + incidents := make([]*Incident, 0, len(results)) for _, ls := range results { - incidents = append(incidents, Incident(labelSetToMap(ls))) + incidents = append(incidents, &Incident{Labels: labelSetToMap(ls)}) } return incidents, nil } diff --git a/test/integration/framework/template.go b/test/integration/framework/template.go index 178bf7a..d1a41c6 100644 --- a/test/integration/framework/template.go +++ b/test/integration/framework/template.go @@ -1,23 +1,36 @@ +// Package framework provides utilities for integration testing. package framework import ( - "bytes" "fmt" - "text/template" + "os" + "strings" ) -// Render handles the core logic of parsing and executing a template. -// It accepts any data structure (map or struct). -func Render(name string, templateContent []byte, data any) (string, error) { - tmpl, err := template.New(name).Parse(string(templateContent)) +// LoadTemplate reads a file and returns its content as a string. +func LoadTemplate(path string) (string, error) { + data, err := os.ReadFile(path) if err != nil { - return "", fmt.Errorf("failed to parse template %s: %w", name, err) + return "", fmt.Errorf("failed to read template %s: %w", path, err) } + return string(data), nil +} - var buf bytes.Buffer - if err := tmpl.Execute(&buf, data); err != nil { - return "", fmt.Errorf("failed to execute template %s: %w", name, err) +// RenderTemplate replaces all placeholders in template with their values. +// Placeholders should be in the format {{KEY}}. +func RenderTemplate(template string, replacements map[string]string) string { + result := template + for placeholder, value := range replacements { + result = strings.ReplaceAll(result, placeholder, value) } + return result +} - return buf.String(), nil +// LoadAndRender loads a template file and renders it with the given replacements. 
+func LoadAndRender(path string, replacements map[string]string) (string, error) { + template, err := LoadTemplate(path) + if err != nil { + return "", err + } + return RenderTemplate(template, replacements), nil } diff --git a/test/integration/simulate/injector.go b/test/integration/simulate/injector.go new file mode 100644 index 0000000..c6efffd --- /dev/null +++ b/test/integration/simulate/injector.go @@ -0,0 +1,116 @@ +package simulate + +import ( + "fmt" + "os" + "path/filepath" + "time" +) + +// DefaultPrometheusPods are the default Prometheus pods to try (in order). +var DefaultPrometheusPods = []string{"prometheus-k8s-0", "prometheus-k8s-1"} + +// Injector injects simulated alerts into Prometheus. +type Injector struct { + projectRoot string + prometheusNS string + prometheusPods []string +} + +// NewInjector creates an injector for the given Prometheus instance. +// Uses DefaultPrometheusPods for fallback if one pod is unavailable. +func NewInjector(prometheusNS string) (*Injector, error) { + return NewInjectorWithPods(prometheusNS, DefaultPrometheusPods...) +} + +// NewInjectorWithPods creates an injector with specific Prometheus pods. +// Pods are tried in order - if copying to the first pod fails, the next is tried. +func NewInjectorWithPods(prometheusNS string, prometheusPods ...string) (*Injector, error) { + if len(prometheusPods) == 0 { + prometheusPods = DefaultPrometheusPods + } + + root, err := findProjectRoot() + if err != nil { + return nil, err + } + return &Injector{ + projectRoot: root, + prometheusNS: prometheusNS, + prometheusPods: prometheusPods, + }, nil +} + +// InjectionResult contains information about the injection for querying. +type InjectionResult struct { + // QueryTime is the time to use when querying for the injected alerts. + // Alerts with maxEnd exist at approximately this time. + QueryTime time.Time + + // UsedPod is the Prometheus pod that was successfully used for injection. + UsedPod string +} + +// WipePrometheusData deletes all Prometheus data and restarts the pods. +// This guarantees a clean slate before running tests. +// WARNING: This destroys ALL metrics data. Only use on dedicated test clusters. +func (i *Injector) WipePrometheusData() error { + return WipePrometheusData(i.prometheusNS, i.prometheusPods) +} + +// Inject takes a scenario and injects only ALERTS into Prometheus. +// It handles the full workflow: CSV -> simulate (alerts-only) -> TSDB blocks -> copy to Prometheus. +// The cluster_health_components_map metrics are NOT injected - they should be computed +// by the cluster-health-analyzer being tested. +// Returns InjectionResult with the query time to use for finding the alerts. +func (i *Injector) Inject(scenario *ScenarioBuilder) (*InjectionResult, error) { + return i.injectWithOptions(scenario, true) +} + +// InjectFull takes a scenario and injects ALL metrics into Prometheus. +// This includes ALERTS, cluster_health_components, and cluster_health_components_map. +// Use this when you want pre-computed metrics (e.g., for testing queries). 
+func (i *Injector) InjectFull(scenario *ScenarioBuilder) (*InjectionResult, error) { + return i.injectWithOptions(scenario, false) +} + +func (i *Injector) injectWithOptions(scenario *ScenarioBuilder, alertsOnly bool) (*InjectionResult, error) { + tmpDir, err := os.MkdirTemp("", "simulate-") + if err != nil { + return nil, fmt.Errorf("failed to create temp dir: %w", err) + } + defer os.RemoveAll(tmpDir) + + scenarioFile := filepath.Join(tmpDir, "scenario.csv") + openmetricsFile := filepath.Join(tmpDir, "openmetrics.txt") + dataDir := filepath.Join(tmpDir, "data") + + // Step 1: Write scenario CSV + if err := scenario.WriteCSV(scenarioFile); err != nil { + return nil, fmt.Errorf("failed to write scenario: %w", err) + } + + // Step 2: Run simulate command (with --alerts-only when testing the analyzer) + if err := runSimulate(i.projectRoot, scenarioFile, openmetricsFile, alertsOnly); err != nil { + return nil, fmt.Errorf("failed to run simulate: %w", err) + } + + // Capture time after simulate - alerts with maxEnd exist at approximately this time + queryTime := time.Now() + + // Step 3: Create TSDB blocks + if err := CreateTSDBBlocks(openmetricsFile, dataDir); err != nil { + return nil, fmt.Errorf("failed to create TSDB blocks: %w", err) + } + + // Step 4: Copy to Prometheus (with fallback to other pods) + usedPod, err := CopyBlocksToPrometheusWithFallback(dataDir, i.prometheusNS, i.prometheusPods) + if err != nil { + return nil, fmt.Errorf("failed to copy blocks: %w", err) + } + + return &InjectionResult{ + QueryTime: queryTime, + UsedPod: usedPod, + }, nil +} diff --git a/test/integration/simulate/runner.go b/test/integration/simulate/runner.go new file mode 100644 index 0000000..07073a2 --- /dev/null +++ b/test/integration/simulate/runner.go @@ -0,0 +1,47 @@ +package simulate + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" +) + +// runSimulate executes the simulate command to generate an OpenMetrics file. +// If alertsOnly is true, only ALERTS metrics are generated (for integration testing). +func runSimulate(projectRoot, scenarioFile, outputFile string, alertsOnly bool) error { + args := []string{"run", "./main.go", "simulate", + "--scenario", scenarioFile, + "--output", outputFile} + + if alertsOnly { + args = append(args, "--alerts-only") + } + + cmd := exec.Command("go", args...) + cmd.Dir = projectRoot + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("simulate failed: %w\nOutput: %s", err, string(output)) + } + return nil +} + +// findProjectRoot walks up from cwd to find go.mod. +func findProjectRoot() (string, error) { + dir, err := os.Getwd() + if err != nil { + return "", err + } + + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir, nil + } + parent := filepath.Dir(dir) + if parent == dir { + return "", fmt.Errorf("could not find project root (no go.mod found)") + } + dir = parent + } +} diff --git a/test/integration/simulate/scenario.go b/test/integration/simulate/scenario.go new file mode 100644 index 0000000..36209cb --- /dev/null +++ b/test/integration/simulate/scenario.go @@ -0,0 +1,103 @@ +// Package simulate provides utilities for running stress tests via the simulate command. +package simulate + +import ( + "fmt" + "os" + "strings" +) + +// Alert represents one alert in a scenario. 
+type Alert struct { + Start int + End int + Name string + Namespace string + Severity string + Silenced bool + ExtraLabel string // Optional extra label key=value +} + +// ScenarioBuilder creates scenario CSV files for the simulate command. +type ScenarioBuilder struct { + alerts []Alert +} + +// NewScenarioBuilder creates a new scenario builder. +func NewScenarioBuilder() *ScenarioBuilder { + return &ScenarioBuilder{} +} + +// AddAlert adds a single alert to the scenario. +func (s *ScenarioBuilder) AddAlert(alert Alert) *ScenarioBuilder { + s.alerts = append(s.alerts, alert) + return s +} + +// AddWatchdog adds a background Watchdog alert (commonly needed). +func (s *ScenarioBuilder) AddWatchdog(start, end int) *ScenarioBuilder { + return s.AddAlert(Alert{ + Start: start, + End: end, + Name: "Watchdog", + Namespace: "openshift-monitoring", + Severity: "none", + Silenced: true, + }) +} + +// AddStressAlerts adds multiple alerts with sequential names. +// The unique prefix (e.g., "StressSim1234567890") prevents grouping with other test runs. +func (s *ScenarioBuilder) AddStressAlerts(count int, prefix, namespace string, startTime, endTime int) *ScenarioBuilder { + for i := 1; i <= count; i++ { + start := startTime + (i % 100) // Slight variation in start times + s.AddAlert(Alert{ + Start: start, + End: endTime, + Name: fmt.Sprintf("%s%04d", prefix, i), + Namespace: namespace, + Severity: "warning", + Silenced: false, + ExtraLabel: "component=monitoring", + }) + } + return s +} + +// WriteCSV writes the scenario to a CSV file. +func (s *ScenarioBuilder) WriteCSV(path string) error { + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("failed to create scenario file: %w", err) + } + defer f.Close() + + // Header + if _, err := f.WriteString("start,end,alertname,namespace,severity,silenced,labels\n"); err != nil { + return err + } + + // Alerts + for _, a := range s.alerts { + silenced := "false" + if a.Silenced { + silenced = "true" + } + + labels := "{}" + if a.ExtraLabel != "" { + // ExtraLabel is in "key=value" format, convert to JSON {"key": "value"} + if parts := strings.SplitN(a.ExtraLabel, "=", 2); len(parts) == 2 { + labels = fmt.Sprintf("{\"%s\": \"%s\"}", parts[0], parts[1]) + } + } + + line := fmt.Sprintf("%d,%d,%s,%s,%s,%s,%s\n", + a.Start, a.End, a.Name, a.Namespace, a.Severity, silenced, labels) + if _, err := f.WriteString(line); err != nil { + return err + } + } + + return nil +} diff --git a/test/integration/simulate/tsdb.go b/test/integration/simulate/tsdb.go new file mode 100644 index 0000000..03b8c7d --- /dev/null +++ b/test/integration/simulate/tsdb.go @@ -0,0 +1,119 @@ +package simulate + +import ( + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strings" +) + +// CreateTSDBBlocks uses promtool to create TSDB blocks from an OpenMetrics file. +func CreateTSDBBlocks(openmetricsFile, dataDir string) error { + if err := os.MkdirAll(dataDir, 0755); err != nil { + return fmt.Errorf("failed to create data dir: %w", err) + } + + cmd := exec.Command("promtool", "tsdb", "create-blocks-from", "openmetrics", + openmetricsFile, dataDir) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("promtool failed: %w\nOutput: %s", err, string(output)) + } + return nil +} + +// CopyBlocksToPrometheusWithFallback copies TSDB blocks to one of the Prometheus pods. +// It tries each pod in order and returns the name of the pod that succeeded. +// If all pods fail, it returns an error with details from all attempts. 
+func CopyBlocksToPrometheusWithFallback(dataDir, namespace string, pods []string) (string, error) { + var lastErr error + + for _, pod := range pods { + slog.Info("Attempting to copy blocks to Prometheus pod", "pod", pod) + err := CopyBlocksToPrometheus(dataDir, namespace, pod) + if err == nil { + slog.Info("Successfully copied blocks to Prometheus pod", "pod", pod) + return pod, nil + } + slog.Warn("Failed to copy blocks to pod, trying next", "pod", pod, "error", err) + lastErr = err + } + + return "", fmt.Errorf("all Prometheus pods failed, last error: %w", lastErr) +} + +// CopyBlocksToPrometheus copies TSDB blocks to a specific Prometheus pod. +func CopyBlocksToPrometheus(dataDir, namespace, pod string) error { + entries, err := os.ReadDir(dataDir) + if err != nil { + return fmt.Errorf("failed to read data dir: %w", err) + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + blockPath := filepath.Join(dataDir, entry.Name()) + destPath := fmt.Sprintf("%s/%s:/prometheus", namespace, pod) + + cmd := exec.Command("oc", "cp", blockPath, destPath, "-c", "prometheus") + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("oc cp failed for block %s: %w\nOutput: %s", + entry.Name(), err, string(output)) + } + } + + return nil +} + +// WipePrometheusData deletes all Prometheus data and restarts the pods. +// This is a "nuke" approach that guarantees a clean slate by: +// 1. Deleting all contents of /prometheus/ on each pod +// 2. Deleting the pods to force a fresh restart +// 3. Waiting for pods to be ready again +// WARNING: This destroys ALL metrics data, not just test data. +// Only use on dedicated test clusters. +func WipePrometheusData(namespace string, pods []string) error { + slog.Info("Wiping all Prometheus data (nuke mode)") + + // Step 1: Delete all data from each pod + for _, pod := range pods { + slog.Info("Wiping data from pod", "pod", pod) + wipeCmd := exec.Command("oc", "exec", "-n", namespace, pod, "-c", "prometheus", "--", + "sh", "-c", "rm -rf /prometheus/*") + if output, err := wipeCmd.CombinedOutput(); err != nil { + slog.Warn("Failed to wipe data from pod", "pod", pod, "error", err, "output", string(output)) + // Continue - pod might not exist or be accessible + } else { + slog.Info("Wiped data from pod", "pod", pod) + } + } + + // Step 2: Delete all pods to force restart with clean state + podNames := strings.Join(pods, " ") + slog.Info("Deleting Prometheus pods to force restart", "pods", podNames) + deleteCmd := exec.Command("oc", "delete", "pod", "-n", namespace, "--wait=false") + deleteCmd.Args = append(deleteCmd.Args, pods...) 
+ if output, err := deleteCmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to delete pods: %w\nOutput: %s", err, string(output)) + } + + // Step 3: Wait for pods to be ready again + slog.Info("Waiting for Prometheus pods to be ready") + for _, pod := range pods { + waitCmd := exec.Command("oc", "wait", "pod", pod, "-n", namespace, + "--for=condition=Ready", "--timeout=5m") + if output, err := waitCmd.CombinedOutput(); err != nil { + return fmt.Errorf("timeout waiting for pod %s to be ready: %w\nOutput: %s", pod, err, string(output)) + } + slog.Info("Pod is ready", "pod", pod) + } + + slog.Info("Prometheus data wiped and pods ready") + return nil +} + diff --git a/test/integration/testdata/crashing_deployment.yaml b/test/integration/testdata/crashing_deployment.yaml new file mode 100644 index 0000000..588583e --- /dev/null +++ b/test/integration/testdata/crashing_deployment.yaml @@ -0,0 +1,31 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{NAME}} + namespace: {{NAMESPACE}} + labels: + test-type: {{TEST_TYPE}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{NAME}} + template: + metadata: + labels: + app: {{NAME}} + test-type: {{TEST_TYPE}} + spec: + containers: + - name: crash-loop + image: registry.access.redhat.com/ubi9/ubi-minimal:latest + imagePullPolicy: IfNotPresent + command: ["sh", "-c", "exit 1"] + resources: + requests: + cpu: "1m" + memory: "4Mi" + limits: + cpu: "10m" + memory: "16Mi" + diff --git a/test/integration/testdata/crashloop_prometheusrule.yaml b/test/integration/testdata/crashloop_prometheusrule.yaml new file mode 100644 index 0000000..53bf171 --- /dev/null +++ b/test/integration/testdata/crashloop_prometheusrule.yaml @@ -0,0 +1,24 @@ +apiVersion: monitoring.coreos.com/v1 +kind: PrometheusRule +metadata: + labels: + app.kubernetes.io/name: kube-prometheus + app.kubernetes.io/part-of: openshift-monitoring + prometheus: k8s + role: alert-rules + test-type: {{TEST_TYPE}} + name: {{RULE_NAME}} + namespace: openshift-monitoring +spec: + groups: + - name: crashloop-test-{{RULE_NAME}} + rules: + - alert: {{ALERT_NAME}} + annotations: + description: 'Pod {{ $labels.namespace }}/{{ $labels.pod }} ({{ $labels.container }}) is in waiting state (reason: "CrashLoopBackOff").' + summary: Pod is crash looping. 
+ expr: | + max_over_time(kube_pod_container_status_waiting_reason{reason="CrashLoopBackOff", pod=~"{{POD_PREFIX}}.*", job="kube-state-metrics"}[5m]) >= 1 + for: 1m + labels: + severity: warning diff --git a/test/integration/tests/crashloop_test.go b/test/integration/tests/crashloop_test.go index 623a16f..a432c9d 100644 --- a/test/integration/tests/crashloop_test.go +++ b/test/integration/tests/crashloop_test.go @@ -75,7 +75,7 @@ var _ = Describe("KubePodCrashLooping Alert Processing", func() { }, "5m", "30s").Should(BeTrue(), "Alert %s did not fire within timeout", alertName) By("Waiting for cluster-health-analyzer to process the alert") - var incident framework.Incident + var incident *framework.Incident Eventually(func() (bool, error) { incidents, err := promClient.GetIncidents(ctx, alertName, time.Time{}) if len(incidents) > 0 { @@ -86,8 +86,8 @@ var _ = Describe("KubePodCrashLooping Alert Processing", func() { By("Verifying the incident has correct labels") Expect(incident).To(framework.BeValidIncident()) - Expect(map[string]string(incident)).To(HaveKeyWithValue("src_alertname", alertName)) - Expect(map[string]string(incident)).To(HaveKeyWithValue("src_severity", "warning")) + Expect(incident.Labels).To(HaveKeyWithValue("src_alertname", alertName)) + Expect(incident.Labels).To(HaveKeyWithValue("src_severity", "warning")) By(fmt.Sprintf("Test completed - resources %s and %s left for inspection", deploymentName, ruleName)) }) diff --git a/test/integration/tests/stress_simulate_test.go b/test/integration/tests/stress_simulate_test.go new file mode 100644 index 0000000..2863d38 --- /dev/null +++ b/test/integration/tests/stress_simulate_test.go @@ -0,0 +1,111 @@ +package tests + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/openshift/cluster-health-analyzer/test/integration/framework" + "github.com/openshift/cluster-health-analyzer/test/integration/simulate" +) + +var _ = Describe("Stress Test via Simulate", Label("stress-simulate"), func() { + const ( + prometheusNS = "openshift-monitoring" + ) + + var ( + alertCount int + alertPrefix string + promClient *framework.PrometheusClient + injector *simulate.Injector + ) + + BeforeEach(func() { + alertCount = framework.GetEnvInt("STRESS_ALERT_COUNT", 100) + // Generate unique prefix for each test run to avoid conflicts + alertPrefix = fmt.Sprintf("StressSim%d", time.Now().UnixNano()/1e6) + GinkgoWriter.Printf("Stress simulate test: alerts=%d, prefix=%s\n", alertCount, alertPrefix) + + var err error + // Uses default pods (prometheus-k8s-0, prometheus-k8s-1) with fallback + injector, err = simulate.NewInjector(prometheusNS) + Expect(err).NotTo(HaveOccurred()) + + promClient, err = framework.NewPrometheusClient(cfg.ThanosURL, cfg.ThanosToken) + Expect(err).NotTo(HaveOccurred()) + + // Wipe all Prometheus data unless KEEP_TEST_DATA=true is set + // This deletes everything and restarts pods to guarantee a clean slate + if !framework.GetEnvBool("KEEP_TEST_DATA", false) { + GinkgoWriter.Printf("Wiping all Prometheus data (set KEEP_TEST_DATA=true to skip)\n") + if err := injector.WipePrometheusData(); err != nil { + GinkgoWriter.Printf("Warning: failed to wipe Prometheus data: %v\n", err) + // Continue anyway - it's best effort + } + } else { + GinkgoWriter.Printf("Keeping existing Prometheus data (KEEP_TEST_DATA=true)\n") + } + }) + + It("should inject simulated alerts and verify processing", func() { + By(fmt.Sprintf("Injecting %d simulated alerts into Prometheus", alertCount)) + // Use 
fixed timing: alerts from minute 3000 to 4000 (relative to reference point)
+		// The unique prefix prevents grouping with alerts from other test runs
+		scenario := simulate.NewScenarioBuilder().
+			AddStressAlerts(alertCount, alertPrefix, "openshift-monitoring", 3000, 4000)
+
+		result, err := injector.Inject(scenario)
+		Expect(err).NotTo(HaveOccurred())
+		GinkgoWriter.Printf("Injection completed:\n")
+		GinkgoWriter.Printf("  - Used pod: %s\n", result.UsedPod)
+		GinkgoWriter.Printf("  - Query time: %s\n", result.QueryTime.Format("2006-01-02 15:04:05 MST"))
+
+		By("Waiting for Prometheus to load the blocks")
+		time.Sleep(30 * time.Second)
+
+		By("Verifying alerts are visible in Prometheus")
+		alertPattern := alertPrefix + ".*"
+		Eventually(func() (int, error) {
+			alerts, err := promClient.GetAlerts(ctx, alertPattern, result.QueryTime)
+			GinkgoWriter.Printf("Alerts found: %d/%d\n", len(alerts), alertCount)
+			return len(alerts), err
+		}, "5m", "10s").Should(BeNumerically(">=", alertCount*99/100),
+			"Expected at least 99%% of alerts to be visible")
+
+		By("Verifying cluster-health-analyzer processed the alerts")
+		var incidents []*framework.Incident
+		Eventually(func() (int, error) {
+			var err error
+			// Use time.Now() for incidents - they're generated with current timestamps
+			incidents, err = promClient.GetIncidents(ctx, alertPattern, time.Time{})
+			GinkgoWriter.Printf("Incidents found: %d/%d\n", len(incidents), alertCount)
+			return len(incidents), err
+		}, "10m", "15s").Should(BeNumerically(">=", alertCount*99/100),
+			"Expected at least 99%% of alerts to have incidents")
+
+		By("Verifying all incidents have valid labels")
+		groupIDs := make(map[string]int)
+		for i, incident := range incidents {
+			GinkgoWriter.Printf("Incident %d: group_id=%s, labels=%v\n",
+				i+1, incident.Labels["group_id"], incident.Labels)
+			Expect(incident).To(framework.BeValidIncident())
+			groupIDs[incident.Labels["group_id"]]++
+		}
+
+		// Log group_id summary
+		GinkgoWriter.Printf("\nGroup ID summary:\n")
+		for groupID, count := range groupIDs {
+			GinkgoWriter.Printf("  group_id=%s: %d incidents\n", groupID, count)
+		}
+
+		// All alerts in the same namespace with similar timing should be grouped together
+		GinkgoWriter.Printf("\nFound %d unique group_ids across %d incidents\n", len(groupIDs), len(incidents))
+		Expect(len(groupIDs)).To(BeNumerically("<=", 5),
+			"Expected alerts to be grouped into few incidents, got %d groups", len(groupIDs))
+
+		GinkgoWriter.Printf("Stress simulate test completed: %d alerts injected, %d groups\n", alertCount, len(groupIDs))
+	})
+})
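
Usage sketch (a minimal, hypothetical example, not part of the patch): it wires together the ScenarioBuilder, Injector, and pointer-based PrometheusClient types added above, using only the signatures introduced in this diff. The runStressScenario wrapper and the thanosURL/thanosToken parameters are placeholders for whatever the calling test suite already provides.

// Sketch of the intended flow: build a scenario, inject only the ALERTS
// series, then query the injected alerts back at the recorded query time.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/openshift/cluster-health-analyzer/test/integration/framework"
	"github.com/openshift/cluster-health-analyzer/test/integration/simulate"
)

func runStressScenario(ctx context.Context, thanosURL, thanosToken string) error {
	// Unique prefix so these synthetic alerts don't group with other runs.
	prefix := fmt.Sprintf("StressSim%d", time.Now().UnixMilli())

	// A background Watchdog plus 100 synthetic warning alerts (minutes 3000-4000).
	scenario := simulate.NewScenarioBuilder().
		AddWatchdog(0, 4000).
		AddStressAlerts(100, prefix, "openshift-monitoring", 3000, 4000)

	// Inject only the ALERTS metrics; the analyzer under test computes the rest.
	injector, err := simulate.NewInjector("openshift-monitoring")
	if err != nil {
		return err
	}
	result, err := injector.Inject(scenario)
	if err != nil {
		return err
	}

	// Query the injected alerts at the time the injector recorded.
	promClient, err := framework.NewPrometheusClient(thanosURL, thanosToken)
	if err != nil {
		return err
	}
	alerts, err := promClient.GetAlerts(ctx, prefix+".*", result.QueryTime)
	if err != nil {
		return err
	}
	fmt.Printf("injected via %s, %d alerts visible\n", result.UsedPod, len(alerts))
	return nil
}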