Skip to content

Commit b8d062c

Browse files
authored
Add OTel collector properties to policy schema (#5169)
Support provisioning of OTel collector configuration for hybrid agents. Add the OTel collector properties to the policy models and the OpenAPI schema, so they can be received and forwarded to agents. Include logic to clone the new fields in the model, and add a basic integration test.
1 parent 568b0e5 commit b8d062c

File tree

9 files changed

+392
-3
lines changed

9 files changed

+392
-3
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add OTel collector properties to policy schema
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Add OTel collector properties to the policy schema. This way policies defined in Fleet that include
21+
this data are forwarded to agents.
22+
23+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
24+
component: fleet-server
25+
26+
# PR URL; optional; the PR number that added the changeset.
27+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
28+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
29+
# Please provide it if you are adding a fragment for a different PR.
30+
pr: https://github.com/elastic/fleet-server/pull/5169
31+
32+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
33+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
34+
issue: https://github.com/elastic/fleet-server/issues/5241

internal/pkg/api/openapi.gen.go

Lines changed: 34 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/model/ext.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package model
66

77
import (
88
"maps"
9+
"slices"
910
"time"
1011
)
1112

@@ -92,21 +93,46 @@ func ClonePolicyData(d *PolicyData) *PolicyData {
9293
OutputPermissions: d.OutputPermissions,
9394
Outputs: cloneMap(d.Outputs),
9495
Revision: d.Revision,
95-
SecretReferences: make([]SecretReferencesItems, 0, len(d.SecretReferences)),
96+
SecretReferences: slices.Clone(d.SecretReferences),
97+
98+
// OTel config.
99+
Connectors: maps.Clone(d.Connectors),
100+
Exporters: maps.Clone(d.Exporters),
101+
Extensions: maps.Clone(d.Extensions),
102+
Processors: maps.Clone(d.Processors),
103+
Receivers: maps.Clone(d.Receivers),
96104
}
97105
for _, m := range d.Inputs {
98106
res.Inputs = append(res.Inputs, maps.Clone(m))
99107
}
100-
res.SecretReferences = append(res.SecretReferences, d.SecretReferences...)
101108
if d.Signed != nil {
102109
res.Signed = &Signed{
103110
Data: d.Signed.Data,
104111
Signature: d.Signed.Signature,
105112
}
106113
}
114+
if d.Service != nil {
115+
res.Service = cloneOTelService(d.Service)
116+
}
107117
return res
108118
}
109119

120+
func cloneOTelService(s *Service) *Service {
121+
var clone Service
122+
clone.Extensions = slices.Clone(s.Extensions)
123+
if len(s.Pipelines) > 0 {
124+
clone.Pipelines = make(map[string]*PipelinesItem)
125+
for id, pipeline := range s.Pipelines {
126+
clone.Pipelines[id] = &PipelinesItem{
127+
Exporters: slices.Clone(pipeline.Exporters),
128+
Processors: slices.Clone(pipeline.Processors),
129+
Receivers: slices.Clone(pipeline.Receivers),
130+
}
131+
}
132+
}
133+
return &clone
134+
}
135+
110136
// cloneMap does a deep copy on a map of objects
111137
// TODO generics?
112138
func cloneMap(m map[string]map[string]interface{}) map[string]map[string]interface{} {

internal/pkg/model/schema.go

Lines changed: 33 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/policy/parsed_policy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func NewParsedPolicy(ctx context.Context, bulker bulk.Bulk, p model.Policy) (*Pa
9797
Inputs: policyInputs,
9898
SecretKeys: secretKeys,
9999
}
100+
100101
if trace := apm.TransactionFromContext(ctx); trace != nil {
101102
// Pass current transaction link (should be a monitor transaction) to caller (likely a client request).
102103
tCtx := trace.TraceContext()

internal/pkg/server/fleet_integration_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1673,3 +1673,112 @@ func Test_SmokeTest_AuditUnenroll(t *testing.T) {
16731673
cancel()
16741674
srv.waitExit() //nolint:errcheck // test case
16751675
}
1676+
1677+
func TestCheckinOTelColPolicy(t *testing.T) {
1678+
ctx, cancel := context.WithCancel(t.Context())
1679+
defer cancel()
1680+
1681+
idSuffix := uuid.Must(uuid.NewV4()).String()
1682+
componentID := func(id string) string {
1683+
return fmt.Sprintf("%s/%s", id, idSuffix)
1684+
}
1685+
policyData := model.PolicyData{
1686+
Outputs: map[string]map[string]interface{}{
1687+
"default": {
1688+
"type": "elasticsearch",
1689+
},
1690+
},
1691+
OutputPermissions: json.RawMessage(`{"default": {}}`),
1692+
Inputs: []map[string]any{},
1693+
Receivers: map[string]any{
1694+
componentID("somereceiver"): map[string]any{},
1695+
},
1696+
Processors: map[string]any{
1697+
componentID("someprocessor"): map[string]any{},
1698+
},
1699+
Connectors: map[string]any{
1700+
componentID("forward"): map[string]any{},
1701+
},
1702+
Exporters: map[string]any{
1703+
componentID("someexporter"): map[string]any{},
1704+
},
1705+
Service: &model.Service{
1706+
Pipelines: map[string]*model.PipelinesItem{
1707+
componentID("metrics"): &model.PipelinesItem{
1708+
Receivers: []string{componentID("somereceiver")},
1709+
Processors: []string{componentID("someprocessor")},
1710+
Exporters: []string{componentID("forward")},
1711+
},
1712+
"metrics": &model.PipelinesItem{
1713+
Receivers: []string{componentID("forward")},
1714+
Exporters: []string{componentID("someexporter")},
1715+
},
1716+
},
1717+
},
1718+
}
1719+
1720+
// Start test server
1721+
srv, err := startTestServer(t, ctx, policyData)
1722+
require.NoError(t, err)
1723+
ctx = testlog.SetLogger(t).WithContext(ctx)
1724+
1725+
cli := cleanhttp.DefaultClient()
1726+
// enroll an agent
1727+
t.Log("Enroll an agent")
1728+
req, err := http.NewRequestWithContext(ctx, "POST", srv.buildURL("", "enroll"), strings.NewReader(enrollBody))
1729+
require.NoError(t, err)
1730+
req.Header.Set("Authorization", "ApiKey "+srv.enrollKey)
1731+
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
1732+
req.Header.Set("Content-Type", "application/json")
1733+
res, err := cli.Do(req)
1734+
require.NoError(t, err)
1735+
1736+
require.Equal(t, http.StatusOK, res.StatusCode)
1737+
t.Log("Agent enrollment successful")
1738+
p, _ := io.ReadAll(res.Body)
1739+
res.Body.Close()
1740+
var obj map[string]interface{}
1741+
err = json.Unmarshal(p, &obj)
1742+
require.NoError(t, err)
1743+
1744+
item := obj["item"]
1745+
mm, ok := item.(map[string]interface{})
1746+
require.True(t, ok, "expected attribute item to be an object")
1747+
agentID, ok := mm["id"].(string)
1748+
require.True(t, ok, "expected attribute id to be a string")
1749+
1750+
apiKey, ok := mm["access_api_key"].(string)
1751+
require.True(t, ok, "expected attribute apiKey to be a string")
1752+
1753+
// checkin
1754+
t.Logf("Fake a checkin for agent %s", agentID)
1755+
req, err = http.NewRequestWithContext(ctx, "POST", srv.buildURL(agentID, "checkin"), strings.NewReader(checkinBody))
1756+
require.NoError(t, err)
1757+
req.Header.Set("Authorization", "ApiKey "+apiKey)
1758+
req.Header.Set("User-Agent", "elastic agent "+serverVersion)
1759+
req.Header.Set("Content-Type", "application/json")
1760+
res, err = cli.Do(req)
1761+
require.NoError(t, err)
1762+
defer res.Body.Close()
1763+
1764+
require.Equal(t, http.StatusOK, res.StatusCode)
1765+
t.Log("Checkin successful, verify body")
1766+
p, err = io.ReadAll(res.Body)
1767+
require.NoError(t, err)
1768+
1769+
err = json.Unmarshal(p, &obj)
1770+
require.NoError(t, err)
1771+
1772+
actionsRaw, ok := obj["actions"]
1773+
require.True(t, ok, "expected actions is missing")
1774+
actions, ok := actionsRaw.([]interface{})
1775+
require.True(t, ok, "expected actions to be an array")
1776+
require.Greater(t, len(actions), 0, "expected at least 1 action")
1777+
action, ok := actions[0].(map[string]interface{})
1778+
require.True(t, ok, "expected action to be an object")
1779+
_, ok = action["id"].(string)
1780+
require.True(t, ok, "expected action id to be string")
1781+
aAgentID, ok := action["agent_id"].(string)
1782+
require.True(t, ok, "expected action agent_id to be string")
1783+
require.Equal(t, agentID, aAgentID)
1784+
}

0 commit comments

Comments
 (0)