diff --git a/cmd/kepler/main.go b/cmd/kepler/main.go index 02c43377b7..03359a8885 100644 --- a/cmd/kepler/main.go +++ b/cmd/kepler/main.go @@ -167,11 +167,15 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service, server.WithWebConfig(cfg.Web.Config), ) + // Create health probe service + healthProbeService := server.NewHealthProbeService(apiServer, pm, pm, logger) + services = append(services, resourceInformer, cpuPowerMeter, apiServer, pm, + healthProbeService, ) // Add Redfish service if enabled diff --git a/docs/HEALTH_PROBES.md b/docs/HEALTH_PROBES.md new file mode 100644 index 0000000000..81c73156e3 --- /dev/null +++ b/docs/HEALTH_PROBES.md @@ -0,0 +1,232 @@ +# Kubernetes Health Probes for Kepler + +This documentation describes the implementation of health check endpoints for Kubernetes probes in Kepler. + +## Overview + +Health probes allow Kubernetes to determine the health status of your application. Kepler implements two types of probes: + +- **Liveness Probe** (`/probe/livez`): Determines if the application is alive and responding +- **Readiness Probe** (`/probe/readyz`): Determines if the application is ready to receive traffic + +## Endpoints + +### `/probe/livez` - Liveness Probe + +**Description**: Checks if Kepler's monitor service is alive and responding. + +**Success Criteria**: +- PowerMonitor service is not nil +- Collection context is not cancelled + +**Response**: +- `200 OK`: Service is alive +- `503 Service Unavailable`: Service is not alive + +**Example Response**: +```json +{ + "status": "ok", + "timestamp": "2025-01-17T10:30:00Z", + "duration": "1.2µs" +} +``` + +### `/probe/readyz` - Readiness Probe + +**Description**: Checks if Kepler's monitor service is ready to serve data. + +**Success Criteria**: +- Service is alive (checks liveness first) +- At least one snapshot is available +- Snapshot is not too old (within staleness limit) +- CPU meter is functional +- Energy zones are initialized + +**Response**: +- `200 OK`: Service is ready +- `503 Service Unavailable`: Service is not ready + +**Example Response**: +```json +{ + "status": "ok", + "timestamp": "2025-01-17T10:30:00Z", + "duration": "1.8µs" +} +``` + +## Kubernetes Configuration + +### DaemonSet with Health Probes + +```yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: kepler + namespace: kepler +spec: + selector: + matchLabels: + app: kepler + template: + metadata: + labels: + app: kepler + spec: + containers: + - name: kepler + image: quay.io/sustainable_computing_io/kepler:latest + ports: + - containerPort: 28282 + name: http-metrics + livenessProbe: + httpGet: + path: /probe/livez + port: 28282 + initialDelaySeconds: 10 + periodSeconds: 30 + timeoutSeconds: 5 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /probe/readyz + port: 28282 + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 +``` + +## Testing + +### Unit Tests + +Unit tests are available in `internal/server/health_test.go`: + +```bash +go test ./internal/server/ -v +``` + +### Integration Tests + +A test script is provided to test the endpoints live: + +```bash +# Start Kepler +go run ./cmd/kepler/ + +# In another terminal, test the endpoints +./examples/test-health-endpoints.sh +``` + +## Architecture + +### Interfaces + +The following interfaces were added in `internal/service/service.go`: + +```go +// LiveChecker checks if a service is alive +type LiveChecker interface { + IsLive(ctx context.Context) (bool, error) +} + +// ReadyChecker checks if a service is ready +type ReadyChecker interface { + IsReady(ctx context.Context) (bool, error) +} +``` + +### Implementation + +1. **PowerMonitor** (`internal/monitor/monitor.go`): Implements `LiveChecker` and `ReadyChecker` interfaces +2. **HealthProbeService** (`internal/server/health.go`): Service that exposes HTTP endpoints +3. **Integration** (`cmd/kepler/main.go`): Service registration in the main application + +### Verification Flow + +#### Liveness Check +1. Verify the monitor is not nil +2. Verify the collection context is not cancelled + +#### Readiness Check +1. Execute liveness check +2. Verify a snapshot is available +3. Verify the snapshot is not stale +4. Verify the CPU meter is available +5. Verify energy zones are initialized + +## Performance + +Health checks are designed to be very lightweight: +- **Liveness**: Typically 1-5 microseconds +- **Readiness**: Typically 1-10 microseconds + +No forced data collection is performed during health checks to avoid performance impact. + +## Debugging + +### Logs + +Health checks generate DEBUG level logs for successes and ERROR level logs for failures: + +```bash +# View health check logs +journalctl -u kepler -f | grep "health-probe" +``` + +### Manual Testing + +```bash +# Test liveness +curl -v http://localhost:28282/probe/livez + +# Test readiness +curl -v http://localhost:28282/probe/readyz + +# With jq to format response +curl -s http://localhost:28282/probe/livez | jq . +``` + +## Troubleshooting + +### Liveness probe fails + +- Verify Kepler is started +- Check logs for startup errors +- Verify port 28282 is accessible + +### Readiness probe fails + +- Verify liveness probe works +- Verify `/proc` and `/sys` files are accessible +- Check RAPL zones configuration +- Verify collection interval is not too long + +## Migration + +This implementation is compatible with existing Kepler versions. The new endpoints are optional and do not affect existing functionality. + +To enable health probes in an existing installation, simply update the Kubernetes configuration to include the new probes. + +## Files Modified/Added + +### New Files +- `internal/server/health.go` - Health probe service implementation +- `internal/server/health_test.go` - Unit tests for health probes +- `examples/kubernetes-health-probes.yaml` - Kubernetes DaemonSet example +- `examples/test-health-endpoints.sh` - Integration test script + +### Modified Files +- `internal/service/service.go` - Added LiveChecker and ReadyChecker interfaces +- `internal/monitor/monitor.go` - Added IsLive() and IsReady() methods to PowerMonitor +- `cmd/kepler/main.go` - Registered health probe service + +## Future Enhancements + +- Add more granular health checks for different components +- Implement health check metrics for monitoring +- Add configuration options for health check behavior +- Support for custom health check plugins \ No newline at end of file diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 0cc1ae7fdb..3a0ca07786 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -67,6 +67,13 @@ type PowerMonitor struct { // state atomically across goroutines. exported atomic.Bool + + // lastCollectUnixNano tracks the last collection timestamp for liveness checks + lastCollectUnixNano int64 + + // healthCheckTolerance is the multiplier for interval tolerance in liveness checks + healthCheckTolerance float64 + zonesNames []string // cache of all zones // Internal terminated workload trackers (not exposed) @@ -103,6 +110,8 @@ func NewPowerMonitor(meter device.CPUPowerMeter, applyOpts ...OptionFn) *PowerMo maxTerminated: opts.maxTerminated, minTerminatedEnergyThreshold: opts.minTerminatedEnergyThreshold, + + healthCheckTolerance: opts.healthCheckTolerance, collectionCtx: ctx, collectionCancel: cancel, @@ -338,8 +347,13 @@ func (pm *PowerMonitor) refreshSnapshot() error { pm.exported.Store(false) // Update snapshot with current timestamp - newSnapshot.Timestamp = pm.clock.Now() + now := pm.clock.Now() + newSnapshot.Timestamp = now pm.snapshot.Store(newSnapshot) + + // Update collection heartbeat for liveness checks + atomic.StoreInt64(&pm.lastCollectUnixNano, now.UnixNano()) + pm.signalNewData() pm.logger.Debug("refreshSnapshot", "processes", len(newSnapshot.Processes), @@ -429,3 +443,47 @@ func (pm *PowerMonitor) calculatePower(prev, newSnapshot *Snapshot) error { return nil } + +// IsLive checks if the monitor is alive and responsive +func (pm *PowerMonitor) IsLive(ctx context.Context) (bool, error) { + if pm == nil { + return false, fmt.Errorf("monitor is nil") + } + if pm.cpu == nil { + return false, fmt.Errorf("CPU meter not initialized") + } + + // If periodic collection is expected, require a recent heartbeat + if pm.interval > 0 { + lastNano := atomic.LoadInt64(&pm.lastCollectUnixNano) + if lastNano == 0 { + return false, fmt.Errorf("no collection heartbeat yet") + } + last := time.Unix(0, lastNano) + tolerance := time.Duration(float64(pm.interval) * pm.healthCheckTolerance) + if time.Since(last) > tolerance { + return false, fmt.Errorf("collector stalled; last=%s, tolerance=%.1fx interval", last, pm.healthCheckTolerance) + } + } + return true, nil +} + +// IsReady checks if the monitor is ready to serve data +func (pm *PowerMonitor) IsReady(ctx context.Context) (bool, error) { + if pm == nil { + return false, fmt.Errorf("monitor is nil") + } + + // Passive mode: ready even without periodic collection + if pm.interval == 0 { + return true, nil + } + + // Active collection: require at least one snapshot + if pm.snapshot.Load() == nil { + return false, fmt.Errorf("no data yet") + } + + return true, nil +} + diff --git a/internal/monitor/monitor_integration_test.go b/internal/monitor/monitor_integration_test.go new file mode 100644 index 0000000000..9d1abcaf50 --- /dev/null +++ b/internal/monitor/monitor_integration_test.go @@ -0,0 +1,381 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package monitor + +import ( + "context" + "log/slog" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/sustainable-computing-io/kepler/internal/device" +) + +func TestPowerMonitor_HealthCheck_CollectorFailure(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + interval := 1 * time.Second + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(interval), + ) + + ctx := context.Background() + + // Test 1: Initial state - not alive without heartbeat + alive, err := pm.IsLive(ctx) + if alive { + t.Error("Expected not alive initially without heartbeat") + } + if err == nil || err.Error() != "no collection heartbeat yet" { + t.Errorf("Expected 'no collection heartbeat yet' error, got: %v", err) + } + + // Test 2: Simulate collector starting - set fresh heartbeat + now := time.Now() + atomic.StoreInt64(&pm.lastCollectUnixNano, now.UnixNano()) + + alive, err = pm.IsLive(ctx) + if !alive || err != nil { + t.Errorf("Expected alive with fresh heartbeat, got alive=%v, err=%v", alive, err) + } + + // Test 3: Simulate collector failure - heartbeat goes stale + staleTime := time.Now().Add(-3 * interval) // 3x interval = stale + atomic.StoreInt64(&pm.lastCollectUnixNano, staleTime.UnixNano()) + + alive, err = pm.IsLive(ctx) + if alive { + t.Error("Expected not alive with stale heartbeat") + } + if err == nil || !containsStringIntegration(err.Error(), "collector stalled") { + t.Errorf("Expected 'collector stalled' error, got: %v", err) + } + + // Test 4: Simulate collector recovery - fresh heartbeat again + freshTime := time.Now() + atomic.StoreInt64(&pm.lastCollectUnixNano, freshTime.UnixNano()) + + alive, err = pm.IsLive(ctx) + if !alive || err != nil { + t.Errorf("Expected alive after recovery, got alive=%v, err=%v", alive, err) + } +} + +func TestPowerMonitor_HealthCheck_StateTransitions(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), // Active mode + ) + + ctx := context.Background() + + // State 1: Not Ready (no data) + ready, err := pm.IsReady(ctx) + if ready { + t.Error("Expected not ready initially") + } + if err == nil || err.Error() != "no data yet" { + t.Errorf("Expected 'no data yet' error, got: %v", err) + } + + // State 2: Transition to Ready (add snapshot) + snapshot := NewSnapshot() + snapshot.Timestamp = time.Now() + pm.snapshot.Store(snapshot) + + ready, err = pm.IsReady(ctx) + if !ready || err != nil { + t.Errorf("Expected ready with snapshot, got ready=%v, err=%v", ready, err) + } + + // State 3: Transition back to Not Ready (remove snapshot) + pm.snapshot.Store(nil) + + ready, err = pm.IsReady(ctx) + if ready { + t.Error("Expected not ready after removing snapshot") + } + if err == nil || err.Error() != "no data yet" { + t.Errorf("Expected 'no data yet' error, got: %v", err) + } + + // State 4: Transition to Ready again (restore snapshot) + snapshot2 := NewSnapshot() + snapshot2.Timestamp = time.Now() + pm.snapshot.Store(snapshot2) + + ready, err = pm.IsReady(ctx) + if !ready || err != nil { + t.Errorf("Expected ready again with new snapshot, got ready=%v, err=%v", ready, err) + } +} + +func TestPowerMonitor_HealthCheck_PassiveActiveTransition(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + ctx := context.Background() + + // Test Passive Mode (interval = 0) + pmPassive := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(0), // Passive mode + ) + + // Passive mode should always be alive and ready + alive, err := pmPassive.IsLive(ctx) + if !alive || err != nil { + t.Errorf("Expected passive mode to be alive, got alive=%v, err=%v", alive, err) + } + + ready, err := pmPassive.IsReady(ctx) + if !ready || err != nil { + t.Errorf("Expected passive mode to be ready, got ready=%v, err=%v", ready, err) + } + + // Test Active Mode (interval > 0) + pmActive := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), // Active mode + ) + + // Active mode without heartbeat should not be alive + alive, err = pmActive.IsLive(ctx) + if alive { + t.Error("Expected active mode without heartbeat to not be alive") + } + + // Active mode without data should not be ready + ready, err = pmActive.IsReady(ctx) + if ready { + t.Error("Expected active mode without data to not be ready") + } +} + +func TestPowerMonitor_HealthCheck_HeartbeatEdgeCases(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + interval := 1 * time.Second + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(interval), + ) + + ctx := context.Background() + + testCases := []struct { + name string + ageOffset time.Duration + expectedAlive bool + expectError bool + }{ + { + name: "Fresh heartbeat", + ageOffset: -100 * time.Millisecond, + expectedAlive: true, + expectError: false, + }, + { + name: "At tolerance limit (just under 2x interval)", + ageOffset: -2*interval + 50*time.Millisecond, + expectedAlive: true, + expectError: false, + }, + { + name: "Just over tolerance limit", + ageOffset: -2*interval - 50*time.Millisecond, + expectedAlive: false, + expectError: true, + }, + { + name: "Very stale heartbeat", + ageOffset: -10 * interval, + expectedAlive: false, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Set heartbeat with specific age + heartbeatTime := time.Now().Add(tc.ageOffset) + atomic.StoreInt64(&pm.lastCollectUnixNano, heartbeatTime.UnixNano()) + + alive, err := pm.IsLive(ctx) + + if alive != tc.expectedAlive { + t.Errorf("Expected alive=%v, got %v", tc.expectedAlive, alive) + } + + if tc.expectError && err == nil { + t.Error("Expected error but got none") + } else if !tc.expectError && err != nil { + t.Errorf("Expected no error but got: %v", err) + } + }) + } +} + +func TestPowerMonitor_HealthCheck_ConcurrentStateChanges(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), + ) + + ctx := context.Background() + + // Set initial state + snapshot := NewSnapshot() + snapshot.Timestamp = time.Now() + pm.snapshot.Store(snapshot) + atomic.StoreInt64(&pm.lastCollectUnixNano, time.Now().UnixNano()) + + // Run concurrent health checks while changing state + const numCheckers = 50 + const numStateChanges = 10 + + done := make(chan bool, numCheckers+numStateChanges) + + // Start health checkers + for i := 0; i < numCheckers; i++ { + go func() { + defer func() { done <- true }() + for j := 0; j < 20; j++ { + pm.IsLive(ctx) + pm.IsReady(ctx) + time.Sleep(1 * time.Millisecond) + } + }() + } + + // Start state changers + for i := 0; i < numStateChanges; i++ { + go func(id int) { + defer func() { done <- true }() + for j := 0; j < 10; j++ { + if j%2 == 0 { + // Set good state + snap := NewSnapshot() + snap.Timestamp = time.Now() + pm.snapshot.Store(snap) + atomic.StoreInt64(&pm.lastCollectUnixNano, time.Now().UnixNano()) + } else { + // Set bad state + pm.snapshot.Store(nil) + atomic.StoreInt64(&pm.lastCollectUnixNano, 0) + } + time.Sleep(2 * time.Millisecond) + } + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numCheckers+numStateChanges; i++ { + <-done + } + + // No panics or deadlocks = success + t.Log("Concurrent state changes test completed successfully") +} + +func TestPowerMonitor_HealthCheck_RealTimeProgression(t *testing.T) { + if testing.Short() { + t.Skip("Skipping real-time test in short mode") + } + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + interval := 500 * time.Millisecond + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(interval), + ) + + ctx := context.Background() + + // Set initial heartbeat + atomic.StoreInt64(&pm.lastCollectUnixNano, time.Now().UnixNano()) + + // Test progression over time + times := []time.Duration{ + 0, // Fresh + interval, // 1x interval - still alive + interval + 500*time.Millisecond, // 1.5x interval - still alive + 2*interval + 100*time.Millisecond, // Just over 2x interval - should be stale + } + + for i, sleepDuration := range times { + if i > 0 { + time.Sleep(sleepDuration - times[i-1]) + } + + alive, err := pm.IsLive(ctx) + expectedAlive := sleepDuration < 2*interval + + t.Logf("At %v (%.1fx interval): alive=%v, err=%v", + sleepDuration, float64(sleepDuration)/float64(interval), alive, err) + + if alive != expectedAlive { + t.Errorf("At %v: expected alive=%v, got %v", sleepDuration, expectedAlive, alive) + } + } +} + +// Helper function for string containment check (local to avoid conflicts) +func containsStringIntegration(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || + (len(substr) > 0 && len(s) > len(substr) && + (s[:len(substr)] == substr || s[len(s)-len(substr):] == substr || + func() bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false + }()))) +} \ No newline at end of file diff --git a/internal/monitor/monitor_liveness_test.go b/internal/monitor/monitor_liveness_test.go new file mode 100644 index 0000000000..2c1f2e5294 --- /dev/null +++ b/internal/monitor/monitor_liveness_test.go @@ -0,0 +1,393 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package monitor + +import ( + "context" + "log/slog" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/sustainable-computing-io/kepler/internal/device" + clocktesting "k8s.io/utils/clock/testing" +) + +func TestPowerMonitor_IsLive_NilMonitor(t *testing.T) { + var pm *PowerMonitor = nil + ctx := context.Background() + + alive, err := pm.IsLive(ctx) + if alive { + t.Error("Expected alive=false for nil monitor") + } + if err == nil { + t.Error("Expected error for nil monitor") + } + if err.Error() != "monitor is nil" { + t.Errorf("Expected 'monitor is nil' error, got: %v", err) + } +} + +func TestPowerMonitor_IsLive_NilCPUMeter(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + pm := NewPowerMonitor( + nil, // nil CPU meter + WithLogger(logger), + WithInterval(0), // Passive mode + ) + + ctx := context.Background() + + alive, err := pm.IsLive(ctx) + if alive { + t.Error("Expected alive=false with nil CPU meter") + } + if err == nil { + t.Error("Expected error with nil CPU meter") + } + if err.Error() != "CPU meter not initialized" { + t.Errorf("Expected 'CPU meter not initialized' error, got: %v", err) + } +} + +func TestPowerMonitor_IsLive_PassiveMode(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(0), // Passive mode - no heartbeat required + ) + + ctx := context.Background() + + // Should be alive immediately in passive mode (no heartbeat check) + alive, err := pm.IsLive(ctx) + if err != nil { + t.Errorf("Expected no error in passive mode, got: %v", err) + } + if !alive { + t.Error("Expected alive=true in passive mode") + } +} + +func TestPowerMonitor_IsLive_ActiveMode_NoHeartbeat(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), // Active mode + ) + + ctx := context.Background() + + // Should not be alive without heartbeat + alive, err := pm.IsLive(ctx) + if alive { + t.Error("Expected alive=false without heartbeat") + } + if err == nil { + t.Error("Expected error without heartbeat") + } + if err.Error() != "no collection heartbeat yet" { + t.Errorf("Expected 'no collection heartbeat yet' error, got: %v", err) + } +} + +func TestPowerMonitor_IsLive_ActiveMode_FreshHeartbeat(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), // Active mode + ) + + // Simulate fresh heartbeat + now := time.Now() + atomic.StoreInt64(&pm.lastCollectUnixNano, now.UnixNano()) + + ctx := context.Background() + + // Should be alive with fresh heartbeat + alive, err := pm.IsLive(ctx) + if err != nil { + t.Errorf("Expected no error with fresh heartbeat, got: %v", err) + } + if !alive { + t.Error("Expected alive=true with fresh heartbeat") + } +} + +func TestPowerMonitor_IsLive_ActiveMode_StaleHeartbeat(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + interval := 1 * time.Second + tolerance := 2.0 // Default tolerance + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(interval), // Active mode + WithHealthCheckTolerance(tolerance), + ) + + // Simulate stale heartbeat (older than tolerance*interval) + staleTime := time.Now().Add(-time.Duration(float64(interval) * (tolerance + 1))) // Beyond tolerance + atomic.StoreInt64(&pm.lastCollectUnixNano, staleTime.UnixNano()) + + ctx := context.Background() + + // Should not be alive with stale heartbeat + alive, err := pm.IsLive(ctx) + if alive { + t.Error("Expected alive=false with stale heartbeat") + } + if err == nil { + t.Error("Expected error with stale heartbeat") + } + if !containsString(err.Error(), "collector stalled") { + t.Errorf("Expected 'collector stalled' error, got: %v", err) + } + if !containsString(err.Error(), "tolerance=2.0x interval") { + t.Errorf("Expected tolerance info in error, got: %v", err) + } +} + +func TestPowerMonitor_IsLive_ActiveMode_HeartbeatAtLimit(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + interval := 1 * time.Second + tolerance := 2.0 + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(interval), + WithHealthCheckTolerance(tolerance), + ) + + // Simulate heartbeat exactly at the tolerance limit + toleranceDuration := time.Duration(float64(interval) * tolerance) + limitTime := time.Now().Add(-toleranceDuration + 100*time.Millisecond) // Just under the limit + atomic.StoreInt64(&pm.lastCollectUnixNano, limitTime.UnixNano()) + + ctx := context.Background() + + // Should still be alive just under the limit + alive, err := pm.IsLive(ctx) + if err != nil { + t.Errorf("Expected no error just under limit, got: %v", err) + } + if !alive { + t.Error("Expected alive=true just under limit") + } +} + +func TestPowerMonitor_IsLive_WithRealHeartbeatUpdate(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + testClock := clocktesting.NewFakeClock(time.Now()) + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), + WithClock(testClock), + ) + + if err := pm.Init(); err != nil { + t.Fatalf("Failed to init monitor: %v", err) + } + + ctx := context.Background() + + // Initial state: no heartbeat + alive, err := pm.IsLive(ctx) + if alive || err == nil { + t.Error("Expected not alive initially") + } + + // Simulate a collection cycle by manually setting heartbeat + // (calling refreshSnapshot would require full initialization which is complex for this test) + now := testClock.Now() + atomic.StoreInt64(&pm.lastCollectUnixNano, now.UnixNano()) + + // Now should be alive after collection + alive, err = pm.IsLive(ctx) + if err != nil { + t.Errorf("Expected alive after collection, got: %v", err) + } + if !alive { + t.Error("Expected alive=true after collection") + } + + // Advance real time by simulating a stale heartbeat + // (We can't use testClock.Step because IsLive uses real time.Since, not pm.clock) + staleTime := time.Now().Add(-3 * time.Second) + atomic.StoreInt64(&pm.lastCollectUnixNano, staleTime.UnixNano()) + + // Should now be stale + alive, err = pm.IsLive(ctx) + if alive { + t.Error("Expected not alive after setting stale heartbeat") + } + if err == nil || !containsString(err.Error(), "collector stalled") { + t.Errorf("Expected stalled error, got: %v", err) + } +} + +func TestPowerMonitor_IsLive_ConcurrentAccess(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), + ) + + // Set fresh heartbeat + now := time.Now() + atomic.StoreInt64(&pm.lastCollectUnixNano, now.UnixNano()) + + ctx := context.Background() + + // Run concurrent liveness checks + const numGoroutines = 100 + results := make(chan bool, numGoroutines) + errors := make(chan error, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + alive, err := pm.IsLive(ctx) + results <- alive + errors <- err + }() + } + + // Collect results + aliveCount := 0 + errorCount := 0 + + for i := 0; i < numGoroutines; i++ { + alive := <-results + err := <-errors + + if err != nil { + errorCount++ + } else if alive { + aliveCount++ + } + } + + if errorCount > 0 { + t.Errorf("Expected no errors in concurrent access, got %d errors", errorCount) + } + + if aliveCount != numGoroutines { + t.Errorf("Expected %d alive results, got %d", numGoroutines, aliveCount) + } +} + +func TestPowerMonitor_IsLive_CustomTolerance(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + interval := 1 * time.Second + customTolerance := 3.5 // 3.5x interval tolerance + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(interval), + WithHealthCheckTolerance(customTolerance), + ) + + ctx := context.Background() + + // Test heartbeat that would be stale with default tolerance (2.0) but fresh with custom (3.5) + heartbeatTime := time.Now().Add(-time.Duration(float64(interval) * 3.0)) // 3x interval ago + atomic.StoreInt64(&pm.lastCollectUnixNano, heartbeatTime.UnixNano()) + + // Should be alive with custom tolerance + alive, err := pm.IsLive(ctx) + if err != nil { + t.Errorf("Expected no error with custom tolerance, got: %v", err) + } + if !alive { + t.Error("Expected alive=true with custom tolerance") + } + + // Test heartbeat beyond custom tolerance + staleTime := time.Now().Add(-time.Duration(float64(interval) * 4.0)) // 4x interval ago, beyond 3.5x + atomic.StoreInt64(&pm.lastCollectUnixNano, staleTime.UnixNano()) + + // Should not be alive beyond custom tolerance + alive, err = pm.IsLive(ctx) + if alive { + t.Error("Expected alive=false beyond custom tolerance") + } + if err == nil { + t.Error("Expected error beyond custom tolerance") + } + if !containsString(err.Error(), "tolerance=3.5x interval") { + t.Errorf("Expected custom tolerance info in error, got: %v", err) + } +} + +// Helper function +func containsString(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || + (len(substr) > 0 && len(s) > len(substr) && + (s[:len(substr)] == substr || s[len(s)-len(substr):] == substr || + func() bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false + }()))) +} \ No newline at end of file diff --git a/internal/monitor/monitor_readiness_test.go b/internal/monitor/monitor_readiness_test.go new file mode 100644 index 0000000000..568e4bdf65 --- /dev/null +++ b/internal/monitor/monitor_readiness_test.go @@ -0,0 +1,234 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package monitor + +import ( + "context" + "log/slog" + "os" + "testing" + "time" + + "github.com/sustainable-computing-io/kepler/internal/device" +) + +func TestPowerMonitor_IsReady_PassiveMode(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + // Create monitor with interval = 0 (passive mode) + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(0), // Passive mode + ) + + ctx := context.Background() + + // Should be ready immediately in passive mode + ready, err := pm.IsReady(ctx) + if err != nil { + t.Errorf("Expected no error in passive mode, got: %v", err) + } + if !ready { + t.Error("Expected ready=true in passive mode") + } + +} + +func TestPowerMonitor_IsReady_ActiveMode_NoData(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + // Create monitor with active collection + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), // Active mode + ) + + ctx := context.Background() + + // Should not be ready without data + ready, err := pm.IsReady(ctx) + if err == nil { + t.Error("Expected error when no data available") + } + if ready { + t.Error("Expected ready=false when no data available") + } + if err.Error() != "no data yet" { + t.Errorf("Expected 'no data yet' error, got: %v", err) + } + +} + +func TestPowerMonitor_IsReady_ActiveMode_WithData(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + // Create monitor with active collection + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), // Active mode + ) + + // Manually create a snapshot to simulate data collection + snapshot := NewSnapshot() + snapshot.Timestamp = time.Now() + pm.snapshot.Store(snapshot) + + ctx := context.Background() + + // Should be ready with data + ready, err := pm.IsReady(ctx) + if err != nil { + t.Errorf("Expected no error with data available, got: %v", err) + } + if !ready { + t.Error("Expected ready=true with data available") + } + +} + +func TestPowerMonitor_IsReady_StateTransitions(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), + ) + + ctx := context.Background() + + // Initial state: not ready, never been ready + ready, err := pm.IsReady(ctx) + if ready || err == nil { + t.Error("Expected not ready initially") + } + + // Add data: should become ready + snapshot := NewSnapshot() + snapshot.Timestamp = time.Now() + pm.snapshot.Store(snapshot) + + ready, err = pm.IsReady(ctx) + if !ready || err != nil { + t.Errorf("Expected ready with data, got ready=%v, err=%v", ready, err) + } + + // Remove data: should not be ready, but HasBeenReady should remain true + pm.snapshot.Store(nil) + + ready, err = pm.IsReady(ctx) + if ready || err == nil { + t.Error("Expected not ready after removing data") + } + + // Add data again: should be ready again + snapshot2 := NewSnapshot() + snapshot2.Timestamp = time.Now() + pm.snapshot.Store(snapshot2) + + ready, err = pm.IsReady(ctx) + if !ready || err != nil { + t.Errorf("Expected ready again with new data, got ready=%v, err=%v", ready, err) + } +} + +func TestPowerMonitor_IsReady_NilMonitor(t *testing.T) { + var pm *PowerMonitor = nil + ctx := context.Background() + + ready, err := pm.IsReady(ctx) + if ready { + t.Error("Expected ready=false for nil monitor") + } + if err == nil { + t.Error("Expected error for nil monitor") + } + if err.Error() != "monitor is nil" { + t.Errorf("Expected 'monitor is nil' error, got: %v", err) + } +} + + +func TestPowerMonitor_IsReady_ConcurrentAccess(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + fakeMeter, err := device.NewFakeCPUMeter([]string{"package-0"}, device.WithFakeLogger(logger)) + if err != nil { + t.Fatalf("Failed to create fake CPU meter: %v", err) + } + + pm := NewPowerMonitor( + fakeMeter, + WithLogger(logger), + WithInterval(1*time.Second), + ) + + // Add initial data + snapshot := NewSnapshot() + snapshot.Timestamp = time.Now() + pm.snapshot.Store(snapshot) + + ctx := context.Background() + + // Run concurrent readiness checks + const numGoroutines = 100 + results := make(chan bool, numGoroutines) + errors := make(chan error, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + ready, err := pm.IsReady(ctx) + results <- ready + errors <- err + }() + } + + // Collect results + readyCount := 0 + errorCount := 0 + + for i := 0; i < numGoroutines; i++ { + ready := <-results + err := <-errors + + if err != nil { + errorCount++ + } else if ready { + readyCount++ + } + } + + if errorCount > 0 { + t.Errorf("Expected no errors in concurrent access, got %d errors", errorCount) + } + + if readyCount != numGoroutines { + t.Errorf("Expected %d ready results, got %d", numGoroutines, readyCount) + } + +} + diff --git a/internal/monitor/options.go b/internal/monitor/options.go index 5aa452acc4..ef9a0d3f52 100644 --- a/internal/monitor/options.go +++ b/internal/monitor/options.go @@ -19,6 +19,7 @@ type Opts struct { maxStaleness time.Duration maxTerminated int minTerminatedEnergyThreshold Energy + healthCheckTolerance float64 } // NewConfig returns a new Config with defaults set @@ -31,6 +32,7 @@ func DefaultOpts() Opts { resources: nil, maxTerminated: 500, minTerminatedEnergyThreshold: 10 * Joule, + healthCheckTolerance: 2.0, } } @@ -79,6 +81,13 @@ func WithMaxTerminated(max int) OptionFn { } } +// WithHealthCheckTolerance sets the tolerance multiplier for health check heartbeat +func WithHealthCheckTolerance(tolerance float64) OptionFn { + return func(o *Opts) { + o.healthCheckTolerance = tolerance + } +} + // WithMinTerminatedEnergyThreshold sets the minimum energy threshold for terminated workloads func WithMinTerminatedEnergyThreshold(threshold Energy) OptionFn { return func(o *Opts) { diff --git a/internal/server/health.go b/internal/server/health.go new file mode 100644 index 0000000000..d4d2eac04e --- /dev/null +++ b/internal/server/health.go @@ -0,0 +1,125 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package server + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/sustainable-computing-io/kepler/internal/service" +) + +// HealthProbeService provides Kubernetes health probe endpoints +type HealthProbeService struct { + logger *slog.Logger + apiServer APIService + liveChecker service.LiveChecker + readyChecker service.ReadyChecker +} + +// NewHealthProbeService creates a new health probe service +func NewHealthProbeService(apiServer APIService, liveChecker service.LiveChecker, readyChecker service.ReadyChecker, logger *slog.Logger) *HealthProbeService { + return &HealthProbeService{ + logger: logger.With("service", "health-probe"), + apiServer: apiServer, + liveChecker: liveChecker, + readyChecker: readyChecker, + } +} + +func (h *HealthProbeService) Name() string { + return "health-probe" +} + +func (h *HealthProbeService) Init() error { + h.logger.Info("Initializing health probe endpoints") + + // Register liveness probe endpoint + if err := h.apiServer.Register("/probe/livez", "Liveness Probe", "Kubernetes liveness probe endpoint", h.livenessHandler()); err != nil { + return fmt.Errorf("failed to register liveness probe: %w", err) + } + + // Register readiness probe endpoint + if err := h.apiServer.Register("/probe/readyz", "Readiness Probe", "Kubernetes readiness probe endpoint", h.readinessHandler()); err != nil { + return fmt.Errorf("failed to register readiness probe: %w", err) + } + + h.logger.Info("Health probe endpoints registered successfully") + return nil +} + +// livenessHandler handles the liveness probe endpoint +func (h *HealthProbeService) livenessHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + ctx := r.Context() + + // Set appropriate headers + w.Header().Set("Content-Type", "application/json") + + alive, err := h.liveChecker.IsLive(ctx) + duration := time.Since(start) + + response := map[string]any{ + "status": "ok", + "timestamp": start.UTC().Format(time.RFC3339), + "duration": duration.String(), + } + + if err != nil || !alive { + w.WriteHeader(http.StatusServiceUnavailable) + response["status"] = "error" + if err != nil { + response["error"] = err.Error() + } + h.logger.Error("Liveness check failed", "error", err, "duration", duration) + } else { + w.WriteHeader(http.StatusOK) + h.logger.Debug("Liveness check passed", "duration", duration) + } + + if err := json.NewEncoder(w).Encode(response); err != nil { + h.logger.Error("Failed to encode liveness response", "error", err) + } + }) +} + +// readinessHandler handles the readiness probe endpoint +func (h *HealthProbeService) readinessHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + ctx := r.Context() + + // Set appropriate headers + w.Header().Set("Content-Type", "application/json") + + ready, err := h.readyChecker.IsReady(ctx) + duration := time.Since(start) + + response := map[string]any{ + "status": "ok", + "timestamp": start.UTC().Format(time.RFC3339), + "duration": duration.String(), + } + + if err != nil || !ready { + w.WriteHeader(http.StatusServiceUnavailable) + response["status"] = "error" + if err != nil { + response["error"] = err.Error() + } + h.logger.Error("Readiness check failed", "error", err, "duration", duration) + } else { + w.WriteHeader(http.StatusOK) + h.logger.Debug("Readiness check passed", "duration", duration) + } + + if err := json.NewEncoder(w).Encode(response); err != nil { + h.logger.Error("Failed to encode readiness response", "error", err) + } + }) +} \ No newline at end of file diff --git a/internal/server/health_test.go b/internal/server/health_test.go new file mode 100644 index 0000000000..9ca2718ab9 --- /dev/null +++ b/internal/server/health_test.go @@ -0,0 +1,176 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package server + +import ( + "context" + "errors" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" +) + +// Mock implementations for testing +type mockLiveChecker struct { + alive bool + err error +} + +func (m *mockLiveChecker) IsLive(ctx context.Context) (bool, error) { + return m.alive, m.err +} + +type mockReadyChecker struct { + ready bool + err error +} + +func (m *mockReadyChecker) IsReady(ctx context.Context) (bool, error) { + return m.ready, m.err +} + +type mockAPIServer struct { + handlers map[string]http.Handler +} + +func (m *mockAPIServer) Name() string { + return "mock-api-server" +} + +func (m *mockAPIServer) Register(endpoint, summary, description string, handler http.Handler) error { + if m.handlers == nil { + m.handlers = make(map[string]http.Handler) + } + m.handlers[endpoint] = handler + return nil +} + +func TestHealthProbeService_Init(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + apiServer := &mockAPIServer{} + liveChecker := &mockLiveChecker{alive: true} + readyChecker := &mockReadyChecker{ready: true} + + healthService := NewHealthProbeService(apiServer, liveChecker, readyChecker, logger) + + err := healthService.Init() + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Check that endpoints were registered + if len(apiServer.handlers) != 2 { + t.Fatalf("Expected 2 handlers registered, got %d", len(apiServer.handlers)) + } + + if _, exists := apiServer.handlers["/probe/livez"]; !exists { + t.Error("Liveness probe handler not registered") + } + + if _, exists := apiServer.handlers["/probe/readyz"]; !exists { + t.Error("Readiness probe handler not registered") + } +} + +func TestLivenessHandler_Success(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + apiServer := &mockAPIServer{} + liveChecker := &mockLiveChecker{alive: true} + readyChecker := &mockReadyChecker{ready: true} + + healthService := NewHealthProbeService(apiServer, liveChecker, readyChecker, logger) + err := healthService.Init() + if err != nil { + t.Fatalf("Failed to initialize health service: %v", err) + } + + handler := apiServer.handlers["/probe/livez"] + req := httptest.NewRequest("GET", "/probe/livez", nil) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + if contentType := w.Header().Get("Content-Type"); contentType != "application/json" { + t.Errorf("Expected Content-Type application/json, got %s", contentType) + } +} + +func TestLivenessHandler_Failure(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + apiServer := &mockAPIServer{} + liveChecker := &mockLiveChecker{alive: false, err: errors.New("service is down")} + readyChecker := &mockReadyChecker{ready: true} + + healthService := NewHealthProbeService(apiServer, liveChecker, readyChecker, logger) + err := healthService.Init() + if err != nil { + t.Fatalf("Failed to initialize health service: %v", err) + } + + handler := apiServer.handlers["/probe/livez"] + req := httptest.NewRequest("GET", "/probe/livez", nil) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("Expected status %d, got %d", http.StatusServiceUnavailable, w.Code) + } +} + +func TestReadinessHandler_Success(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + apiServer := &mockAPIServer{} + liveChecker := &mockLiveChecker{alive: true} + readyChecker := &mockReadyChecker{ready: true} + + healthService := NewHealthProbeService(apiServer, liveChecker, readyChecker, logger) + err := healthService.Init() + if err != nil { + t.Fatalf("Failed to initialize health service: %v", err) + } + + handler := apiServer.handlers["/probe/readyz"] + req := httptest.NewRequest("GET", "/probe/readyz", nil) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + if contentType := w.Header().Get("Content-Type"); contentType != "application/json" { + t.Errorf("Expected Content-Type application/json, got %s", contentType) + } +} + +func TestReadinessHandler_Failure(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + apiServer := &mockAPIServer{} + liveChecker := &mockLiveChecker{alive: true} + readyChecker := &mockReadyChecker{ready: false, err: errors.New("service not ready")} + + healthService := NewHealthProbeService(apiServer, liveChecker, readyChecker, logger) + err := healthService.Init() + if err != nil { + t.Fatalf("Failed to initialize health service: %v", err) + } + + handler := apiServer.handlers["/probe/readyz"] + req := httptest.NewRequest("GET", "/probe/readyz", nil) + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("Expected status %d, got %d", http.StatusServiceUnavailable, w.Code) + } +} \ No newline at end of file diff --git a/internal/service/service.go b/internal/service/service.go index 78e9a9725d..178e2e8ced 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -30,3 +30,15 @@ type Shutdowner interface { // Shutdown shuts down the service Shutdown() error } + +// LiveChecker is the interface for checking if a service is alive +type LiveChecker interface { + // IsLive returns whether the service is alive and responsive + IsLive(ctx context.Context) (bool, error) +} + +// ReadyChecker is the interface for checking if a service is ready to serve traffic +type ReadyChecker interface { + // IsReady returns whether the service is ready to handle requests + IsReady(ctx context.Context) (bool, error) +} diff --git a/manifests/helm/kepler/CHANGELOG.md b/manifests/helm/kepler/CHANGELOG.md new file mode 100644 index 0000000000..3f8030e485 --- /dev/null +++ b/manifests/helm/kepler/CHANGELOG.md @@ -0,0 +1,24 @@ +# Changelog + +All notable changes to the Kepler Helm chart will be documented in this file. + +## [Unreleased] + +### Added +- Dedicated health check endpoints for Kubernetes probes + - Liveness probe now uses `/probe/livez` instead of `/metrics` + - Readiness probe now uses `/probe/readyz` instead of being disabled + - Improved probe timing and failure thresholds for better reliability + +### Changed +- **BREAKING**: Health probes now use dedicated endpoints instead of `/metrics` + - Liveness probe: `/metrics` → `/probe/livez` + - Readiness probe: enabled with `/probe/readyz` +- Reduced liveness probe period from 60s to 30s for faster failure detection +- Added readiness probe with 10s period for better traffic management + +### Technical Details +- Liveness probe checks if the monitor service is alive and collection is working +- Readiness probe checks if the monitor has data available to serve +- Both probes support passive mode (interval=0) and active collection modes +- Probe responses include JSON with status, timestamp, and duration information \ No newline at end of file diff --git a/manifests/helm/kepler/values.yaml b/manifests/helm/kepler/values.yaml index 68ccfcce21..c6a976d279 100644 --- a/manifests/helm/kepler/values.yaml +++ b/manifests/helm/kepler/values.yaml @@ -54,12 +54,21 @@ daemonset: livenessProbe: httpGet: - path: /metrics + path: /probe/livez port: http initialDelaySeconds: 10 - periodSeconds: 60 + periodSeconds: 30 + timeoutSeconds: 5 + failureThreshold: 3 - readinessProbe: {} + readinessProbe: + httpGet: + path: /probe/readyz + port: http + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 config: log: