Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80ish-character description of the change.
summary: Fix missing liveness healthcheck during container enrollment

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/9612

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/9611
7 changes: 7 additions & 0 deletions internal/pkg/agent/application/monitoring/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func livenessHandler(coord CoordinatorState) func(http.ResponseWriter, *http.Req
return func(w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

if coord == nil {
// no coordinator, then that means we are in the container enrollment mode
// at this point the container is healthy and trying to enroll, so return 200
w.WriteHeader(http.StatusOK)
return nil
}

state := coord.State()
isUp := coord.IsActive(time.Second * 10)
// the coordinator check is always on, so if that fails, always return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (mc mockCoordinator) IsActive(_ time.Duration) bool {
return mc.isUp
}

func TestProcessHTTPHandler(t *testing.T) {
func TestLivenessProcessHTTPHandler(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
testCases := []struct {
Expand All @@ -46,6 +46,12 @@ func TestProcessHTTPHandler(t *testing.T) {
liveness bool
failon string
}{
{
name: "healthy-nocoord",
expectedCode: 200,
liveness: true,
failon: "heartbeat",
},
{
name: "default-failed",
coord: mockCoordinator{
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/monitoring/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var redirectableProcesses = []string{
profilingServicePrefix,
}

func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWriter, *http.Request) error, operatingSystem string) func(http.ResponseWriter, *http.Request) error {
func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error {
return func(w http.ResponseWriter, r *http.Request) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

Expand Down Expand Up @@ -76,7 +76,7 @@ func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWrite
metricsPath = "stats"
}

return redirectToPath(w, r, componentID, metricsPath, operatingSystem)
return redirectToPath(w, r, componentID, metricsPath)
}

state := coord.State()
Expand Down Expand Up @@ -119,7 +119,7 @@ func isProcessRedirectable(componentID string) bool {
return false
}

func redirectToPath(w http.ResponseWriter, r *http.Request, id, path, operatingSystem string) error {
func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error {
endpoint := PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path)
if metricsErr != nil {
Expand Down
32 changes: 32 additions & 0 deletions internal/pkg/agent/application/monitoring/readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package monitoring

import (
"net/http"
"time"
)

// readinessHandler builds the HTTP handler that backs the readiness probe.
//
// The returned handler replies with 200 OK when no coordinator was supplied, or
// when the supplied coordinator reports itself active via IsActive with a
// 10-second window. In every other case it replies with 503 Service Unavailable.
// The handler itself always returns a nil error.
//
// The check is intentionally minimal: it only answers "is the service up and
// running?". Use livenessHandler when more detail about the state of the
// service is needed.
func readinessHandler(coord CoordinatorState) func(http.ResponseWriter, *http.Request) error {
	// window handed to the coordinator's activity check
	const activeWindow = 10 * time.Second

	return func(w http.ResponseWriter, _ *http.Request) error {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")

		// assume ready; only a present-but-inactive coordinator makes us not ready
		code := http.StatusOK
		if coord != nil && !coord.IsActive(activeWindow) {
			code = http.StatusServiceUnavailable
		}

		w.WriteHeader(code)
		return nil
	}
}
57 changes: 57 additions & 0 deletions internal/pkg/agent/application/monitoring/readiness_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package monitoring

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestReadinessProcessHTTPHandler(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
testCases := []struct {
name string
coord mockCoordinator
expectedCode int
liveness bool
}{
{
name: "healthy-nocoord",
expectedCode: 200,
liveness: true,
},
{
name: "healthy",
expectedCode: 200,
liveness: true,
},
{
name: "unhealthy",
expectedCode: 503,
liveness: false,
},
}

// test with processesHandler
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
testSrv := httptest.NewServer(createHandler(readinessHandler(test.coord)))
defer testSrv.Close()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, testSrv.URL, nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
res.Body.Close()
})
}

}
37 changes: 12 additions & 25 deletions internal/pkg/agent/application/monitoring/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package monitoring

import (
"fmt"
"net/http"
_ "net/http/pprof" //nolint:gosec // this is only conditionally exposed
"net/url"
Expand All @@ -25,30 +26,12 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// New creates a new server exposing metrics and process information.
// NewServer creates a new server exposing metrics and process information.
func NewServer(
log *logger.Logger,
endpointConfig api.Config,
ns func(string) *monitoring.Namespace,
tracer *apm.Tracer,
coord CoordinatorState,
operatingSystem string,
mcfg *monitoringCfg.MonitoringConfig,
) (*reload.ServerReloader, error) {
if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
// log but ignore
log.Warnf("failed to create monitoring drop: %v", err)
}

return exposeMetricsEndpoint(log, ns, tracer, coord, operatingSystem, mcfg)
}

func exposeMetricsEndpoint(
log *logger.Logger,
ns func(string) *monitoring.Namespace,
tracer *apm.Tracer,
coord CoordinatorState,
operatingSystem string,
mcfg *monitoringCfg.MonitoringConfig,
) (*reload.ServerReloader, error) {

Expand All @@ -66,15 +49,15 @@ func exposeMetricsEndpoint(

statsHandler := statsHandler(statNs)
r.Handle("/stats", createHandler(statsHandler))
r.Handle("/readiness", createHandler(readinessHandler(coord)))
r.Handle("/liveness", createHandler(livenessHandler(coord)))

if isProcessStatsEnabled(cfg) {
log.Infof("process monitoring is enabled, creating monitoring endpoints")
r.Handle("/processes", createHandler(processesHandler(coord)))
r.Handle("/processes/{componentID}", createHandler(processHandler(coord, statsHandler, operatingSystem)))
r.Handle("/processes/{componentID}/", createHandler(processHandler(coord, statsHandler, operatingSystem)))
r.Handle("/processes/{componentID}/{metricsPath}", createHandler(processHandler(coord, statsHandler, operatingSystem)))

r.Handle("/liveness", createHandler(livenessHandler(coord)))
r.Handle("/processes/{componentID}", createHandler(processHandler(coord, statsHandler)))
r.Handle("/processes/{componentID}/", createHandler(processHandler(coord, statsHandler)))
r.Handle("/processes/{componentID}/{metricsPath}", createHandler(processHandler(coord, statsHandler)))
}

if isPprofEnabled(cfg) {
Expand All @@ -91,9 +74,13 @@ func exposeMetricsEndpoint(

srvCfg := api.DefaultConfig()
srvCfg.Enabled = cfg.Enabled
srvCfg.Host = AgentMonitoringEndpoint(operatingSystem, cfg)
srvCfg.Host = AgentMonitoringEndpoint(cfg)
srvCfg.Port = cfg.HTTP.Port
log.Infof("creating monitoring API with cfg %#v", srvCfg)
if err := createAgentMonitoringDrop(srvCfg.Host); err != nil {
// if it cannot create the path for the socket, then we cannot start the server
return nil, fmt.Errorf("failed to create monitoring socket directory: %w", err)
}
apiServer, err := api.NewFromConfig(log, mux, srvCfg)
if err != nil {
return nil, errors.New(err, "failed to create api server")
Expand Down
23 changes: 11 additions & 12 deletions internal/pkg/agent/application/monitoring/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/api"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
Expand Down Expand Up @@ -92,41 +91,41 @@ func TestHTTPReloadEnableBehavior(t *testing.T) {

for _, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) {
serverReloader, err := NewServer(logp.L(), api.Config{}, nil, nil, fakeCoordCfg, "linux", testCase.initConfig)
serverReloader, err := NewServer(logp.L(), nil, nil, fakeCoordCfg, testCase.initConfig)
require.NoError(t, err)

t.Logf("starting server...")
serverReloader.Start()
if testCase.httpOnAtInit {
waitOnReturnCode(t, http.StatusOK, "liveness", "?failon=failed", serverReloader)
waitOnReturnCode(t, http.StatusOK, "processes", "", serverReloader)
} else {
waitOnReturnCode(t, http.StatusNotFound, "liveness", "?failon=failed", serverReloader)
waitOnReturnCode(t, http.StatusNotFound, "processes", "", serverReloader)
}

err = serverReloader.Reload(testCase.secondConfig)
require.NoError(t, err)

if testCase.httpOnAfterReload {
waitOnReturnCode(t, http.StatusOK, "liveness", "?failon=failed", serverReloader)
waitOnReturnCode(t, http.StatusOK, "processes", "", serverReloader)
} else {
waitOnReturnCode(t, http.StatusNotFound, "liveness", "?failon=failed", serverReloader)
waitOnReturnCode(t, http.StatusNotFound, "processes", "", serverReloader)
}

})
}
}

func TestBasicLivenessConfig(t *testing.T) {
_ = logp.DevelopmentSetup()
testAPIConfig := api.Config{}
testConfig := config.MonitoringConfig{
Enabled: true,
HTTP: &config.MonitoringHTTPConfig{
Enabled: true,
Port: 0,
},
}
serverReloader, err := NewServer(logp.L(), testAPIConfig, nil, nil, fakeCoordCfg, "linux", &testConfig)
logger, err := logp.NewDevelopmentLogger("")
require.NoError(t, err)
serverReloader, err := NewServer(logger, nil, nil, fakeCoordCfg, &testConfig)
require.NoError(t, err)

t.Logf("starting server...")
Expand All @@ -143,8 +142,6 @@ func TestBasicLivenessConfig(t *testing.T) {
}

func TestPprofEnabled(t *testing.T) {
_ = logp.DevelopmentSetup()
testAPIConfig := api.Config{}
testConfig := config.MonitoringConfig{
Enabled: true,
HTTP: &config.MonitoringHTTPConfig{
Expand All @@ -155,7 +152,9 @@ func TestPprofEnabled(t *testing.T) {
Enabled: true,
},
}
serverReloader, err := NewServer(logp.L(), testAPIConfig, nil, nil, fakeCoordCfg, "linux", &testConfig)
logger, err := logp.NewDevelopmentLogger("")
require.NoError(t, err)
serverReloader, err := NewServer(logger, nil, nil, fakeCoordCfg, &testConfig)
require.NoError(t, err)

t.Logf("starting server...")
Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (b *BeatsMonitor) getHttpStreams(
},
"metricsets": []interface{}{"json"},
"path": "/stats",
"hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)},
"hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.config.C)},
"namespace": "agent",
"period": metricsCollectionIntervalString,
"index": indexName,
Expand Down Expand Up @@ -1247,17 +1247,17 @@ func changeOwner(path string, uid, gid int) error {
}

// HttpPlusAgentMonitoringEndpoint provides an agent monitoring endpoint path with a `http+` prefix.
func HttpPlusAgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string {
return PrefixedEndpoint(AgentMonitoringEndpoint(operatingSystem, cfg))
func HttpPlusAgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string {
return PrefixedEndpoint(AgentMonitoringEndpoint(cfg))
}

// AgentMonitoringEndpoint provides an agent monitoring endpoint path.
func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string {
func AgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string {
if cfg != nil && cfg.Enabled {
return "http://" + net.JoinHostPort(cfg.HTTP.Host, strconv.Itoa(cfg.HTTP.Port))
}

if operatingSystem == windowsOS {
if runtime.GOOS == windowsOS {
return agentMbEndpointFileFormatWin
}
// unix socket path must be less than 104 characters
Expand Down
Loading
Loading