Skip to content

Commit a793f26

Browse files
committed
adds fleet secrets manager
1 parent b2c1370 commit a793f26

File tree

13 files changed

+955
-37
lines changed

13 files changed

+955
-37
lines changed

agent/agent.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,17 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
191191
return err
192192
}
193193

194+
// Bind fleet secrets manager to fleet config manager if both are fleet-based
195+
// This needs to happen before SolveConfigSecrets so secrets can be resolved
196+
if a.config.OrbAgent.ConfigManager.Active == "fleet" && a.config.OrbAgent.SecretsManager.Active == "fleet" {
197+
if fleetCM, ok := a.configManager.(*configmgr.FleetConfigManager); ok {
198+
if err := fleetCM.BindSecretsManager(a.secretsManager); err != nil {
199+
a.logger.Error("error binding fleet secrets manager", "error", err)
200+
return err
201+
}
202+
}
203+
}
204+
194205
var err error
195206
if a.config.OrbAgent.Backends,
196207
a.config.OrbAgent.ConfigManager,

agent/config/types.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,15 @@ type VaultManager struct {
6969
Schedule *string `yaml:"schedule,omitempty"`
7070
}
7171

72-
// SecretsSources represents the configuration for manager sources, including vault.
72+
// FleetSecretsManager represents the configuration for the Fleet secrets manager
73+
type FleetSecretsManager struct {
74+
Timeout *int `yaml:"timeout,omitempty"` // Request timeout in seconds
75+
}
76+
77+
// SecretsSources represents the configuration for manager sources, including vault and fleet.
7378
type SecretsSources struct {
74-
Vault VaultManager `yaml:"vault"`
79+
Vault VaultManager `yaml:"vault"`
80+
Fleet FleetSecretsManager `yaml:"fleet"`
7581
}
7682

7783
// ManagerSecrets represents the configuration for the Secrets Manager

agent/configmgr/fleet.go

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ import (
1515
"github.com/netboxlabs/orb-agent/agent/otlpbridge"
1616
"github.com/netboxlabs/orb-agent/agent/policymgr"
1717
"github.com/netboxlabs/orb-agent/agent/redact"
18+
"github.com/netboxlabs/orb-agent/agent/secretsmgr"
1819
)
1920

20-
// Compile-time check to ensure fleetConfigManager implements Manager interface
21-
var _ Manager = (*fleetConfigManager)(nil)
21+
// Compile-time check to ensure FleetConfigManager implements Manager interface
22+
var _ Manager = (*FleetConfigManager)(nil)
2223

23-
type fleetConfigManager struct {
24+
// FleetConfigManager implements the Manager interface for Fleet-based configuration
25+
type FleetConfigManager struct {
2426
logger *slog.Logger
2527
connection fleet.MQTTConnector
2628
authTokenManager *fleet.AuthTokenManager
@@ -38,10 +40,10 @@ type fleetConfigManager struct {
3840
monitorCancel context.CancelFunc
3941
}
4042

41-
func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *fleetConfigManager {
43+
func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *FleetConfigManager {
4244
resetChan := make(chan struct{}, 1)
4345
reconnectChan := make(chan struct{}, 1)
44-
return &fleetConfigManager{
46+
return &FleetConfigManager{
4547
logger: logger,
4648
connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, reconnectChan, backendState),
4749
authTokenManager: fleet.NewAuthTokenManager(logger),
@@ -52,11 +54,11 @@ func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, ba
5254
}
5355
}
5456

55-
// newFleetConfigManagerWithConnection creates a fleetConfigManager with a custom connection (for testing)
56-
func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever, conn fleet.MQTTConnector) *fleetConfigManager {
57+
// newFleetConfigManagerWithConnection creates a FleetConfigManager with a custom connection (for testing)
58+
func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever, conn fleet.MQTTConnector) *FleetConfigManager {
5759
resetChan := make(chan struct{}, 1)
5860
reconnectChan := make(chan struct{}, 1)
59-
return &fleetConfigManager{
61+
return &FleetConfigManager{
6062
logger: logger,
6163
connection: conn, // Use provided connection instead of creating new one
6264
authTokenManager: fleet.NewAuthTokenManager(logger),
@@ -67,7 +69,8 @@ func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.Pol
6769
}
6870
}
6971

70-
func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
72+
// Start initializes and starts the Fleet configuration manager
73+
func (fleetManager *FleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
7174
ctx := context.Background()
7275

7376
var err error
@@ -222,8 +225,48 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
222225
return nil
223226
}
224227

228+
// BindSecretsManager binds a fleet secrets manager to the MQTT connection
229+
func (fleetManager *FleetConfigManager) BindSecretsManager(sm secretsmgr.Manager) error {
230+
// Check if it's a fleet secrets manager by type assertion
231+
fleetSM, ok := sm.(*secretsmgr.FleetSecretsManager)
232+
if !ok {
233+
// Try to get the underlying fleet secrets manager
234+
// This handles the case where the manager is wrapped
235+
return nil // Not a fleet secrets manager, nothing to bind
236+
}
237+
238+
// Register OnReadyHook to bind secrets manager when MQTT connection is ready
239+
fleetManager.connection.AddOnReadyHook(func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) {
240+
// Create publisher and subscriber adapters
241+
pub := secretsmgr.NewCMAdapterPublisher(cm)
242+
sub := secretsmgr.NewCMAdapterSubscriber(cm)
243+
244+
// Bind the secrets manager to MQTT
245+
if err := fleetSM.BindMQTT(pub, sub, topics.SecretsRequest, topics.SecretsResponse, topics.SecretsUpdated); err != nil {
246+
fleetManager.logger.Error("failed to bind fleet secrets manager to MQTT", "error", err)
247+
return
248+
}
249+
250+
// Register topic handlers for secrets topics
251+
// Note: These handlers will be called from OnPublishReceived in connection.go
252+
fleetManager.connection.RegisterTopicHandler(topics.SecretsResponse, func(topic string, payload []byte) error {
253+
return fleetSM.HandleMessage(topic, payload)
254+
})
255+
fleetManager.connection.RegisterTopicHandler(topics.SecretsUpdated, func(topic string, payload []byte) error {
256+
return fleetSM.HandleMessage(topic, payload)
257+
})
258+
259+
fleetManager.logger.Info("Fleet secrets manager bound to MQTT",
260+
slog.String("request_topic", topics.SecretsRequest),
261+
slog.String("response_topic", topics.SecretsResponse),
262+
slog.String("updated_topic", topics.SecretsUpdated))
263+
})
264+
265+
return nil
266+
}
267+
225268
// refreshAndReconnect refreshes the JWT token and reconnects to MQTT
226-
func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error {
269+
func (fleetManager *FleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error {
227270
// Refresh JWT token
228271
token, err := fleetManager.authTokenManager.RefreshToken(ctx)
229272
if err != nil {
@@ -271,7 +314,7 @@ func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context,
271314
return nil
272315
}
273316

274-
func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (string, error) {
317+
func (fleetManager *FleetConfigManager) configToSafeString(cfg config.Config) (string, error) {
275318
redacted := redact.SensitiveData(cfg)
276319
configYaml, err := yaml.Marshal(redacted)
277320
if err != nil {
@@ -280,13 +323,14 @@ func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (s
280323
return string(configYaml), nil
281324
}
282325

283-
func (fleetManager *fleetConfigManager) GetContext(ctx context.Context) context.Context {
326+
// GetContext returns the context for the Fleet configuration manager
327+
func (fleetManager *FleetConfigManager) GetContext(ctx context.Context) context.Context {
284328
// Empty implementation for now - just return the context as-is
285329
return ctx
286330
}
287331

288332
// monitorTokenExpiry periodically checks token expiry and triggers reconnection before token expires
289-
func (fleetManager *fleetConfigManager) monitorTokenExpiry() {
333+
func (fleetManager *FleetConfigManager) monitorTokenExpiry() {
290334
// Check interval: default 30 seconds, configurable via config
291335
checkInterval := 30 * time.Second
292336
if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval != nil && *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval > 0 {
@@ -346,7 +390,7 @@ func (fleetManager *fleetConfigManager) monitorTokenExpiry() {
346390
}
347391

348392
// Stop gracefully shuts down the OTLP bridge and token expiry monitor.
349-
func (fleetManager *fleetConfigManager) Stop(ctx context.Context) error {
393+
func (fleetManager *FleetConfigManager) Stop(ctx context.Context) error {
350394
// Stop token expiry monitor
351395
if fleetManager.monitorCancel != nil {
352396
fleetManager.monitorCancel()

agent/configmgr/fleet/connection.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log/slog"
77
"net/url"
88
"strings"
9+
"sync"
910
"time"
1011

1112
"github.com/eclipse/paho.golang/autopaho"
@@ -15,6 +16,9 @@ import (
1516
"github.com/netboxlabs/orb-agent/agent/policymgr"
1617
)
1718

19+
// TopicMessageHandler handles messages for a specific topic
20+
type TopicMessageHandler func(topic string, payload []byte) error
21+
1822
// MQTTConnection manages the MQTT connection
1923
type MQTTConnection struct {
2024
logger *slog.Logger
@@ -23,11 +27,13 @@ type MQTTConnection struct {
2327
messaging *Messaging
2428
resetChan chan struct{}
2529
onReadyHooks []func(cm *autopaho.ConnectionManager, topics TokenResponseTopics)
30+
topicHandlers map[string]TopicMessageHandler
2631
connectionTopics TokenResponseTopics
2732
reconnectChan chan struct{}
2833
capabilitiesFailCount int
2934
groupMembershipFailCount int
3035
heartbeatFailCount int
36+
mu sync.Mutex
3137
}
3238

3339
// NewMQTTConnection creates a new MQTTConnection
@@ -40,6 +46,7 @@ func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetC
4046
messaging: NewMessaging(logger, pMgr, resetChan, &groupManager),
4147
resetChan: resetChan,
4248
onReadyHooks: make([]func(cm *autopaho.ConnectionManager, topics TokenResponseTopics), 0),
49+
topicHandlers: make(map[string]TopicMessageHandler),
4350
reconnectChan: reconnectChan,
4451
}
4552
}
@@ -49,6 +56,13 @@ func (connection *MQTTConnection) AddOnReadyHook(fn func(cm *autopaho.Connection
4956
connection.onReadyHooks = append(connection.onReadyHooks, fn)
5057
}
5158

59+
// RegisterTopicHandler registers a handler for a specific topic
60+
func (connection *MQTTConnection) RegisterTopicHandler(topic string, handler TopicMessageHandler) {
61+
connection.mu.Lock()
62+
defer connection.mu.Unlock()
63+
connection.topicHandlers[topic] = handler
64+
}
65+
5266
// TopicActions are the actions to take on a topic
5367
type TopicActions struct {
5468
Subscribe func(topic string) error
@@ -72,6 +86,7 @@ type MQTTConnector interface {
7286
Disconnect(ctx context.Context, heartbeatTopic string) error
7387
Reconnect(ctx context.Context, details ConnectionDetails, backends map[string]backend.Backend, labels map[string]string, configFile string, timeout time.Duration) error
7488
AddOnReadyHook(fn func(cm *autopaho.ConnectionManager, topics TokenResponseTopics))
89+
RegisterTopicHandler(topic string, handler TopicMessageHandler)
7590
}
7691

7792
// Connect connects to the MQTT broker
@@ -212,6 +227,18 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio
212227
// Log any published messages to subscribed topics
213228
connection.logger.Info("received MQTT message", "topic", pr.Packet.Topic)
214229

230+
// Check if there's a topic-specific handler
231+
connection.mu.Lock()
232+
handler, hasHandler := connection.topicHandlers[pr.Packet.Topic]
233+
connection.mu.Unlock()
234+
235+
if hasHandler {
236+
if err := handler(pr.Packet.Topic, pr.Packet.Payload); err != nil {
237+
connection.logger.Error("topic handler failed", "topic", pr.Packet.Topic, "error", err)
238+
}
239+
return true, nil
240+
}
241+
215242
orgID := strings.Split(pr.Packet.Topic, "/")[1]
216243

217244
// Use a fresh context for async message handling, not the Connect() context
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package messages
2+
3+
import (
4+
"time"
5+
)
6+
7+
// CurrentSecretsSchemaVersion defines the current version of the secrets schema
8+
const CurrentSecretsSchemaVersion = "1.0"
9+
10+
// Error codes for secret operations
11+
const (
12+
ErrorCodeNotFound = "NOT_FOUND"
13+
ErrorCodeForbidden = "FORBIDDEN"
14+
ErrorCodeInvalidPath = "INVALID_PATH"
15+
ErrorCodeTimeout = "TIMEOUT"
16+
ErrorCodeInternalError = "INTERNAL_ERROR"
17+
ErrorCodeRateLimited = "RATE_LIMITED"
18+
)
19+
20+
// SecretRequest represents a single secret request in a SecretRequestMsg
21+
type SecretRequest struct {
22+
Path string `json:"path"` // The path to the secret in the control plane's secret store
23+
Context string `json:"context"` // The context where the secret is used (policy ID, "config", or "backend")
24+
}
25+
26+
// SecretRequestMsg represents a request for secrets
27+
type SecretRequestMsg struct {
28+
SchemaVersion string `json:"schema_version"`
29+
RequestID string `json:"request_id"` // UUID v4
30+
Timestamp time.Time `json:"timestamp"`
31+
Secrets []SecretRequest `json:"secrets"`
32+
}
33+
34+
// SecretValue represents a successfully retrieved secret
35+
type SecretValue struct {
36+
Path string `json:"path"`
37+
Value string `json:"value"`
38+
Version int `json:"version"`
39+
Metadata map[string]string `json:"metadata,omitempty"`
40+
}
41+
42+
// SecretError represents an error for a failed secret retrieval
43+
type SecretError struct {
44+
Path string `json:"path"`
45+
Error string `json:"error"` // Human-readable error message
46+
Code string `json:"code"` // Machine-readable error code
47+
}
48+
49+
// SecretResponseMsg represents a response to a secret request
50+
type SecretResponseMsg struct {
51+
SchemaVersion string `json:"schema_version"`
52+
RequestID string `json:"request_id"` // Matches the request_id from the original request
53+
Timestamp time.Time `json:"timestamp"`
54+
Status string `json:"status"` // "success", "partial", "error"
55+
Secrets []SecretValue `json:"secrets,omitempty"` // Omitted if status is "error"
56+
Errors []SecretError `json:"errors,omitempty"` // Omitted if status is "success"
57+
}
58+
59+
// SecretUpdate represents a single secret update notification
60+
type SecretUpdate struct {
61+
Path string `json:"path"`
62+
Version int `json:"version"`
63+
Contexts []string `json:"contexts"` // List of contexts (policy IDs) that use this secret
64+
}
65+
66+
// SecretUpdateNotificationMsg represents a push notification for updated secrets
67+
type SecretUpdateNotificationMsg struct {
68+
SchemaVersion string `json:"schema_version"`
69+
Timestamp time.Time `json:"timestamp"`
70+
Updates []SecretUpdate `json:"updates"`
71+
}

agent/configmgr/fleet/mocks.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ func (m *MockMQTTConnection) AddOnReadyHook(fn func(cm *autopaho.ConnectionManag
118118
m.hooks = append(m.hooks, fn)
119119
}
120120

121+
// RegisterTopicHandler registers a handler for a specific topic (mock implementation)
122+
func (m *MockMQTTConnection) RegisterTopicHandler(_ string, _ TopicMessageHandler) {
123+
// No-op for mock
124+
}
125+
121126
// TriggerOnReadyHook triggers all registered onReady hooks (for testing)
122127
func (m *MockMQTTConnection) TriggerOnReadyHook(cm *autopaho.ConnectionManager, topics TokenResponseTopics) {
123128
for _, hook := range m.hooks {

0 commit comments

Comments
 (0)