Skip to content

Commit 16b44e9

Browse files
Added support for secrets into action data (#5482)
1 parent 0d00710 commit 16b44e9

File tree

9 files changed

+127
-28
lines changed

9 files changed

+127
-28
lines changed

internal/pkg/action/dispatcher.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
1314
"github.com/elastic/fleet-server/v7/internal/pkg/es"
1415
"github.com/elastic/fleet-server/v7/internal/pkg/logger/ecs"
1516
"github.com/elastic/fleet-server/v7/internal/pkg/model"
1617
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
18+
"github.com/elastic/fleet-server/v7/internal/pkg/secret"
1719
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
1820

1921
"github.com/rs/zerolog"
@@ -34,23 +36,25 @@ func (s Sub) Ch() chan []model.Action {
3436

3537
// Dispatcher tracks agent subscriptions and emits actions to the subscriptions.
3638
type Dispatcher struct {
37-
am monitor.SimpleMonitor
38-
limit *rate.Limiter
39+
am monitor.SimpleMonitor
40+
limit *rate.Limiter
41+
bulker bulk.Bulk
3942

4043
mx sync.RWMutex
4144
subs map[string]Sub
4245
}
4346

4447
// NewDispatcher creates a Dispatcher using the provided monitor.
45-
func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher {
48+
func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int, bulker bulk.Bulk) *Dispatcher {
4649
r := rate.Inf
4750
if throttle > 0 {
4851
r = rate.Every(throttle)
4952
}
5053
return &Dispatcher{
51-
am: am,
52-
limit: rate.NewLimiter(r, i),
53-
subs: make(map[string]Sub),
54+
am: am,
55+
limit: rate.NewLimiter(r, i),
56+
subs: make(map[string]Sub),
57+
bulker: bulker,
5458
}
5559
}
5660

@@ -117,6 +121,16 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
117121
zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal action document")
118122
break
119123
}
124+
125+
secretData, err := secret.GetActionDataWithSecrets(ctx, action.Data, action.SecretReferences, d.bulker)
126+
if err != nil {
127+
zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to replace secrets in action document")
128+
break
129+
}
130+
131+
action.Data = secretData
132+
action.SecretReferences = nil
133+
120134
numAgents := len(action.Agents)
121135
for i, agentID := range action.Agents {
122136
arr := agentActions[agentID]

internal/pkg/action/dispatcher_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/elastic/fleet-server/v7/internal/pkg/es"
1414
"github.com/elastic/fleet-server/v7/internal/pkg/model"
1515
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
16+
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
1617
"github.com/stretchr/testify/assert"
1718
"github.com/stretchr/testify/mock"
1819
"golang.org/x/time/rate"
@@ -24,7 +25,7 @@ type mockMonitor struct {
2425

2526
func (m *mockMonitor) Output() <-chan []es.HitT {
2627
args := m.Called()
27-
return args.Get(0).(<-chan []es.HitT)
28+
return args.Get(0).(<-chan []es.HitT) //nolint:errcheck // we don't need to check here
2829
}
2930

3031
func (m *mockMonitor) Run(ctx context.Context) error {
@@ -34,15 +35,17 @@ func (m *mockMonitor) Run(ctx context.Context) error {
3435

3536
func (m *mockMonitor) GetCheckpoint() sqn.SeqNo {
3637
args := m.Called()
37-
return args.Get(0).(sqn.SeqNo)
38+
return args.Get(0).(sqn.SeqNo) //nolint:errcheck // we don't need to check here
3839
}
3940

4041
func TestNewDispatcher(t *testing.T) {
4142
m := &mockMonitor{}
42-
d := NewDispatcher(m, 0, 0)
43+
bulker := ftesting.NewMockBulk()
44+
d := NewDispatcher(m, 0, 0, bulker)
4345

4446
assert.NotNil(t, d.am)
4547
assert.NotNil(t, d.subs)
48+
assert.NotNil(t, d.bulker)
4649
}
4750

4851
func compareActions(t *testing.T, expects, results []model.Action) {
@@ -259,7 +262,7 @@ func Test_Dispatcher_Run(t *testing.T) {
259262
}
260263

261264
now := time.Now()
262-
ctx, cancel := context.WithCancel(context.Background())
265+
ctx, cancel := context.WithCancel(t.Context())
263266
defer cancel()
264267
go func() {
265268
err := d.Run(ctx)
@@ -366,7 +369,7 @@ func Test_offsetStartTime(t *testing.T) {
366369
}}
367370
for _, tt := range tests {
368371
t.Run(tt.name, func(t *testing.T) {
369-
r := offsetStartTime(context.Background(), tt.start, tt.dur, tt.i, tt.total)
372+
r := offsetStartTime(t.Context(), tt.start, tt.dur, tt.i, tt.total)
370373
assert.Equal(t, tt.result, r)
371374
})
372375
}

internal/pkg/api/handleCheckin.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/elastic/fleet-server/v7/internal/pkg/model"
3131
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
3232
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
33+
"github.com/elastic/fleet-server/v7/internal/pkg/secret"
3334
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
3435

3536
"github.com/hashicorp/go-version"
@@ -861,7 +862,7 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
861862
data := model.ClonePolicyData(pp.Policy.Data)
862863
for policyName, policyOutput := range data.Outputs {
863864
// NOTE: Not sure if output secret keys collected here include new entries, but they are collected for completeness
864-
ks, err := policy.ProcessOutputSecret(ctx, policyOutput, bulker) // makes a bulk request to get secret values
865+
ks, err := secret.ProcessOutputSecret(ctx, policyOutput, bulker) // makes a bulk request to get secret values
865866
if err != nil {
866867
return nil, fmt.Errorf("failed to process output secrets %q: %w",
867868
policyName, err)

internal/pkg/model/schema.go

Lines changed: 5 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/policy/parsed_policy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ import (
1414

1515
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
1616
"github.com/elastic/fleet-server/v7/internal/pkg/model"
17+
"github.com/elastic/fleet-server/v7/internal/pkg/secret"
1718
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
1819
)
1920

2021
const (
2122
FieldOutputs = "outputs"
2223
FieldOutputType = "type"
23-
FieldOutputSecrets = "secrets"
2424
FieldOutputFleetServer = "fleet_server"
2525
FieldOutputServiceToken = "service_token"
2626
FieldOutputPermissions = "output_permissions"
@@ -68,7 +68,7 @@ func NewParsedPolicy(ctx context.Context, bulker bulk.Bulk, p model.Policy) (*Pa
6868
return nil, err
6969
}
7070
for name, policyOutput := range p.Data.Outputs {
71-
ks, err := ProcessOutputSecret(ctx, policyOutput, bulker)
71+
ks, err := secret.ProcessOutputSecret(ctx, policyOutput, bulker)
7272
if err != nil {
7373
return nil, err
7474
}
@@ -80,7 +80,7 @@ func NewParsedPolicy(ctx context.Context, bulker bulk.Bulk, p model.Policy) (*Pa
8080
if err != nil {
8181
return nil, err
8282
}
83-
policyInputs, keys, err := getPolicyInputsWithSecrets(ctx, p.Data, bulker)
83+
policyInputs, keys, err := secret.GetPolicyInputsWithSecrets(ctx, p.Data, bulker)
8484
if err != nil {
8585
return nil, err
8686
}

internal/pkg/policy/secret.go renamed to internal/pkg/secret/secret.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
// or more contributor license agreements. Licensed under the Elastic License;
33
// you may not use this file except in compliance with the Elastic License.
44

5-
package policy
5+
package secret
66

77
import (
88
"context"
9+
"encoding/json"
910
"regexp"
1011
"strconv"
1112
"strings"
@@ -15,12 +16,16 @@ import (
1516
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
1617
)
1718

19+
const (
20+
FieldOutputSecrets = "secrets"
21+
)
22+
1823
var (
1924
secretRegex = regexp.MustCompile(`\$co\.elastic\.secret{([^}]*)}`)
2025
)
2126

2227
// read secret values that belong to the agent policy's secret references, returns secrets as id:value map
23-
func getSecretValues(ctx context.Context, secretRefs []model.SecretReferencesItems, bulker bulk.Bulk) (map[string]string, error) {
28+
func GetSecretValues(ctx context.Context, secretRefs []model.SecretReferencesItems, bulker bulk.Bulk) (map[string]string, error) {
2429
if len(secretRefs) == 0 {
2530
return nil, nil
2631
}
@@ -40,7 +45,7 @@ func getSecretValues(ctx context.Context, secretRefs []model.SecretReferencesIte
4045

4146
// read inputs and secret_references from agent policy
4247
// replace values of secret refs in inputs and input streams properties
43-
func getPolicyInputsWithSecrets(ctx context.Context, data *model.PolicyData, bulker bulk.Bulk) ([]map[string]interface{}, []string, error) {
48+
func GetPolicyInputsWithSecrets(ctx context.Context, data *model.PolicyData, bulker bulk.Bulk) ([]map[string]interface{}, []string, error) {
4449
if len(data.Inputs) == 0 {
4550
return nil, nil, nil
4651
}
@@ -49,7 +54,7 @@ func getPolicyInputsWithSecrets(ctx context.Context, data *model.PolicyData, bul
4954
return data.Inputs, nil, nil
5055
}
5156

52-
secretValues, err := getSecretValues(ctx, data.SecretReferences, bulker)
57+
secretValues, err := GetSecretValues(ctx, data.SecretReferences, bulker)
5358
if err != nil {
5459
return nil, nil, err
5560
}
@@ -67,6 +72,31 @@ func getPolicyInputsWithSecrets(ctx context.Context, data *model.PolicyData, bul
6772
return result, keys, nil
6873
}
6974

75+
func GetActionDataWithSecrets(ctx context.Context, data json.RawMessage, refs []model.SecretReferencesItems, bulker bulk.Bulk) (json.RawMessage, error) {
76+
if len(refs) == 0 {
77+
return data, nil
78+
}
79+
80+
secretValues, err := GetSecretValues(ctx, refs, bulker)
81+
if err != nil {
82+
return data, err
83+
}
84+
85+
parsedData, err := smap.Parse(data)
86+
if err != nil {
87+
return data, err
88+
}
89+
90+
result, _ := replaceMapRef(parsedData, secretValues)
91+
92+
b, err := json.Marshal(result)
93+
if err != nil {
94+
return data, err
95+
}
96+
97+
return b, nil
98+
}
99+
70100
// replaceMapRef replaces all nested secret values in the passed input and returns the resulting input along with a list of keys where inputs have been replaced.
71101
func replaceMapRef(input map[string]any, secrets map[string]string) (map[string]any, []string) {
72102
keys := make([]string, 0)
@@ -207,7 +237,7 @@ func ProcessOutputSecret(ctx context.Context, output smap.Map, bulker bulk.Bulk)
207237
if len(secretReferences) == 0 {
208238
return nil, nil
209239
}
210-
secretValues, err := getSecretValues(ctx, secretReferences, bulker)
240+
secretValues, err := GetSecretValues(ctx, secretReferences, bulker)
211241
if err != nil {
212242
return nil, err
213243
}

internal/pkg/policy/secret_test.go renamed to internal/pkg/secret/secret_test.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44

55
//go:build !integration
66

7-
package policy
7+
package secret
88

99
import (
1010
"context"
11+
"encoding/json"
1112
"testing"
1213

1314
"github.com/elastic/fleet-server/v7/internal/pkg/model"
@@ -86,7 +87,7 @@ func TestGetSecretValues(t *testing.T) {
8687
refs := []model.SecretReferencesItems{{ID: "ref1"}, {ID: "ref2"}}
8788
bulker := ftesting.NewMockBulk()
8889

89-
secretRefs, _ := getSecretValues(context.TODO(), refs, bulker)
90+
secretRefs, _ := GetSecretValues(context.TODO(), refs, bulker)
9091

9192
expectedRefs := map[string]string{
9293
"ref1": "ref1_value",
@@ -95,6 +96,40 @@ func TestGetSecretValues(t *testing.T) {
9596
assert.Equal(t, expectedRefs, secretRefs)
9697
}
9798

99+
func TestGetActionDataWithSecrets(t *testing.T) {
100+
refs := []model.SecretReferencesItems{
101+
{ID: "ref1"},
102+
{ID: "ref2"},
103+
}
104+
// Input JSON with secret references
105+
input := map[string]interface{}{
106+
"username": "user1",
107+
"password": "$co.elastic.secret{ref1}",
108+
"nested": map[string]interface{}{
109+
"token": "$co.elastic.secret{ref2}",
110+
},
111+
}
112+
b, err := json.Marshal(input)
113+
require.NoError(t, err)
114+
115+
bulker := ftesting.NewMockBulk()
116+
result, err := GetActionDataWithSecrets(t.Context(), b, refs, bulker)
117+
require.NoError(t, err)
118+
119+
var out map[string]interface{}
120+
err = json.Unmarshal(result, &out)
121+
require.NoError(t, err)
122+
123+
assert.Equal(t, "user1", out["username"])
124+
assert.Equal(t, "ref1_value", out["password"])
125+
126+
nestedMap, ok := out["nested"].(map[string]interface{})
127+
assert.True(t, ok)
128+
129+
require.NoError(t, err)
130+
assert.Equal(t, "ref2_value", nestedMap["token"])
131+
}
132+
98133
func TestGetPolicyInputsWithSecretsAndStreams(t *testing.T) {
99134
refs := []model.SecretReferencesItems{{ID: "ref1"}, {ID: "ref2"}, {ID: "ref3"}}
100135
inputs := []map[string]interface{}{
@@ -126,7 +161,7 @@ func TestGetPolicyInputsWithSecretsAndStreams(t *testing.T) {
126161
{"id": "input2", "streams": []interface{}{expectedStream}},
127162
}
128163

129-
result, keys, _ := getPolicyInputsWithSecrets(context.TODO(), &pData, bulker)
164+
result, keys, _ := GetPolicyInputsWithSecrets(context.TODO(), &pData, bulker)
130165

131166
assert.Equal(t, expectedResult, result)
132167
assert.ElementsMatch(t, []string{"inputs.0.package_var_secret", "inputs.0.input_var_secret", "inputs.1.streams.0.package_var_secret", "inputs.1.streams.0.input_var_secret", "inputs.1.streams.0.stream_var_secret"}, keys)
@@ -173,7 +208,7 @@ func TestPolicyInputSteamsEmbedded(t *testing.T) {
173208
}},
174209
}
175210

176-
result, keys, err := getPolicyInputsWithSecrets(context.TODO(), &pData, bulker)
211+
result, keys, err := GetPolicyInputsWithSecrets(context.TODO(), &pData, bulker)
177212
require.NoError(t, err)
178213

179214
assert.Equal(t, expected, result)
@@ -201,7 +236,7 @@ func TestGetPolicyInputsNoopWhenNoSecrets(t *testing.T) {
201236
{"id": "input2", "streams": []interface{}{expectedStream}},
202237
}
203238

204-
result, keys, _ := getPolicyInputsWithSecrets(context.TODO(), &pData, bulker)
239+
result, keys, _ := GetPolicyInputsWithSecrets(context.TODO(), &pData, bulker)
205240

206241
assert.Equal(t, expectedResult, result)
207242
assert.Empty(t, keys)

internal/pkg/server/fleet.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro
518518
}
519519
g.Go(loggedRunFunc(ctx, "Action monitor", am.Run))
520520

521-
ad := action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst)
521+
ad := action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst, bulker)
522522
g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run))
523523

524524
bc := checkin.NewBulk(bulker)

model/schema.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,19 @@
6161
"description": "The optional action timeout in seconds",
6262
"type": "integer"
6363
},
64+
"secret_references": {
65+
"description": "A list of all secrets fleet-server needs to inject into the actions data before passing it to the agent. This attribute is removed when action data is send to an agent.",
66+
"type": "array",
67+
"items": {
68+
"type": "object",
69+
"properties": {
70+
"id": {
71+
"type": "string"
72+
}
73+
},
74+
"required": ["id"]
75+
}
76+
},
6477
"user_id": {
6578
"description": "The ID of the user who created the action.",
6679
"type": "string"

0 commit comments

Comments
 (0)