Skip to content

Commit 1c27e4e

Browse files
Migrate agent to a different cluster (#8014)
1 parent e68e038 commit 1c27e4e

23 files changed

+2280
-587
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Handle Migrate action in agent
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: Adds ability to re-enroll agent to different cluster using Fleet UI.
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: elastic-agent
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
#issue: https://github.com/owner/repo/1234
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package handlers
6+
7+
import (
8+
"context"
9+
"fmt"
10+
11+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
12+
fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet"
13+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
14+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
15+
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
16+
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
17+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
18+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
19+
"github.com/elastic/elastic-agent/pkg/core/logger"
20+
"github.com/elastic/elastic-agent/pkg/features"
21+
)
22+
23+
const ()
24+
25+
type migrateCoordinator interface {
26+
Migrate(_ context.Context, _ *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff) error
27+
ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string)
28+
HasEndpoint() bool
29+
}
30+
31+
// Migrate handles migrate change coming from fleet.
32+
type Migrate struct {
33+
log *logger.Logger
34+
agentInfo info.Agent
35+
coord migrateCoordinator
36+
37+
tamperProtectionFn func() bool // allows to inject the flag for tests, defaults to features.TamperProtection
38+
}
39+
40+
// NewMigrate creates a new Migrate handler.
41+
func NewMigrate(
42+
log *logger.Logger,
43+
agentInfo info.Agent,
44+
coord migrateCoordinator,
45+
) *Migrate {
46+
return &Migrate{
47+
log: log,
48+
agentInfo: agentInfo,
49+
coord: coord,
50+
tamperProtectionFn: features.TamperProtection,
51+
}
52+
}
53+
54+
// Handle handles MIGRATE action.
55+
func (h *Migrate) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker) error {
56+
h.log.Debugf("handlerMigrate: action '%+v' received", a)
57+
58+
action, ok := a.(*fleetapi.ActionMigrate)
59+
if !ok {
60+
return fmt.Errorf("invalid type, expected ActionMigrate and received %T", a)
61+
}
62+
63+
// if endpoint is present do not proceed
64+
if h.tamperProtectionFn() && h.coord.HasEndpoint() {
65+
err := errors.New("unsupported action: tamper protected agent")
66+
h.ackFailure(ctx, err, action, ack)
67+
return err
68+
}
69+
70+
if err := h.coord.Migrate(ctx, action, fleetgateway.RequestBackoff); err != nil {
71+
// this should not happen, unmanaged agent should not receive the action
72+
// defensive coding to avoid misbehavior
73+
if errors.Is(err, coordinator.ErrNotManaged) {
74+
return errors.New("unmanaged agent, use Enroll instead")
75+
}
76+
77+
// ack failure
78+
h.ackFailure(ctx, err, action, ack)
79+
80+
if errors.Is(err, coordinator.ErrFleetServer) {
81+
return errors.New("action not available for agents running Fleet Server")
82+
}
83+
84+
return fmt.Errorf("migration of agent to a new cluster failed: %w", err)
85+
86+
}
87+
88+
// reexec and load new config
89+
h.coord.ReExec(nil)
90+
return nil
91+
}
92+
93+
func (h *Migrate) ackFailure(ctx context.Context, err error, action *fleetapi.ActionMigrate, acker acker.Acker) {
94+
action.Err = err
95+
96+
if err := acker.Ack(ctx, action); err != nil {
97+
h.log.Errorw("failed to ack migrate action",
98+
"error.message", err,
99+
"action", action)
100+
}
101+
102+
if err := acker.Commit(ctx); err != nil {
103+
h.log.Errorw("failed to commit migrate action",
104+
"error.message", err,
105+
"action", action)
106+
}
107+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package handlers
6+
7+
import (
8+
"context"
9+
"testing"
10+
11+
"github.com/stretchr/testify/mock"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
15+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
16+
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
17+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
18+
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
19+
mockinfo "github.com/elastic/elastic-agent/testing/mocks/internal_/pkg/agent/application/info"
20+
)
21+
22+
func TestActionMigratelHandler(t *testing.T) {
23+
log, _ := loggertest.New("")
24+
mockAgentInfo := mockinfo.NewAgent(t)
25+
t.Run("wrong action type", func(t *testing.T) {
26+
action := &fleetapi.ActionSettings{}
27+
ack := &fakeAcker{}
28+
ack.On("Ack", t.Context(), action).Return(nil)
29+
ack.On("Commit", t.Context()).Return(nil)
30+
31+
coord := &fakeMigrateCoordinator{}
32+
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
33+
coord.On("ReExec", mock.Anything, mock.Anything)
34+
35+
h := NewMigrate(log, mockAgentInfo, coord)
36+
require.NotNil(t, h.Handle(t.Context(), action, ack))
37+
coord.AssertNumberOfCalls(t, "Migrate", 0)
38+
coord.AssertNumberOfCalls(t, "ReExec", 0)
39+
})
40+
41+
t.Run("tamper protected agent", func(t *testing.T) {
42+
action := &fleetapi.ActionMigrate{
43+
ActionType: "MIGRATE",
44+
}
45+
46+
ack := &fakeAcker{}
47+
ack.On("Ack", t.Context(), action).Return(nil)
48+
ack.On("Commit", t.Context()).Return(nil)
49+
50+
coord := &fakeMigrateCoordinator{}
51+
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
52+
coord.On("ReExec", mock.Anything, mock.Anything)
53+
coord.On("HasEndpoint").Return(true)
54+
55+
h := NewMigrate(log, mockAgentInfo, coord)
56+
h.tamperProtectionFn = func() bool { return true }
57+
58+
require.NotNil(t, h.Handle(t.Context(), action, ack))
59+
coord.AssertNumberOfCalls(t, "Migrate", 0)
60+
ack.AssertCalled(t, "Ack", t.Context(), action)
61+
ack.AssertCalled(t, "Commit", t.Context())
62+
coord.AssertNumberOfCalls(t, "ReExec", 0)
63+
})
64+
65+
t.Run("action propagated to coordinator", func(t *testing.T) {
66+
action := &fleetapi.ActionMigrate{}
67+
68+
ack := &fakeAcker{}
69+
ack.On("Ack", t.Context(), action).Return(nil)
70+
ack.On("Commit", t.Context()).Return(nil)
71+
72+
coord := &fakeMigrateCoordinator{}
73+
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
74+
coord.On("ReExec", mock.Anything, mock.Anything)
75+
76+
h := NewMigrate(log, mockAgentInfo, coord)
77+
h.tamperProtectionFn = func() bool { return false }
78+
79+
require.Nil(t, h.Handle(t.Context(), action, ack))
80+
coord.AssertNumberOfCalls(t, "Migrate", 1)
81+
82+
// ack delegated to migrate coordinator
83+
ack.AssertNumberOfCalls(t, "Ack", 0)
84+
ack.AssertNumberOfCalls(t, "Commit", 0)
85+
coord.AssertCalled(t, "ReExec", mock.Anything, mock.Anything)
86+
})
87+
88+
t.Run("fleet server", func(t *testing.T) {
89+
action := &fleetapi.ActionMigrate{}
90+
91+
ack := &fakeAcker{}
92+
ack.On("Ack", t.Context(), action).Return(nil)
93+
ack.On("Commit", t.Context()).Return(nil)
94+
95+
coord := &fakeMigrateCoordinator{}
96+
coord.On("Migrate", mock.Anything, mock.Anything).Return(coordinator.ErrFleetServer)
97+
coord.On("ReExec", mock.Anything, mock.Anything)
98+
99+
h := NewMigrate(log, mockAgentInfo, coord)
100+
h.tamperProtectionFn = func() bool { return false }
101+
102+
require.Error(t, coordinator.ErrFleetServer, h.Handle(t.Context(), action, ack))
103+
coord.AssertNumberOfCalls(t, "Migrate", 1)
104+
105+
// ack not delegated to migrate coordinator, failure is reported
106+
ack.AssertNumberOfCalls(t, "Ack", 1)
107+
ack.AssertNumberOfCalls(t, "Commit", 1)
108+
coord.AssertNotCalled(t, "ReExec", mock.Anything, mock.Anything)
109+
})
110+
}
111+
112+
type fakeMigrateCoordinator struct {
113+
mock.Mock
114+
}
115+
116+
func (f *fakeMigrateCoordinator) Migrate(ctx context.Context, a *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff) error {
117+
args := f.Called(ctx, a)
118+
return args.Error(0)
119+
}
120+
121+
func (f *fakeMigrateCoordinator) ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string) {
122+
f.Called(callback, argOverrides)
123+
}
124+
125+
func (f *fakeMigrateCoordinator) HasEndpoint() bool {
126+
args := f.Called()
127+
return args.Bool(0)
128+
}

internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ import (
1010
goerrors "errors"
1111
"fmt"
1212
"io"
13-
"net/http"
1413
"sort"
15-
"time"
1614

1715
"gopkg.in/yaml.v2"
1816

@@ -33,10 +31,6 @@ import (
3331
"github.com/elastic/elastic-agent/pkg/core/logger"
3432
)
3533

36-
const (
37-
apiStatusTimeout = 15 * time.Second
38-
)
39-
4034
// PolicyChangeHandler is a handler for POLICY_CHANGE action.
4135
type PolicyChangeHandler struct {
4236
log *logger.Logger
@@ -171,28 +165,7 @@ func testFleetConfig(ctx context.Context, log *logger.Logger, clientConfig remot
171165
clientConfig.Hosts, clientConfig.Host)))
172166
}
173167

174-
ctx, cancel := context.WithTimeout(ctx, apiStatusTimeout)
175-
defer cancel()
176-
177-
// TODO: a HEAD should be enough as we need to test only the connectivity part
178-
resp, err := fleetClient.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
179-
if err != nil {
180-
return errors.New(
181-
err, "fail to communicate with Fleet Server API client hosts",
182-
errors.TypeNetwork, errors.M("hosts", clientConfig.Hosts))
183-
}
184-
185-
if resp.StatusCode != http.StatusOK {
186-
return errors.New(
187-
err, fmt.Sprintf("fleet server ping returned a bad status code: %d", resp.StatusCode),
188-
errors.TypeNetwork, errors.M("hosts", clientConfig.Hosts))
189-
}
190-
191-
// discard body for proper cancellation and connection reuse
192-
_, _ = io.Copy(io.Discard, resp.Body)
193-
resp.Body.Close()
194-
195-
return nil
168+
return client.CheckRemote(ctx, fleetClient)
196169
}
197170

198171
// updateFleetConfig copies the relevant Fleet client settings from policyConfig on agentConfig. The destination struct is modified in-place
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package coordinator
6+
7+
import (
8+
"fmt"
9+
"os"
10+
11+
"github.com/otiai10/copy"
12+
13+
"github.com/elastic/elastic-agent-libs/file"
14+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
15+
)
16+
17+
const (
18+
backupSuffix = ".enroll.bak"
19+
)
20+
21+
// RestoreConfig restores from backup if needed and signals restore was performed
22+
func RestoreConfig() error {
23+
configFile := paths.AgentConfigFile()
24+
backup := configFile + backupSuffix
25+
26+
// check backup exists
27+
if _, err := os.Stat(backup); os.IsNotExist(err) {
28+
return nil
29+
}
30+
31+
if err := file.SafeFileRotate(configFile, backup); err != nil {
32+
return fmt.Errorf("failed to safe rotate backup config file: %w", err)
33+
}
34+
35+
return nil
36+
}
37+
38+
// backupConfig creates a backup of currently used fleet config
39+
func backupConfig() error {
40+
configFile := paths.AgentConfigFile()
41+
backup := configFile + backupSuffix
42+
43+
err := copy.Copy(configFile, backup, copy.Options{
44+
PermissionControl: copy.AddPermission(0600),
45+
})
46+
if err != nil {
47+
return fmt.Errorf("failed to backup config file %s -> %s: %w", configFile, backup, err)
48+
}
49+
50+
return nil
51+
}
52+
53+
// cleanBackupConfig removes backup config file
54+
func cleanBackupConfig() error {
55+
backup := paths.AgentConfigFile() + backupSuffix
56+
if err := os.RemoveAll(backup); err != nil && !os.IsNotExist(err) {
57+
return err
58+
}
59+
60+
return nil
61+
}

0 commit comments

Comments
 (0)