From a0aa9b5ba963508616c89aeb6a25cc42f8a7bf7f Mon Sep 17 00:00:00 2001
From: Vimal Kumar
Date: Wed, 1 Oct 2025 21:02:44 +0530
Subject: [PATCH] feat: add health check probes for k8s liveness and readiness

Implement health probe endpoints following k8s best practices:

- Add LiveChecker and ReadyChecker interfaces to service framework
- Implement IsLive() and IsReady() methods in PowerMonitor using atomic operations
- Create HealthProbe service with /probe/livez and /probe/readyz endpoints
- Update K8s daemonset to use new health endpoints instead of /metrics
- Add unit tests for health checks

Signed-off-by: Vimal Kumar
---
 cmd/kepler/main.go                      |   4 +
 internal/monitor/monitor.go             |  29 +++
 internal/monitor/monitor_health_test.go | 203 +++++++++++++++
 internal/server/healthprobe.go          | 149 +++++++++++++++
 internal/server/healthprobe_test.go     | 231 ++++++++++++++++++++++++
 internal/service/service.go             |  14 ++
 manifests/k8s/daemonset.yaml            |  12 +-
 7 files changed, 641 insertions(+), 1 deletion(-)
 create mode 100644 internal/monitor/monitor_health_test.go
 create mode 100644 internal/server/healthprobe.go
 create mode 100644 internal/server/healthprobe_test.go

diff --git a/cmd/kepler/main.go b/cmd/kepler/main.go
index 02c43377b7..e86cd5d66c 100644
--- a/cmd/kepler/main.go
+++ b/cmd/kepler/main.go
@@ -206,6 +206,10 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
 		services = append(services, stdoutExporter)
 	}
 
+	// Add health probe endpoints
+	healthProbe := server.NewHealthProbe(apiServer, services, logger)
+	services = append(services, healthProbe)
+
 	return services, nil
 }
 
diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go
index 0cc1ae7fdb..d9e35b0852 100644
--- a/internal/monitor/monitor.go
+++ b/internal/monitor/monitor.go
@@ -78,6 +78,10 @@ type PowerMonitor struct {
 	// For managing the collection loop
 	collectionCtx    context.Context
 	collectionCancel context.CancelFunc
+
+	// Health tracking
+	initialized atomic.Bool
+	fatalError  atomic.Bool
 }
 
 var _ Service = (*PowerMonitor)(nil)
@@ -146,6 +150,9 @@ func (pm *PowerMonitor) Init() error {
 	// signal now so that exporters can construct descriptors
 	pm.signalNewData()
 
+	// Mark as initialized for health checks
+	pm.initialized.Store(true)
+
 	return nil
 }
 
@@ -429,3 +436,25 @@ func (pm *PowerMonitor) calculatePower(prev, newSnapshot *Snapshot) error {
 
 	return nil
 }
+
+// IsLive returns true if the monitor is initialized and has not encountered fatal errors
+func (pm *PowerMonitor) IsLive() bool {
+	return pm.initialized.Load() && !pm.fatalError.Load()
+}
+
+// IsReady returns true if the monitor is live and has collected at least one valid snapshot
+// that is not stale
+func (pm *PowerMonitor) IsReady() bool {
+	if !pm.IsLive() {
+		return false
+	}
+
+	snapshot := pm.snapshot.Load()
+	if snapshot == nil || snapshot.Timestamp.IsZero() {
+		return false
+	}
+
+	// Check if data is fresh (not stale)
+	age := pm.clock.Now().Sub(snapshot.Timestamp)
+	return age <= pm.maxStaleness
+}
diff --git a/internal/monitor/monitor_health_test.go b/internal/monitor/monitor_health_test.go
new file mode 100644
index 0000000000..238a2ac74c
--- /dev/null
+++ b/internal/monitor/monitor_health_test.go
@@ -0,0 +1,203 @@
+// SPDX-FileCopyrightText: 2025 The Kepler Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package monitor
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/sustainable-computing-io/kepler/internal/device"
+	testingclock "k8s.io/utils/clock/testing"
+)
+
+func TestPowerMonitor_IsLive_NotInitialized(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	pm := NewPowerMonitor(meter)
+
+	// Before initialization, should not be live
+	assert.False(t, pm.IsLive())
+}
+
+func TestPowerMonitor_IsLive_Initialized(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	pm := NewPowerMonitor(meter)
+
+	err = pm.Init()
+	assert.NoError(t, err)
+
+	// After initialization, should be live
+	assert.True(t, pm.IsLive())
+}
+
+func TestPowerMonitor_IsLive_FatalError(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	pm := NewPowerMonitor(meter)
+
+	err = pm.Init()
+	assert.NoError(t, err)
+
+	// Simulate fatal error
+	pm.fatalError.Store(true)
+
+	// Should not be live when fatal error occurred
+	assert.False(t, pm.IsLive())
+}
+
+func TestPowerMonitor_IsReady_NotInitialized(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	pm := NewPowerMonitor(meter)
+
+	// Before initialization, should not be ready
+	assert.False(t, pm.IsReady())
+}
+
+func TestPowerMonitor_IsReady_InitializedNoSnapshot(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	pm := NewPowerMonitor(meter)
+
+	err = pm.Init()
+	assert.NoError(t, err)
+
+	// After initialization but before first snapshot, should not be ready
+	assert.False(t, pm.IsReady())
+}
+
+func TestPowerMonitor_IsReady_WithFreshSnapshot(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	fakeClock := testingclock.NewFakeClock(time.Now())
+
+	pm := NewPowerMonitor(meter,
+		WithClock(fakeClock),
+		WithMaxStaleness(10*time.Second),
+	)
+
+	initErr := pm.Init()
+	assert.NoError(t, initErr)
+
+	// Create a snapshot
+	snapshot := NewSnapshot()
+	snapshot.Timestamp = fakeClock.Now()
+	pm.snapshot.Store(snapshot)
+
+	// Should be ready with fresh snapshot
+	assert.True(t, pm.IsReady())
+}
+
+func TestPowerMonitor_IsReady_WithStaleSnapshot(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	fakeClock := testingclock.NewFakeClock(time.Now())
+
+	pm := NewPowerMonitor(meter,
+		WithClock(fakeClock),
+		WithMaxStaleness(10*time.Second),
+	)
+
+	err = pm.Init()
+	assert.NoError(t, err)
+
+	// Create a snapshot in the past
+	snapshot := NewSnapshot()
+	snapshot.Timestamp = fakeClock.Now()
+	pm.snapshot.Store(snapshot)
+
+	// Advance clock beyond staleness threshold
+	fakeClock.Step(15 * time.Second)
+
+	// Should not be ready with stale snapshot
+	assert.False(t, pm.IsReady())
+}
+
+func TestPowerMonitor_IsReady_NotLive(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	fakeClock := testingclock.NewFakeClock(time.Now())
+
+	pm := NewPowerMonitor(meter,
+		WithClock(fakeClock),
+		WithMaxStaleness(10*time.Second),
+	)
+
+	err = pm.Init()
+	assert.NoError(t, err)
+
+	// Create a fresh snapshot
+	snapshot := NewSnapshot()
+	snapshot.Timestamp = fakeClock.Now()
+	pm.snapshot.Store(snapshot)
+
+	// Simulate fatal error (not live)
+	pm.fatalError.Store(true)
+
+	// Should not be ready if not live, even with fresh snapshot
+	assert.False(t, pm.IsReady())
+}
+
+func TestPowerMonitor_IsReady_ZeroTimestamp(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	fakeClock := testingclock.NewFakeClock(time.Now())
+
+	pm := NewPowerMonitor(meter,
+		WithClock(fakeClock),
+		WithMaxStaleness(10*time.Second),
+	)
+
+	err = pm.Init()
+	assert.NoError(t, err)
+
+	// Create a snapshot with zero timestamp
+	snapshot := NewSnapshot()
+	snapshot.Timestamp = time.Time{} // Zero value
+	pm.snapshot.Store(snapshot)
+
+	// Should not be ready with zero timestamp
+	assert.False(t, pm.IsReady())
+}
+
+func TestPowerMonitor_HealthCheck_Integration(t *testing.T) {
+	meter, err := device.NewFakeCPUMeter([]string{"pkg"})
+	assert.NoError(t, err)
+	fakeClock := testingclock.NewFakeClock(time.Now())
+
+	pm := NewPowerMonitor(meter,
+		WithClock(fakeClock),
+		WithMaxStaleness(10*time.Second),
+		WithInterval(0), // No automatic collection
+	)
+
+	// Initially not live or ready
+	assert.False(t, pm.IsLive())
+	assert.False(t, pm.IsReady())
+
+	// After init, live but not ready
+	initErr := pm.Init()
+	assert.NoError(t, initErr)
+	assert.True(t, pm.IsLive())
+	assert.False(t, pm.IsReady())
+
+	// After first snapshot, both live and ready
+	snapshot := NewSnapshot()
+	snapshot.Timestamp = fakeClock.Now()
+	pm.snapshot.Store(snapshot)
+	assert.True(t, pm.IsLive())
+	assert.True(t, pm.IsReady())
+
+	// After data becomes stale, live but not ready
+	fakeClock.Step(15 * time.Second)
+	assert.True(t, pm.IsLive())
+	assert.False(t, pm.IsReady())
+
+	// After fatal error, neither live nor ready
+	pm.fatalError.Store(true)
+	assert.False(t, pm.IsLive())
+	assert.False(t, pm.IsReady())
+}
diff --git a/internal/server/healthprobe.go b/internal/server/healthprobe.go
new file mode 100644
index 0000000000..88c9c6a73b
--- /dev/null
+++ b/internal/server/healthprobe.go
@@ -0,0 +1,149 @@
+// SPDX-FileCopyrightText: 2025 The Kepler Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package server
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+	"net/http"
+
+	"github.com/sustainable-computing-io/kepler/internal/service"
+)
+
+// HealthProbe provides health check endpoints for Kubernetes probes
+type HealthProbe struct {
+	logger    *slog.Logger
+	apiServer APIService
+	services  []service.Service
+}
+
+// ServiceHealth represents the health status of a single service
+type ServiceHealth struct {
+	Name  string `json:"name"`
+	Live  bool   `json:"live,omitempty"`
+	Ready bool   `json:"ready,omitempty"`
+}
+
+// HealthStatus represents the overall health status
+type HealthStatus struct {
+	Status   string          `json:"status"` // "ok" or "unhealthy"
+	Services []ServiceHealth `json:"services,omitempty"`
+}
+
+// NewHealthProbe creates a new HealthProbe service
+func NewHealthProbe(apiServer APIService, services []service.Service, logger *slog.Logger) *HealthProbe {
+	return &HealthProbe{
+		logger:    logger.With("service", "health-probe"),
+		apiServer: apiServer,
+		services:  services,
+	}
+}
+
+func (h *HealthProbe) Name() string {
+	return "health-probe"
+}
+
+func (h *HealthProbe) Init() error {
+	h.logger.Info("Initializing health probe endpoints")
+
+	// Register liveness probe endpoint
+	if err := h.apiServer.Register(
+		"/probe/livez",
+		"Liveness Probe",
+		"Returns 200 if all services are alive",
+		http.HandlerFunc(h.handleLiveness),
+	); err != nil {
+		return err
+	}
+
+	// Register readiness probe endpoint
+	if err := h.apiServer.Register(
+		"/probe/readyz",
+		"Readiness Probe",
+		"Returns 200 if all services are ready",
+		http.HandlerFunc(h.handleReadiness),
+	); err != nil {
+		return err
+	}
+
+	h.logger.Info("Health probe endpoints registered")
+	return nil
+}
+
+func (h *HealthProbe) Run(ctx context.Context) error {
+	// Health probe doesn't need to run in the background
+	// It only responds to HTTP requests via the API server
+	<-ctx.Done()
+	return nil
+}
+
+// handleLiveness checks if all services implementing LiveChecker are alive
+func (h *HealthProbe) handleLiveness(w http.ResponseWriter, r *http.Request) {
+	status := HealthStatus{
+		Status:   "ok",
+		Services: make([]ServiceHealth, 0),
+	}
+
+	allLive := true
+	for _, svc := range h.services {
+		if liveChecker, ok := svc.(service.LiveChecker); ok {
+			isLive := liveChecker.IsLive()
+			status.Services = append(status.Services, ServiceHealth{
+				Name: svc.Name(),
+				Live: isLive,
+			})
+			if !isLive {
+				allLive = false
+			}
+		}
+	}
+
+	if !allLive {
+		status.Status = "unhealthy"
+		h.writeJSONResponse(w, http.StatusServiceUnavailable, status)
+		return
+	}
+
+	h.writeJSONResponse(w, http.StatusOK, status)
+}
+
+// handleReadiness checks if all services implementing ReadyChecker are ready
+func (h *HealthProbe) handleReadiness(w http.ResponseWriter, r *http.Request) {
+	status := HealthStatus{
+		Status:   "ok",
+		Services: make([]ServiceHealth, 0),
+	}
+
+	allReady := true
+	for _, svc := range h.services {
+		if readyChecker, ok := svc.(service.ReadyChecker); ok {
+			isReady := readyChecker.IsReady()
+			status.Services = append(status.Services, ServiceHealth{
+				Name:  svc.Name(),
+				Ready: isReady,
+			})
+			if !isReady {
+				allReady = false
+			}
+		}
+	}
+
+	if !allReady {
+		status.Status = "unhealthy"
+		h.writeJSONResponse(w, http.StatusServiceUnavailable, status)
+		return
+	}
+
+	h.writeJSONResponse(w, http.StatusOK, status)
+}
+
+// writeJSONResponse writes a JSON response with the given status code
+func (h *HealthProbe) writeJSONResponse(w http.ResponseWriter, statusCode int, data interface{}) {
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(statusCode)
+	if err := json.NewEncoder(w).Encode(data); err != nil {
+		h.logger.Error("failed to encode JSON response", "error", err)
+	}
+}
diff --git a/internal/server/healthprobe_test.go b/internal/server/healthprobe_test.go
new file mode 100644
index 0000000000..bd7d54aefd
--- /dev/null
+++ b/internal/server/healthprobe_test.go
@@ -0,0 +1,231 @@
+// SPDX-FileCopyrightText: 2025 The Kepler Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package server
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/sustainable-computing-io/kepler/internal/service"
+)
+
+// mockService implements service.Service, service.LiveChecker, and service.ReadyChecker
+type mockService struct {
+	name  string
+	live  bool
+	ready bool
+}
+
+func (m *mockService) Name() string {
+	return m.name
+}
+
+func (m *mockService) IsLive() bool {
+	return m.live
+}
+
+func (m *mockService) IsReady() bool {
+	return m.ready
+}
+
+// mockAPIServer implements APIService for testing
+type mockAPIServer struct {
+	handlers map[string]http.Handler
+}
+
+func newMockAPIServer() *mockAPIServer {
+	return &mockAPIServer{
+		handlers: make(map[string]http.Handler),
+	}
+}
+
+func (m *mockAPIServer) Name() string {
+	return "mock-api-server"
+}
+
+func (m *mockAPIServer) Register(endpoint, summary, description string, handler http.Handler) error {
+	m.handlers[endpoint] = handler
+	return nil
+}
+
+func (m *mockAPIServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if handler, ok := m.handlers[r.URL.Path]; ok {
+		handler.ServeHTTP(w, r)
+		return
+	}
+	http.NotFound(w, r)
+}
+
+func TestHealthProbe_Init(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+	services := []service.Service{
+		&mockService{name: "test-service", live: true, ready: true},
+	}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+	err := hp.Init()
+
+	require.NoError(t, err)
+	assert.Contains(t, apiServer.handlers, "/probe/livez")
+	assert.Contains(t, apiServer.handlers, "/probe/readyz")
+}
+
+func TestHealthProbe_Liveness_AllLive(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+	services := []service.Service{
+		&mockService{name: "service1", live: true, ready: true},
+		&mockService{name: "service2", live: true, ready: false},
+	}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+	require.NoError(t, hp.Init())
+
+	req := httptest.NewRequest(http.MethodGet, "/probe/livez", nil)
+	w := httptest.NewRecorder()
+	apiServer.ServeHTTP(w, req)
+
+	assert.Equal(t, http.StatusOK, w.Code)
+
+	var status HealthStatus
+	err := json.NewDecoder(w.Body).Decode(&status)
+	require.NoError(t, err)
+	assert.Equal(t, "ok", status.Status)
+	assert.Len(t, status.Services, 2)
+	assert.True(t, status.Services[0].Live)
+	assert.True(t, status.Services[1].Live)
+}
+
+func TestHealthProbe_Liveness_SomeUnhealthy(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+	services := []service.Service{
+		&mockService{name: "service1", live: true, ready: true},
+		&mockService{name: "service2", live: false, ready: false},
+	}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+	require.NoError(t, hp.Init())
+
+	req := httptest.NewRequest(http.MethodGet, "/probe/livez", nil)
+	w := httptest.NewRecorder()
+	apiServer.ServeHTTP(w, req)
+
+	assert.Equal(t, http.StatusServiceUnavailable, w.Code)
+
+	var status HealthStatus
+	err := json.NewDecoder(w.Body).Decode(&status)
+	require.NoError(t, err)
+	assert.Equal(t, "unhealthy", status.Status)
+	assert.Len(t, status.Services, 2)
+	assert.True(t, status.Services[0].Live)
+	assert.False(t, status.Services[1].Live)
+}
+
+func TestHealthProbe_Readiness_AllReady(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+	services := []service.Service{
+		&mockService{name: "service1", live: true, ready: true},
+		&mockService{name: "service2", live: true, ready: true},
+	}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+	require.NoError(t, hp.Init())
+
+	req := httptest.NewRequest(http.MethodGet, "/probe/readyz", nil)
+	w := httptest.NewRecorder()
+	apiServer.ServeHTTP(w, req)
+
+	assert.Equal(t, http.StatusOK, w.Code)
+
+	var status HealthStatus
+	err := json.NewDecoder(w.Body).Decode(&status)
+	require.NoError(t, err)
+	assert.Equal(t, "ok", status.Status)
+	assert.Len(t, status.Services, 2)
+	assert.True(t, status.Services[0].Ready)
+	assert.True(t, status.Services[1].Ready)
+}
+
+func TestHealthProbe_Readiness_SomeNotReady(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+	services := []service.Service{
+		&mockService{name: "service1", live: true, ready: true},
+		&mockService{name: "service2", live: true, ready: false},
+	}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+	require.NoError(t, hp.Init())
+
+	req := httptest.NewRequest(http.MethodGet, "/probe/readyz", nil)
+	w := httptest.NewRecorder()
+	apiServer.ServeHTTP(w, req)
+
+	assert.Equal(t, http.StatusServiceUnavailable, w.Code)
+
+	var status HealthStatus
+	err := json.NewDecoder(w.Body).Decode(&status)
+	require.NoError(t, err)
+	assert.Equal(t, "unhealthy", status.Status)
+	assert.Len(t, status.Services, 2)
+	assert.True(t, status.Services[0].Ready)
+	assert.False(t, status.Services[1].Ready)
+}
+
+// simpleService is a test service that doesn't implement any health checker interfaces
+type simpleService struct {
+	name string
+}
+
+func (s *simpleService) Name() string {
+	return s.name
+}
+
+func TestHealthProbe_NoHealthCheckers(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+
+	// Service that doesn't implement any health checker interfaces
+	svc := &simpleService{name: "simple"}
+	services := []service.Service{svc}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+	require.NoError(t, hp.Init())
+
+	// Test liveness - should return ok with no services
+	req := httptest.NewRequest(http.MethodGet, "/probe/livez", nil)
+	w := httptest.NewRecorder()
+	apiServer.ServeHTTP(w, req)
+
+	assert.Equal(t, http.StatusOK, w.Code)
+
+	var status HealthStatus
+	err := json.NewDecoder(w.Body).Decode(&status)
+	require.NoError(t, err)
+	assert.Equal(t, "ok", status.Status)
+	assert.Len(t, status.Services, 0)
+}
+
+func TestHealthProbe_Run(t *testing.T) {
+	logger := slog.Default()
+	apiServer := newMockAPIServer()
+	services := []service.Service{}
+
+	hp := NewHealthProbe(apiServer, services, logger)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel() // Cancel immediately
+
+	err := hp.Run(ctx)
+	assert.NoError(t, err)
+}
diff --git a/internal/service/service.go b/internal/service/service.go
index 78e9a9725d..c73823f62b 100644
--- a/internal/service/service.go
+++ b/internal/service/service.go
@@ -30,3 +30,17 @@ type Shutdowner interface {
 	// Shutdown shuts down the service
 	Shutdown() error
 }
+
+// LiveChecker is the interface that services can implement to provide liveness status
+type LiveChecker interface {
+	Service
+	// IsLive returns true if the service is alive (initialized and running without fatal errors)
+	IsLive() bool
+}
+
+// ReadyChecker is the interface that services can implement to provide readiness status
+type ReadyChecker interface {
+	Service
+	// IsReady returns true if the service is ready to serve requests (has data and is not stale)
+	IsReady() bool
+}
diff --git a/manifests/k8s/daemonset.yaml b/manifests/k8s/daemonset.yaml
index 8c526a4d71..4fa1ba8d42 100644
--- a/manifests/k8s/daemonset.yaml
+++ b/manifests/k8s/daemonset.yaml
@@ -54,10 +54,20 @@ spec:
             readOnly: true
         livenessProbe:
           httpGet:
-            path: /metrics
+            path: /probe/livez
             port: http
           initialDelaySeconds: 10
           periodSeconds: 60
+          timeoutSeconds: 5
+          failureThreshold: 3
+        readinessProbe:
+          httpGet:
+            path: /probe/readyz
+            port: http
+          initialDelaySeconds: 5
+          periodSeconds: 10
+          timeoutSeconds: 5
+          failureThreshold: 3
         env:
         - name: NODE_NAME
           valueFrom:
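
For illustration only (not part of the diff above): a minimal sketch of how another service could opt into these probes by implementing the LiveChecker and ReadyChecker interfaces added in internal/service/service.go. The sampler type, its fields, and the 30-second freshness window here are hypothetical; as with mockService in healthprobe_test.go, HealthProbe only inspects Name() plus the checker methods.

package sampler

import (
	"sync/atomic"
	"time"
)

// exampleSampler is a hypothetical service used only to illustrate the probe contract.
type exampleSampler struct {
	started  atomic.Bool  // set once initialization succeeds
	lastScan atomic.Int64 // unix nanoseconds of the last successful scan
}

func (s *exampleSampler) Name() string { return "example-sampler" }

// IsLive: the service finished initializing and has not hit a fatal error.
func (s *exampleSampler) IsLive() bool { return s.started.Load() }

// IsReady: live and the most recent scan is still fresh (30s is an arbitrary example window).
func (s *exampleSampler) IsReady() bool {
	if !s.IsLive() {
		return false
	}
	last := time.Unix(0, s.lastScan.Load())
	return time.Since(last) <= 30*time.Second
}

Appended to the services slice that createServices passes to NewHealthProbe, such a service is automatically included in the aggregated status reported by /probe/livez and /probe/readyz.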