Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
return err
}

// Bind fleet secrets manager to fleet config manager if both are fleet-based
// This needs to happen before SolveConfigSecrets so secrets can be resolved
if a.config.OrbAgent.ConfigManager.Active == "fleet" && a.config.OrbAgent.SecretsManager.Active == "fleet" {
if fleetCM, ok := a.configManager.(*configmgr.FleetConfigManager); ok {
if err := fleetCM.BindSecretsManager(a.secretsManager); err != nil {
a.logger.Error("error binding fleet secrets manager", "error", err)
return err
}
}
}

var err error
if a.config.OrbAgent.Backends,
a.config.OrbAgent.ConfigManager,
Expand Down
10 changes: 8 additions & 2 deletions agent/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ type VaultManager struct {
Schedule *string `yaml:"schedule,omitempty"`
}

// SecretsSources represents the configuration for manager sources, including vault.
// FleetSecretsManager represents the configuration for the Fleet secrets manager
type FleetSecretsManager struct {
Timeout *int `yaml:"timeout,omitempty"` // Request timeout in seconds
}

// SecretsSources represents the configuration for manager sources, including vault and fleet.
type SecretsSources struct {
Vault VaultManager `yaml:"vault"`
Vault VaultManager `yaml:"vault"`
Fleet FleetSecretsManager `yaml:"fleet"`
}

// ManagerSecrets represents the configuration for the Secrets Manager
Expand Down
72 changes: 58 additions & 14 deletions agent/configmgr/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/netboxlabs/orb-agent/agent/otlpbridge"
"github.com/netboxlabs/orb-agent/agent/policymgr"
"github.com/netboxlabs/orb-agent/agent/redact"
"github.com/netboxlabs/orb-agent/agent/secretsmgr"
)

// Compile-time check to ensure fleetConfigManager implements Manager interface
var _ Manager = (*fleetConfigManager)(nil)
// Compile-time check to ensure FleetConfigManager implements Manager interface
var _ Manager = (*FleetConfigManager)(nil)

type fleetConfigManager struct {
// FleetConfigManager implements the Manager interface for Fleet-based configuration
type FleetConfigManager struct {
logger *slog.Logger
connection fleet.MQTTConnector
authTokenManager *fleet.AuthTokenManager
Expand All @@ -38,10 +40,10 @@ type fleetConfigManager struct {
monitorCancel context.CancelFunc
}

func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *fleetConfigManager {
func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *FleetConfigManager {
resetChan := make(chan struct{}, 1)
reconnectChan := make(chan struct{}, 1)
return &fleetConfigManager{
return &FleetConfigManager{
logger: logger,
connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, reconnectChan, backendState),
authTokenManager: fleet.NewAuthTokenManager(logger),
Expand All @@ -52,11 +54,11 @@ func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, ba
}
}

// newFleetConfigManagerWithConnection creates a fleetConfigManager with a custom connection (for testing)
func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever, conn fleet.MQTTConnector) *fleetConfigManager {
// newFleetConfigManagerWithConnection creates a FleetConfigManager with a custom connection (for testing)
func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever, conn fleet.MQTTConnector) *FleetConfigManager {
resetChan := make(chan struct{}, 1)
reconnectChan := make(chan struct{}, 1)
return &fleetConfigManager{
return &FleetConfigManager{
logger: logger,
connection: conn, // Use provided connection instead of creating new one
authTokenManager: fleet.NewAuthTokenManager(logger),
Expand All @@ -67,7 +69,8 @@ func newFleetConfigManagerWithConnection(logger *slog.Logger, pMgr policymgr.Pol
}
}

func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
// Start initializes and starts the Fleet configuration manager
func (fleetManager *FleetConfigManager) Start(cfg config.Config, backends map[string]backend.Backend) error {
ctx := context.Background()

var err error
Expand Down Expand Up @@ -222,8 +225,48 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
return nil
}

// BindSecretsManager binds a fleet secrets manager to the MQTT connection
func (fleetManager *FleetConfigManager) BindSecretsManager(sm secretsmgr.Manager) error {
// Check if it's a fleet secrets manager by type assertion
fleetSM, ok := sm.(*secretsmgr.FleetSecretsManager)
if !ok {
// Try to get the underlying fleet secrets manager
// This handles the case where the manager is wrapped
return nil // Not a fleet secrets manager, nothing to bind
}

// Register OnReadyHook to bind secrets manager when MQTT connection is ready
fleetManager.connection.AddOnReadyHook(func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) {
// Create publisher and subscriber adapters
pub := secretsmgr.NewCMAdapterPublisher(cm)
sub := secretsmgr.NewCMAdapterSubscriber(cm)

// Bind the secrets manager to MQTT
if err := fleetSM.BindMQTT(pub, sub, topics.SecretsRequest, topics.SecretsResponse, topics.SecretsUpdated); err != nil {
fleetManager.logger.Error("failed to bind fleet secrets manager to MQTT", "error", err)
return
}

// Register topic handlers for secrets topics
// Note: These handlers will be called from OnPublishReceived in connection.go
fleetManager.connection.RegisterTopicHandler(topics.SecretsResponse, func(topic string, payload []byte) error {
return fleetSM.HandleMessage(topic, payload)
})
fleetManager.connection.RegisterTopicHandler(topics.SecretsUpdated, func(topic string, payload []byte) error {
return fleetSM.HandleMessage(topic, payload)
})

fleetManager.logger.Info("Fleet secrets manager bound to MQTT",
slog.String("request_topic", topics.SecretsRequest),
slog.String("response_topic", topics.SecretsResponse),
slog.String("updated_topic", topics.SecretsUpdated))
})

return nil
}

// refreshAndReconnect refreshes the JWT token and reconnects to MQTT
func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error {
func (fleetManager *FleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error {
// Refresh JWT token
token, err := fleetManager.authTokenManager.RefreshToken(ctx)
if err != nil {
Expand Down Expand Up @@ -271,7 +314,7 @@ func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context,
return nil
}

func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (string, error) {
func (fleetManager *FleetConfigManager) configToSafeString(cfg config.Config) (string, error) {
redacted := redact.SensitiveData(cfg)
configYaml, err := yaml.Marshal(redacted)
if err != nil {
Expand All @@ -280,13 +323,14 @@ func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (s
return string(configYaml), nil
}

func (fleetManager *fleetConfigManager) GetContext(ctx context.Context) context.Context {
// GetContext returns the context for the Fleet configuration manager
func (fleetManager *FleetConfigManager) GetContext(ctx context.Context) context.Context {
// Empty implementation for now - just return the context as-is
return ctx
}

// monitorTokenExpiry periodically checks token expiry and triggers reconnection before token expires
func (fleetManager *fleetConfigManager) monitorTokenExpiry() {
func (fleetManager *FleetConfigManager) monitorTokenExpiry() {
// Check interval: default 30 seconds, configurable via config
checkInterval := 30 * time.Second
if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval != nil && *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval > 0 {
Expand Down Expand Up @@ -346,7 +390,7 @@ func (fleetManager *fleetConfigManager) monitorTokenExpiry() {
}

// Stop gracefully shuts down the OTLP bridge and token expiry monitor.
func (fleetManager *fleetConfigManager) Stop(ctx context.Context) error {
func (fleetManager *FleetConfigManager) Stop(ctx context.Context) error {
// Stop token expiry monitor
if fleetManager.monitorCancel != nil {
fleetManager.monitorCancel()
Expand Down
Loading