Skip to content

Commit c97cc47

Browse files
blakerousemergify[bot]
authored andcommitted
Add /readiness and /liveness when enrolling with the container (#9612)
* Enable health checking pre-enroll in container start-up path. Add readiness endpoint. * Fix func signature. * Fix tests. * Add changelog entry. * Fix imports. * Apply suggestion from @ycombinator Co-authored-by: Shaunak Kashyap <[email protected]> --------- Co-authored-by: Shaunak Kashyap <[email protected]> (cherry picked from commit c028f68) # Conflicts: # internal/pkg/agent/application/monitoring/process.go # internal/pkg/agent/application/monitoring/server_test.go # internal/pkg/agent/application/monitoring/v1_monitor.go # internal/pkg/agent/cmd/container.go
1 parent e5d8892 commit c97cc47

File tree

11 files changed

+210
-45
lines changed

11 files changed

+210
-45
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Fix missing liveness healthcheck during container enrollment
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: elastic-agent
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/elastic-agent/pull/9612
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/elastic-agent/issues/9611

internal/pkg/agent/application/monitoring/liveness.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ func livenessHandler(coord CoordinatorState) func(http.ResponseWriter, *http.Req
5656
return func(w http.ResponseWriter, r *http.Request) error {
5757
w.Header().Set("Content-Type", "application/json; charset=utf-8")
5858

59+
if coord == nil {
60+
// no coordinator, then that means we are in the container enrollment mode
61+
// at this point the container is healthy and trying to enroll, so return 200
62+
w.WriteHeader(http.StatusOK)
63+
return nil
64+
}
65+
5966
state := coord.State()
6067
isUp := coord.IsActive(time.Second * 10)
6168
// the coordinator check is always on, so if that fails, always return false

internal/pkg/agent/application/monitoring/liveness_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (mc mockCoordinator) IsActive(_ time.Duration) bool {
3636
return mc.isUp
3737
}
3838

39-
func TestProcessHTTPHandler(t *testing.T) {
39+
func TestLivenessProcessHTTPHandler(t *testing.T) {
4040
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
4141
defer cancel()
4242
testCases := []struct {
@@ -46,6 +46,12 @@ func TestProcessHTTPHandler(t *testing.T) {
4646
liveness bool
4747
failon string
4848
}{
49+
{
50+
name: "healthy-nocoord",
51+
expectedCode: 200,
52+
liveness: true,
53+
failon: "heartbeat",
54+
},
4955
{
5056
name: "default-failed",
5157
coord: mockCoordinator{

internal/pkg/agent/application/monitoring/process.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ var redirectableProcesses = []string{
4343
profilingServicePrefix,
4444
}
4545

46-
func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWriter, *http.Request) error, operatingSystem string) func(http.ResponseWriter, *http.Request) error {
46+
func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWriter, *http.Request) error) func(http.ResponseWriter, *http.Request) error {
4747
return func(w http.ResponseWriter, r *http.Request) error {
4848
w.Header().Set("Content-Type", "application/json; charset=utf-8")
4949

@@ -76,7 +76,7 @@ func processHandler(coord CoordinatorState, statsHandler func(http.ResponseWrite
7676
metricsPath = "stats"
7777
}
7878

79-
return redirectToPath(w, r, componentID, metricsPath, operatingSystem)
79+
return redirectToPath(w, r, componentID, metricsPath)
8080
}
8181

8282
state := coord.State()
@@ -119,9 +119,15 @@ func isProcessRedirectable(componentID string) bool {
119119
return false
120120
}
121121

122+
<<<<<<< HEAD
122123
func redirectToPath(w http.ResponseWriter, r *http.Request, id, path, operatingSystem string) error {
123124
endpoint := prefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
124125
metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), endpoint, path)
126+
=======
127+
func redirectToPath(w http.ResponseWriter, r *http.Request, id, path string) error {
128+
endpoint := PrefixedEndpoint(utils.SocketURLWithFallback(id, paths.TempDir()))
129+
metricsBytes, statusCode, metricsErr := GetProcessMetrics(r.Context(), endpoint, path)
130+
>>>>>>> c028f68fa (Add /readiness and /liveness when enrolling with the container (#9612))
125131
if metricsErr != nil {
126132
return metricsErr
127133
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package monitoring
6+
7+
import (
8+
"net/http"
9+
"time"
10+
)
11+
12+
// readinessHandler returns an HTTP handler function that checks the readiness of the service.
13+
// If a CoordinatorState is provided, it checks if the coordinator is active within a 10-second timeout.
14+
//
15+
// This is meant to be a very simple check, just ensure that the service is up and running. For more detail
16+
// about the liveness of the service use the livenessHandler.
17+
func readinessHandler(coord CoordinatorState) func(http.ResponseWriter, *http.Request) error {
18+
return func(w http.ResponseWriter, r *http.Request) error {
19+
w.Header().Set("Content-Type", "application/json; charset=utf-8")
20+
21+
if coord != nil {
22+
// to be ready the coordinator must be active
23+
if !coord.IsActive(time.Second * 10) {
24+
w.WriteHeader(http.StatusServiceUnavailable)
25+
return nil
26+
}
27+
}
28+
29+
w.WriteHeader(http.StatusOK)
30+
return nil
31+
}
32+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package monitoring
6+
7+
import (
8+
"context"
9+
"net/http"
10+
"net/http/httptest"
11+
"testing"
12+
"time"
13+
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestReadinessProcessHTTPHandler(t *testing.T) {
18+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
19+
defer cancel()
20+
testCases := []struct {
21+
name string
22+
coord mockCoordinator
23+
expectedCode int
24+
liveness bool
25+
}{
26+
{
27+
name: "healthy-nocoord",
28+
expectedCode: 200,
29+
liveness: true,
30+
},
31+
{
32+
name: "healthy",
33+
expectedCode: 200,
34+
liveness: true,
35+
},
36+
{
37+
name: "unhealthy",
38+
expectedCode: 503,
39+
liveness: false,
40+
},
41+
}
42+
43+
// test with processesHandler
44+
for _, test := range testCases {
45+
t.Run(test.name, func(t *testing.T) {
46+
testSrv := httptest.NewServer(createHandler(readinessHandler(test.coord)))
47+
defer testSrv.Close()
48+
49+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, testSrv.URL, nil)
50+
require.NoError(t, err)
51+
res, err := http.DefaultClient.Do(req)
52+
require.NoError(t, err)
53+
res.Body.Close()
54+
})
55+
}
56+
57+
}

internal/pkg/agent/application/monitoring/server.go

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package monitoring
66

77
import (
8+
"fmt"
89
"net/http"
910
_ "net/http/pprof" //nolint:gosec // this is only conditionally exposed
1011
"net/url"
@@ -25,30 +26,12 @@ import (
2526
"github.com/elastic/elastic-agent/pkg/core/logger"
2627
)
2728

28-
// New creates a new server exposing metrics and process information.
29+
// NewServer creates a new server exposing metrics and process information.
2930
func NewServer(
3031
log *logger.Logger,
31-
endpointConfig api.Config,
3232
ns func(string) *monitoring.Namespace,
3333
tracer *apm.Tracer,
3434
coord CoordinatorState,
35-
operatingSystem string,
36-
mcfg *monitoringCfg.MonitoringConfig,
37-
) (*reload.ServerReloader, error) {
38-
if err := createAgentMonitoringDrop(endpointConfig.Host); err != nil {
39-
// log but ignore
40-
log.Warnf("failed to create monitoring drop: %v", err)
41-
}
42-
43-
return exposeMetricsEndpoint(log, ns, tracer, coord, operatingSystem, mcfg)
44-
}
45-
46-
func exposeMetricsEndpoint(
47-
log *logger.Logger,
48-
ns func(string) *monitoring.Namespace,
49-
tracer *apm.Tracer,
50-
coord CoordinatorState,
51-
operatingSystem string,
5235
mcfg *monitoringCfg.MonitoringConfig,
5336
) (*reload.ServerReloader, error) {
5437

@@ -66,15 +49,15 @@ func exposeMetricsEndpoint(
6649

6750
statsHandler := statsHandler(statNs)
6851
r.Handle("/stats", createHandler(statsHandler))
52+
r.Handle("/readiness", createHandler(readinessHandler(coord)))
53+
r.Handle("/liveness", createHandler(livenessHandler(coord)))
6954

7055
if isProcessStatsEnabled(cfg) {
7156
log.Infof("process monitoring is enabled, creating monitoring endpoints")
7257
r.Handle("/processes", createHandler(processesHandler(coord)))
73-
r.Handle("/processes/{componentID}", createHandler(processHandler(coord, statsHandler, operatingSystem)))
74-
r.Handle("/processes/{componentID}/", createHandler(processHandler(coord, statsHandler, operatingSystem)))
75-
r.Handle("/processes/{componentID}/{metricsPath}", createHandler(processHandler(coord, statsHandler, operatingSystem)))
76-
77-
r.Handle("/liveness", createHandler(livenessHandler(coord)))
58+
r.Handle("/processes/{componentID}", createHandler(processHandler(coord, statsHandler)))
59+
r.Handle("/processes/{componentID}/", createHandler(processHandler(coord, statsHandler)))
60+
r.Handle("/processes/{componentID}/{metricsPath}", createHandler(processHandler(coord, statsHandler)))
7861
}
7962

8063
if isPprofEnabled(cfg) {
@@ -91,9 +74,13 @@ func exposeMetricsEndpoint(
9174

9275
srvCfg := api.DefaultConfig()
9376
srvCfg.Enabled = cfg.Enabled
94-
srvCfg.Host = AgentMonitoringEndpoint(operatingSystem, cfg)
77+
srvCfg.Host = AgentMonitoringEndpoint(cfg)
9578
srvCfg.Port = cfg.HTTP.Port
9679
log.Infof("creating monitoring API with cfg %#v", srvCfg)
80+
if err := createAgentMonitoringDrop(srvCfg.Host); err != nil {
81+
// if it cannot create the path for the socket, then we cannot start the server
82+
return nil, fmt.Errorf("failed to create monitoring socket directory: %w", err)
83+
}
9784
apiServer, err := api.NewFromConfig(log, mux, srvCfg)
9885
if err != nil {
9986
return nil, errors.New(err, "failed to create api server")

internal/pkg/agent/application/monitoring/server_test.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/stretchr/testify/require"
1616

1717
"github.com/elastic/elastic-agent-client/v7/pkg/client"
18-
"github.com/elastic/elastic-agent-libs/api"
1918
"github.com/elastic/elastic-agent-libs/logp"
2019
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
2120
"github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/reload"
@@ -92,41 +91,50 @@ func TestHTTPReloadEnableBehavior(t *testing.T) {
9291

9392
for _, testCase := range cases {
9493
t.Run(testCase.name, func(t *testing.T) {
95-
serverReloader, err := NewServer(logp.L(), api.Config{}, nil, nil, fakeCoordCfg, "linux", testCase.initConfig)
94+
serverReloader, err := NewServer(logp.L(), nil, nil, fakeCoordCfg, testCase.initConfig)
9695
require.NoError(t, err)
9796

9897
t.Logf("starting server...")
9998
serverReloader.Start()
10099
if testCase.httpOnAtInit {
101-
waitOnReturnCode(t, http.StatusOK, "liveness", "?failon=failed", serverReloader)
100+
waitOnReturnCode(t, http.StatusOK, "processes", "", serverReloader)
102101
} else {
103-
waitOnReturnCode(t, http.StatusNotFound, "liveness", "?failon=failed", serverReloader)
102+
waitOnReturnCode(t, http.StatusNotFound, "processes", "", serverReloader)
104103
}
105104

106105
err = serverReloader.Reload(testCase.secondConfig)
107106
require.NoError(t, err)
108107

109108
if testCase.httpOnAfterReload {
110-
waitOnReturnCode(t, http.StatusOK, "liveness", "?failon=failed", serverReloader)
109+
waitOnReturnCode(t, http.StatusOK, "processes", "", serverReloader)
111110
} else {
112-
waitOnReturnCode(t, http.StatusNotFound, "liveness", "?failon=failed", serverReloader)
111+
waitOnReturnCode(t, http.StatusNotFound, "processes", "", serverReloader)
113112
}
114113

115114
})
116115
}
117116
}
118117

119118
func TestBasicLivenessConfig(t *testing.T) {
119+
<<<<<<< HEAD
120120
_ = logp.DevelopmentSetup()
121121
testAPIConfig := api.Config{}
122+
=======
123+
>>>>>>> c028f68fa (Add /readiness and /liveness when enrolling with the container (#9612))
122124
testConfig := config.MonitoringConfig{
123125
Enabled: true,
124126
HTTP: &config.MonitoringHTTPConfig{
125127
Enabled: true,
126128
Port: 0,
127129
},
128130
}
131+
<<<<<<< HEAD
129132
serverReloader, err := NewServer(logp.L(), testAPIConfig, nil, nil, fakeCoordCfg, "linux", &testConfig)
133+
=======
134+
logger, err := logp.NewDevelopmentLogger("")
135+
require.NoError(t, err)
136+
serverReloader, err := NewServer(logger, nil, nil, fakeCoordCfg, &testConfig)
137+
>>>>>>> c028f68fa (Add /readiness and /liveness when enrolling with the container (#9612))
130138
require.NoError(t, err)
131139

132140
t.Logf("starting server...")
@@ -143,8 +151,11 @@ func TestBasicLivenessConfig(t *testing.T) {
143151
}
144152

145153
func TestPprofEnabled(t *testing.T) {
154+
<<<<<<< HEAD
146155
_ = logp.DevelopmentSetup()
147156
testAPIConfig := api.Config{}
157+
=======
158+
>>>>>>> c028f68fa (Add /readiness and /liveness when enrolling with the container (#9612))
148159
testConfig := config.MonitoringConfig{
149160
Enabled: true,
150161
HTTP: &config.MonitoringHTTPConfig{
@@ -155,7 +166,13 @@ func TestPprofEnabled(t *testing.T) {
155166
Enabled: true,
156167
},
157168
}
169+
<<<<<<< HEAD
158170
serverReloader, err := NewServer(logp.L(), testAPIConfig, nil, nil, fakeCoordCfg, "linux", &testConfig)
171+
=======
172+
logger, err := logp.NewDevelopmentLogger("")
173+
require.NoError(t, err)
174+
serverReloader, err := NewServer(logger, nil, nil, fakeCoordCfg, &testConfig)
175+
>>>>>>> c028f68fa (Add /readiness and /liveness when enrolling with the container (#9612))
159176
require.NoError(t, err)
160177

161178
t.Logf("starting server...")

internal/pkg/agent/application/monitoring/v1_monitor.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ func (b *BeatsMonitor) getHttpStreams(
616616
},
617617
"metricsets": []interface{}{"json"},
618618
"path": "/stats",
619-
"hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.operatingSystem, b.config.C)},
619+
"hosts": []interface{}{HttpPlusAgentMonitoringEndpoint(b.config.C)},
620620
"namespace": "agent",
621621
"period": metricsCollectionIntervalString,
622622
"index": indexName,
@@ -1167,17 +1167,22 @@ func changeOwner(path string, uid, gid int) error {
11671167
}
11681168

11691169
// HttpPlusAgentMonitoringEndpoint provides an agent monitoring endpoint path with a `http+` prefix.
1170+
<<<<<<< HEAD
11701171
func HttpPlusAgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string {
11711172
return prefixedEndpoint(AgentMonitoringEndpoint(operatingSystem, cfg))
1173+
=======
1174+
func HttpPlusAgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string {
1175+
return PrefixedEndpoint(AgentMonitoringEndpoint(cfg))
1176+
>>>>>>> c028f68fa (Add /readiness and /liveness when enrolling with the container (#9612))
11721177
}
11731178

11741179
// AgentMonitoringEndpoint provides an agent monitoring endpoint path.
1175-
func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string {
1180+
func AgentMonitoringEndpoint(cfg *monitoringCfg.MonitoringConfig) string {
11761181
if cfg != nil && cfg.Enabled {
11771182
return "http://" + net.JoinHostPort(cfg.HTTP.Host, strconv.Itoa(cfg.HTTP.Port))
11781183
}
11791184

1180-
if operatingSystem == windowsOS {
1185+
if runtime.GOOS == windowsOS {
11811186
return agentMbEndpointFileFormatWin
11821187
}
11831188
// unix socket path must be less than 104 characters

0 commit comments

Comments
 (0)