Skip to content

Commit 08f7806

Browse files
committed
correctly handle secret resolution
1 parent a793f26 commit 08f7806

File tree

3 files changed

+42
-23
lines changed

3 files changed

+42
-23
lines changed

agent/configmgr/fleet/connection.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -233,30 +233,36 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio
233233
connection.mu.Unlock()
234234

235235
if hasHandler {
236-
if err := handler(pr.Packet.Topic, pr.Packet.Payload); err != nil {
237-
connection.logger.Error("topic handler failed", "topic", pr.Packet.Topic, "error", err)
238-
}
236+
// Process in goroutine to avoid blocking message acknowledgment
237+
go func() {
238+
if err := handler(pr.Packet.Topic, pr.Packet.Payload); err != nil {
239+
connection.logger.Error("topic handler failed", "topic", pr.Packet.Topic, "error", err)
240+
}
241+
}()
239242
return true, nil
240243
}
241244

242-
orgID := strings.Split(pr.Packet.Topic, "/")[1]
243-
244-
// Use a fresh context for async message handling, not the Connect() context
245-
// which may be canceled or have a short timeout
246-
err = connection.messaging.DispatchToHandlers(
247-
context.Background(),
248-
pr.Packet.Payload,
249-
orgID,
250-
details.AgentID,
251-
TopicActions{
252-
Subscribe: connection.subscribeToTopic,
253-
Publish: connection.publishToTopic,
254-
Unsubscribe: connection.unsubscribeFromTopic,
255-
},
256-
)
257-
if err != nil {
258-
connection.logger.Error("failed to dispatch to handlers", "error", err)
259-
}
245+
// Process in goroutine to avoid blocking message acknowledgment
246+
go func() {
247+
orgID := strings.Split(pr.Packet.Topic, "/")[1]
248+
249+
// Use a fresh context for async message handling, not the Connect() context
250+
// which may be canceled or have a short timeout
251+
err := connection.messaging.DispatchToHandlers(
252+
context.Background(),
253+
pr.Packet.Payload,
254+
orgID,
255+
details.AgentID,
256+
TopicActions{
257+
Subscribe: connection.subscribeToTopic,
258+
Publish: connection.publishToTopic,
259+
Unsubscribe: connection.unsubscribeFromTopic,
260+
},
261+
)
262+
if err != nil {
263+
connection.logger.Error("failed to dispatch to handlers", "error", err)
264+
}
265+
}()
260266

261267
return true, nil
262268
},

agent/docker/default_config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ orb:
77
client_id: ${FLEET_CLIENT_ID}
88
client_secret: ${FLEET_CLIENT_SECRET}
99
otlp_bridge_grpc_port: 4337
10+
secrets_manager:
11+
active: fleet
12+
sources:
13+
fleet:
14+
timeout: 30
1015
backends:
1116
network_discovery:
1217
device_discovery:

agent/secretsmgr/fleet.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ type fleetCachedSecret struct {
5454

5555
// NewFleetSecretsManager creates a new Fleet secrets manager
5656
func NewFleetSecretsManager(logger *slog.Logger, cfg config.FleetSecretsManager) *FleetSecretsManager {
57-
timeout := 30 * time.Second
57+
timeout := 120 * time.Second
5858
if cfg.Timeout != nil && *cfg.Timeout > 0 {
5959
timeout = time.Duration(*cfg.Timeout) * time.Second
6060
}
@@ -105,6 +105,9 @@ func (f *FleetSecretsManager) HandleMessage(topic string, payload []byte) error
105105
return f.handleResponse(payload)
106106
case f.updatedTopic:
107107
return f.handleUpdateNotification(payload)
108+
default:
109+
f.logger.Info("received unknown message on topic", "topic", topic)
110+
return nil
108111
}
109112
return nil
110113
}
@@ -115,6 +118,7 @@ func (f *FleetSecretsManager) handleResponse(payload []byte) error {
115118
f.logger.Error("failed to unmarshal secret response", "error", err)
116119
return err
117120
}
121+
f.logger.Debug("handling secret response", "request_id", response.RequestID, "status", response.Status, "secrets", len(response.Secrets))
118122

119123
f.mu.Lock()
120124
ch, exists := f.pendingReqs[response.RequestID]
@@ -211,6 +215,7 @@ func (f *FleetSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (
211215
newPayload := payload
212216

213217
// Process the Data field
218+
// TODO: currently this will solve secrets sequentially - we should find all the secrets and then request them all at once
214219
processedData, err := f.processValue(payload.Data, payload.ID)
215220
if err != nil {
216221
return payload, err
@@ -304,6 +309,8 @@ func (f *FleetSecretsManager) processString(s string, id string) (string, error)
304309
return "", fmt.Errorf("failed to get secret %s: %w", fleetPath, err)
305310
}
306311

312+
f.logger.Info("got secret", "secret_path", fleetPath)
313+
307314
// Cache the secret
308315
f.mu.Lock()
309316
f.usedVars[fleetPath] = fleetCachedSecret{
@@ -344,6 +351,7 @@ func (f *FleetSecretsManager) processSlice(s []any, id string) ([]any, error) {
344351

345352
// requestSecret requests a secret from the control plane via MQTT
346353
func (f *FleetSecretsManager) requestSecret(ctx context.Context, path string, policyIDs map[string]bool) (*messages.SecretValue, error) {
354+
f.logger.Info("requesting secret", "path", path, "policy_ids", policyIDs)
347355
if f.publisher == nil {
348356
return nil, fmt.Errorf("MQTT publisher not bound")
349357
}
@@ -404,7 +412,7 @@ func (f *FleetSecretsManager) requestSecret(ctx context.Context, path string, po
404412
case <-ctx.Done():
405413
return nil, fmt.Errorf("context canceled while waiting for secret response")
406414
case <-time.After(f.timeout):
407-
return nil, fmt.Errorf("timeout waiting for secret response")
415+
return nil, fmt.Errorf("timeout waiting for secret response after %s", f.timeout)
408416
}
409417
}
410418

0 commit comments

Comments
 (0)