diff --git a/changelog/fragments/1756325090-Fix-missing-liveness-healthcheck-during-container-enrollment.yaml b/changelog/fragments/1756325090-Fix-missing-liveness-healthcheck-during-container-enrollment.yaml new file mode 100644 index 00000000000..60ffd90a769 --- /dev/null +++ b/changelog/fragments/1756325090-Fix-missing-liveness-healthcheck-during-container-enrollment.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; an 80ish-character description of the change. +summary: Fix missing liveness healthcheck during container enrollment + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. 
+pr: https://github.com/elastic/elastic-agent/pull/9612 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/9611 diff --git a/internal/pkg/agent/application/monitoring/liveness.go b/internal/pkg/agent/application/monitoring/liveness.go index a0ffdb43feb..949ce449f6e 100644 --- a/internal/pkg/agent/application/monitoring/liveness.go +++ b/internal/pkg/agent/application/monitoring/liveness.go @@ -56,6 +56,13 @@ func livenessHandler(coord CoordinatorState) func(http.ResponseWriter, *http.Req return func(w http.ResponseWriter, r *http.Request) error { w.Header().Set("Content-Type", "application/json; charset=utf-8") + if coord == nil { + // no coordinator, then that means we are in the container enrollment mode + // at this point the container is healthy and trying to enroll, so return 200 + w.WriteHeader(http.StatusOK) + return nil + } + state := coord.State() isUp := coord.IsActive(time.Second * 10) // the coordinator check is always on, so if that fails, always return false diff --git a/internal/pkg/agent/application/monitoring/liveness_test.go b/internal/pkg/agent/application/monitoring/liveness_test.go index abe3bb94cc9..30f30bc65cd 100644 --- a/internal/pkg/agent/application/monitoring/liveness_test.go +++ b/internal/pkg/agent/application/monitoring/liveness_test.go @@ -36,7 +36,7 @@ func (mc mockCoordinator) IsActive(_ time.Duration) bool { return mc.isUp } -func TestProcessHTTPHandler(t *testing.T) { +func TestLivenessProcessHTTPHandler(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() testCases := []struct { @@ -46,6 +46,12 @@ func TestProcessHTTPHandler(t *testing.T) { liveness bool failon string }{ + { + name: "healthy-nocoord", + expectedCode: 200, + liveness: true, + failon: "heartbeat", + }, { name: 
"default-failed", coord: mockCoordinator{ diff --git a/internal/pkg/agent/application/monitoring/process.go b/internal/pkg/agent/application/monitoring/process.go index 215b02938e3..5413c4ac069 100644 --- a/internal/pkg/agent/application/monitoring/process.go +++ b/internal/pkg/agent/application/monitoring/process.go @@ -43,7 +43,7 @@ var redirectableProcesses = []string{ profilingServicePrefix, } -func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWriter, *http.Request) error, operatingSystem string) func(http.ResponseWriter, *http.Request) error { +func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error { return func(w http.ResponseWriter, r *http.Request) error { w.Header().Set("Content-Type", "application/json; charset=utf-8") @@ -76,7 +76,7 @@ func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWrite metricsPath = "stats" } - return redirectToPath(w, r, componentID, metricsPath, operatingSystem) + return redirectToPath(w, r, componentID, metricsPath) } state := coord.State() @@ -119,7 +119,7 @@ func isProcessRedirectable(componentID string) bool { return false } -func redirectToPath(w http.ResponseWriter, r *http.Request, id, path, operatingSystem string) error { +func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error { endpoint := PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir())) metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path) if metricsErr != nil { diff --git a/internal/pkg/agent/application/monitoring/readiness.go b/internal/pkg/agent/application/monitoring/readiness.go new file mode 100644 index 00000000000..64bf8300268 --- /dev/null +++ b/internal/pkg/agent/application/monitoring/readiness.go @@ -0,0 +1,32 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License 2.0;
+// you may not use this file except in compliance with the Elastic License 2.0.
+
+package monitoring
+
+import (
+	"net/http"
+	"time"
+)
+
+// readinessHandler returns an HTTP handler function that checks the readiness of the service.
+// If a CoordinatorState is provided, it checks if the coordinator is active within a 10-second timeout.
+//
+// This is meant to be a very simple check, just ensure that the service is up and running. For more detail
+// about the liveness of the service use the livenessHandler.
+func readinessHandler(coord CoordinatorState) func(http.ResponseWriter, *http.Request) error {
+	return func(w http.ResponseWriter, r *http.Request) error {
+		w.Header().Set("Content-Type", "application/json; charset=utf-8")
+
+		if coord != nil {
+			// to be ready the coordinator must be active
+			if !coord.IsActive(time.Second * 10) {
+				w.WriteHeader(http.StatusServiceUnavailable)
+				return nil
+			}
+		}
+
+		w.WriteHeader(http.StatusOK)
+		return nil
+	}
+}
diff --git a/internal/pkg/agent/application/monitoring/readiness_test.go b/internal/pkg/agent/application/monitoring/readiness_test.go
new file mode 100644
index 00000000000..16bdb7631db
--- /dev/null
+++ b/internal/pkg/agent/application/monitoring/readiness_test.go
@@ -0,0 +1,60 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License 2.0;
+// you may not use this file except in compliance with the Elastic License 2.0.
+
+package monitoring
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestReadinessProcessHTTPHandler checks the status code served by
+// readinessHandler when there is no coordinator, when the coordinator is
+// active, and when the coordinator is inactive.
+func TestReadinessProcessHTTPHandler(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+	testCases := []struct {
+		name         string
+		coord        CoordinatorState // interface typed so nil reaches the handler as a true nil
+		expectedCode int
+	}{
+		{
+			name:         "healthy-nocoord",
+			coord:        nil,
+			expectedCode: http.StatusOK,
+		},
+		{
+			name:         "healthy",
+			coord:        mockCoordinator{isUp: true},
+			expectedCode: http.StatusOK,
+		},
+		{
+			name:         "unhealthy",
+			coord:        mockCoordinator{isUp: false},
+			expectedCode: http.StatusServiceUnavailable,
+		},
+	}
+
+	// test with readinessHandler
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			testSrv := httptest.NewServer(createHandler(readinessHandler(test.coord)))
+			defer testSrv.Close()
+
+			req, err := http.NewRequestWithContext(ctx, http.MethodGet, testSrv.URL, nil)
+			require.NoError(t, err)
+			res, err := http.DefaultClient.Do(req)
+			require.NoError(t, err)
+			defer res.Body.Close()
+
+			require.Equal(t, test.expectedCode, res.StatusCode)
+		})
+	}
+}
diff --git a/internal/pkg/agent/application/monitoring/server.go b/internal/pkg/agent/application/monitoring/server.go
index 5de7f52e7d1..e8fd93f5987 100644
--- a/internal/pkg/agent/application/monitoring/server.go
+++ b/internal/pkg/agent/application/monitoring/server.go
@@ -5,6 +5,7 @@
 package monitoring
 
 import (
+	"fmt"
 	"net/http"
 	_ "net/http/pprof" //nolint:gosec // this is only conditionally exposed
 	"net/url"
@@ -25,30 +26,12 @@ import (
 	"github.com/elastic/elastic-agent/pkg/core/logger"
 )
 
-// New creates a new server exposing metrics and process information.
+// NewServer creates a new server exposing metrics and process information.
func NewServer( log *logger.Logger, - endpointConfig api.Config, ns func(string) *monitoring.Namespace, tracer *apm.Tracer, coord CoordinatorState, - operatingSystem string, - mcfg *monitoringCfg.MonitoringConfig, -) (*reload.ServerReloader, error) { - if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil { - // log but ignore - log.Warnf("failed to create monitoring drop: %v", err) - } - - return exposeMetricsEndpoint(log, ns, tracer, coord, operatingSystem, mcfg) -} - -func exposeMetricsEndpoint( - log *logger.Logger, - ns func(string) *monitoring.Namespace, - tracer *apm.Tracer, - coord CoordinatorState, - operatingSystem string, mcfg *monitoringCfg.MonitoringConfig, ) (*reload.ServerReloader, error) { @@ -66,15 +49,15 @@ func exposeMetricsEndpoint( statsHandler := statsHandler(statNs) r.Handle("/stats", createHandler(statsHandler)) + r.Handle("/readiness", createHandler(readinessHandler(coord))) + r.Handle("/liveness", createHandler(livenessHandler(coord))) if isProcessStatsEnabled(cfg) { log.Infof("process monitoring is enabled, creating monitoring endpoints") r.Handle("/processes", createHandler(processesHandler(coord))) - r.Handle("/processes/{componentID}", createHandler(processHandler(coord, statsHandler, operatingSystem))) - r.Handle("/processes/{componentID}/", createHandler(processHandler(coord, statsHandler, operatingSystem))) - r.Handle("/processes/{componentID}/{metricsPath}", createHandler(processHandler(coord, statsHandler, operatingSystem))) - - r.Handle("/liveness", createHandler(livenessHandler(coord))) + r.Handle("/processes/{componentID}", createHandler(processHandler(coord, statsHandler))) + r.Handle("/processes/{componentID}/", createHandler(processHandler(coord, statsHandler))) + r.Handle("/processes/{componentID}/{metricsPath}", createHandler(processHandler(coord, statsHandler))) } if isPprofEnabled(cfg) { @@ -91,9 +74,13 @@ func exposeMetricsEndpoint( srvCfg := api.DefaultConfig() srvCfg.Enabled = cfg.Enabled - srvCfg.Host 
= AgentMonitoringEndpoint(operatingSystem, cfg) + srvCfg.Host = AgentMonitoringEndpoint(cfg) srvCfg.Port = cfg.HTTP.Port log.Infof("creating monitoring API with cfg %#v", srvCfg) + if err := createAgentMonitoringDrop(srvCfg.Host); err != nil { + // if it cannot create the path for the socket, then we cannot start the server + return nil, fmt.Errorf("failed to create monitoring socket directory: %w", err) + } apiServer, err := api.NewFromConfig(log, mux, srvCfg) if err != nil { return nil, errors.New(err, "failed to create api server") diff --git a/internal/pkg/agent/application/monitoring/server_test.go b/internal/pkg/agent/application/monitoring/server_test.go index 7375f37c8fa..0e3b713ea52 100644 --- a/internal/pkg/agent/application/monitoring/server_test.go +++ b/internal/pkg/agent/application/monitoring/server_test.go @@ -15,7 +15,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-libs/api" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator" "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload" @@ -92,24 +91,24 @@ func TestHTTPReloadEnableBehavior(t *testing.T) { for _, testCase := range cases { t.Run(testCase.name, func(t *testing.T) { - serverReloader, err := NewServer(logp.L(), api.Config{}, nil, nil, fakeCoordCfg, "linux", testCase.initConfig) + serverReloader, err := NewServer(logp.L(), nil, nil, fakeCoordCfg, testCase.initConfig) require.NoError(t, err) t.Logf("starting server...") serverReloader.Start() if testCase.httpOnAtInit { - waitOnReturnCode(t, http.StatusOK, "liveness", "?failon=failed", serverReloader) + waitOnReturnCode(t, http.StatusOK, "processes", "", serverReloader) } else { - waitOnReturnCode(t, http.StatusNotFound, "liveness", "?failon=failed", serverReloader) + waitOnReturnCode(t, http.StatusNotFound, "processes", "", serverReloader) } err = 
serverReloader.Reload(testCase.secondConfig) require.NoError(t, err) if testCase.httpOnAfterReload { - waitOnReturnCode(t, http.StatusOK, "liveness", "?failon=failed", serverReloader) + waitOnReturnCode(t, http.StatusOK, "processes", "", serverReloader) } else { - waitOnReturnCode(t, http.StatusNotFound, "liveness", "?failon=failed", serverReloader) + waitOnReturnCode(t, http.StatusNotFound, "processes", "", serverReloader) } }) @@ -117,8 +116,6 @@ func TestHTTPReloadEnableBehavior(t *testing.T) { } func TestBasicLivenessConfig(t *testing.T) { - _ = logp.DevelopmentSetup() - testAPIConfig := api.Config{} testConfig := config.MonitoringConfig{ Enabled: true, HTTP: &config.MonitoringHTTPConfig{ @@ -126,7 +123,9 @@ func TestBasicLivenessConfig(t *testing.T) { Port: 0, }, } - serverReloader, err := NewServer(logp.L(), testAPIConfig, nil, nil, fakeCoordCfg, "linux", &testConfig) + logger, err := logp.NewDevelopmentLogger("") + require.NoError(t, err) + serverReloader, err := NewServer(logger, nil, nil, fakeCoordCfg, &testConfig) require.NoError(t, err) t.Logf("starting server...") @@ -143,8 +142,6 @@ func TestBasicLivenessConfig(t *testing.T) { } func TestPprofEnabled(t *testing.T) { - _ = logp.DevelopmentSetup() - testAPIConfig := api.Config{} testConfig := config.MonitoringConfig{ Enabled: true, HTTP: &config.MonitoringHTTPConfig{ @@ -155,7 +152,9 @@ func TestPprofEnabled(t *testing.T) { Enabled: true, }, } - serverReloader, err := NewServer(logp.L(), testAPIConfig, nil, nil, fakeCoordCfg, "linux", &testConfig) + logger, err := logp.NewDevelopmentLogger("") + require.NoError(t, err) + serverReloader, err := NewServer(logger, nil, nil, fakeCoordCfg, &testConfig) require.NoError(t, err) t.Logf("starting server...") diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 2a3ea435426..22e59d7d5bb 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ 
b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -657,7 +657,7 @@ func (b *BeatsMonitor) getHttpStreams( }, "metricsets": []interface{}{"json"}, "path": "/stats", - "hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)}, + "hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.config.C)}, "namespace": "agent", "period": metricsCollectionIntervalString, "index": indexName, @@ -1247,17 +1247,17 @@ func changeOwner(path string, uid, gid int) error { } // HttpPlusAgentMonitoringEndpoint provides an agent monitoring endpoint path with a `http+` prefix. -func HttpPlusAgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string { - return PrefixedEndpoint(AgentMonitoringEndpoint(operatingSystem, cfg)) +func HttpPlusAgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string { + return PrefixedEndpoint(AgentMonitoringEndpoint(cfg)) } // AgentMonitoringEndpoint provides an agent monitoring endpoint path. -func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string { +func AgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string { if cfg != nil && cfg.Enabled { return "http://" + net.JoinHostPort(cfg.HTTP.Host, strconv.Itoa(cfg.HTTP.Port)) } - if operatingSystem == windowsOS { + if runtime.GOOS == windowsOS { return agentMbEndpointFileFormatWin } // unix socket path must be less than 104 characters diff --git a/internal/pkg/agent/cmd/container.go b/internal/pkg/agent/cmd/container.go index 37d300d2869..6d7e8db2085 100644 --- a/internal/pkg/agent/cmd/container.go +++ b/internal/pkg/agent/cmd/container.go @@ -30,14 +30,17 @@ import ( "github.com/elastic/elastic-agent-libs/kibana" "github.com/elastic/elastic-agent-libs/logp" + monitoringLib "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/transport/tlscommon" + 
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" "github.com/elastic/elastic-agent/internal/pkg/cli" "github.com/elastic/elastic-agent/internal/pkg/config" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" "github.com/elastic/elastic-agent/internal/pkg/crypto" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client" @@ -295,6 +298,19 @@ func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error { return err } if shouldEnroll { + // pre-enroll in container uses a logger and default monitoring configuration + // once the enrollment has occurred then it will pick up the final configuration + monitoringServer, err := monitoring.NewServer(logp.L(), monitoringLib.GetNamespace, nil, nil, monitoringCfg.DefaultConfig()) + if err != nil { + return fmt.Errorf("failed starting monitoring server: %w", err) + } + monitoringServer.Start() + defer func() { + if monitoringServer != nil { + _ = monitoringServer.Stop() + } + }() + var policy *kibanaPolicy token := cfg.Fleet.EnrollmentToken if token == "" && !cfg.FleetServer.Enable { @@ -339,6 +355,9 @@ func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error { if err != nil { return errors.New("enrollment failed", err) } + + _ = monitoringServer.Stop() + monitoringServer = nil } return run(containerCfgOverrides, false, initTimeout, isContainer) diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 70cd126df02..1c02f713235 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -24,7 +24,6 @@ import ( "github.com/spf13/cobra" - 
"github.com/elastic/elastic-agent-libs/api" "github.com/elastic/elastic-agent-libs/logp" monitoringLib "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/service" @@ -668,13 +667,7 @@ func setupMetrics( return nil, err } - // start server for stats - endpointConfig := api.Config{ - Enabled: true, - Host: monitoring.AgentMonitoringEndpoint(operatingSystem, cfg), - } - - s, err := monitoring.NewServer(logger, endpointConfig, monitoringLib.GetNamespace, tracer, coord, operatingSystem, cfg) + s, err := monitoring.NewServer(logger, monitoringLib.GetNamespace, tracer, coord, cfg) if err != nil { return nil, errors.New(err, "could not start the HTTP server for the API") }