Skip to content

Commit e4a4e2d

Browse files
authored
feat(trace): Trigger API Key refresh on 403 (#45674)
### What does this PR do? The agent now supports API Key rotation. Instead of dropping payloads when a 403 is received from the backend, we will instead trigger an API Key (secrets) refresh, and mark the payload as retriable. This PR also refactors the API Key management within `pkg/trace/writer/sender.go`, making reads and writes safe for concurrent use. ### Motivation See above ### Describe how you validated your changes Added unit tests ### Additional Notes Co-authored-by: inigo.lopezdeheredia <inigo.lopezdeheredia@datadoghq.com>
1 parent 89764d5 commit e4a4e2d

File tree

12 files changed

+288
-83
lines changed

12 files changed

+288
-83
lines changed

comp/trace/agent/impl/agent.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,17 @@ func NewAgent(deps dependencies) (traceagent.Component, error) {
137137

138138
prepGoRuntime(tracecfg)
139139

140+
tracecfg.SecretsRefreshFn = func() (string, error) {
141+
if deps.Secrets == nil {
142+
log.Error("Secrets component not available, cannot trigger refresh")
143+
return "", errors.New("secrets component not available")
144+
}
145+
return deps.Secrets.Refresh(true)
146+
}
147+
140148
c.Agent = pkgagent.NewAgent(
141149
ctx,
142-
c.config.Object(),
150+
tracecfg,
143151
c.telemetryCollector,
144152
statsdCl,
145153
deps.Compressor,

comp/trace/config/setup.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,10 @@ func applyDatadogConfig(c *config.AgentConfig, core corecompcfg.Component) error
548548
// Default of 4 was chosen through experimentation, but may not be the optimal value.
549549
c.MaxSenderRetries = 4
550550
}
551+
if core.IsSet("secret_refresh_on_api_key_failure_interval") {
552+
// Use the global secret refresh interval for throttling API key refresh at the sender level
553+
c.APIKeyRefreshThrottleInterval = time.Duration(core.GetInt("secret_refresh_on_api_key_failure_interval")) * time.Minute
554+
}
551555
if core.IsConfigured("apm_config.sync_flushing") {
552556
c.SynchronousFlushing = core.GetBool("apm_config.sync_flushing")
553557
}

pkg/trace/config/config.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,8 @@ type AgentConfig struct {
416416
// case, the sender will drop failed payloads when it is unable to enqueue
417417
// them for another retry.
418418
MaxSenderRetries int
419+
// APIKeyRefreshThrottleInterval is the minimum time between API key refresh attempts.
420+
APIKeyRefreshThrottleInterval time.Duration
419421
// HTTP Transport used in writer connections. If nil, default transport values will be used.
420422
HTTPTransportFunc func() *http.Transport `json:"-"`
421423
// ClientStatsFlushInterval specifies the frequency at which the client stats aggregator will flush its buffer.
@@ -555,6 +557,11 @@ type AgentConfig struct {
555557

556558
// APMMode specifies whether using "edge" APM mode. May support other modes in the future. If unset, it has no impact.
557559
APMMode string
560+
561+
// SecretsRefreshFn is called when a 403 response is received to trigger
562+
// API key refresh from the secrets backend. It blocks until the refresh
563+
// completes and returns a message and any error encountered.
564+
SecretsRefreshFn func() (string, error) `json:"-"`
558565
}
559566

560567
// RemoteClient client is used for APM Sampling Updates from a remote source.
@@ -610,11 +617,12 @@ func New() *AgentConfig {
610617
PipeSecurityDescriptor: "D:AI(A;;GA;;;WD)",
611618
GUIPort: "5002",
612619

613-
StatsWriter: new(WriterConfig),
614-
TraceWriter: new(WriterConfig),
615-
ConnectionResetInterval: 0, // disabled
616-
MaxSenderRetries: 4,
617-
ClientStatsFlushInterval: 2 * time.Second, // bucket duration (2s)
620+
StatsWriter: new(WriterConfig),
621+
TraceWriter: new(WriterConfig),
622+
ConnectionResetInterval: 0, // disabled
623+
MaxSenderRetries: 4,
624+
APIKeyRefreshThrottleInterval: 2 * time.Minute,
625+
ClientStatsFlushInterval: 2 * time.Second, // bucket duration (2s)
618626

619627
StatsdHost: "localhost",
620628
StatsdPort: 8125,

pkg/trace/writer/sender.go

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,24 @@ func newSenders(cfg *config.AgentConfig, r eventRecorder, path string, climit, q
4545
log.Criticalf("Invalid host endpoint: %q", endpoint.Host)
4646
os.Exit(1)
4747
}
48-
senders[i] = newSender(&senderConfig{
48+
scfg := &senderConfig{
4949
client: cfg.NewHTTPClient(),
5050
maxConns: int(maxConns),
5151
maxQueued: qsize,
5252
maxRetries: cfg.MaxSenderRetries,
5353
url: url,
54-
apiKey: endpoint.APIKey,
5554
recorder: r,
5655
userAgent: fmt.Sprintf("Datadog Trace Agent/%s/%s", cfg.AgentVersion, cfg.GitCommit),
5756
isMRF: endpoint.IsMRF,
5857
MRFFailoverAPM: cfg.MRFFailoverAPM,
59-
}, statsd)
58+
}
59+
apiKeyManager := &apiKeyManager{
60+
apiKey: endpoint.APIKey,
61+
refreshFn: cfg.SecretsRefreshFn,
62+
throttleInterval: cfg.APIKeyRefreshThrottleInterval,
63+
}
64+
65+
senders[i] = newSender(scfg, apiKeyManager, statsd)
6066
}
6167
return senders
6268
}
@@ -140,8 +146,6 @@ type senderConfig struct {
140146
client *config.ResetClient
141147
// url specifies the URL to send requests to.
142148
url *url.URL
143-
// apiKey specifies the Datadog API key to use.
144-
apiKey string
145149
// maxConns specifies the maximum number of allowed concurrent outgoing
146150
// connections.
147151
maxConns int
@@ -162,10 +166,62 @@ type senderConfig struct {
162166
MRFFailoverAPM func() bool
163167
}
164168

169+
// apiKeyManager owns the API key used when sending payloads and serializes
// access to it so that it can be read and replaced from multiple goroutines.
type apiKeyManager struct {
	// The embedded lock is held by Get and Update around every access to apiKey.
	sync.RWMutex

	// apiKey is the Datadog API key currently in use.
	apiKey string

	// refreshFn asks the secrets backend for an API key refresh and blocks
	// until that call returns.
	refreshFn func() (string, error)

	// throttleInterval is the shortest allowed gap between two refresh calls.
	throttleInterval time.Duration

	// lastRefresh is the time at which the most recent refresh was started.
	lastRefresh time.Time
}

// Get returns the API key currently held by the manager, taking the read lock
// for the duration of the read.
func (m *apiKeyManager) Get() string {
	m.RLock()
	key := m.apiKey
	m.RUnlock()
	return key
}

// Update replaces the stored API key with newKey, taking the write lock for
// the duration of the assignment.
func (m *apiKeyManager) Update(newKey string) {
	m.Lock()
	m.apiKey = newKey
	m.Unlock()
}
196+
197+
func (m *apiKeyManager) refresh() {
198+
if m.refreshFn == nil || m.throttleInterval == 0 {
199+
return
200+
}
201+
202+
m.Lock()
203+
if time.Since(m.lastRefresh) < m.throttleInterval {
204+
m.Unlock()
205+
log.Debugf("API Key refresh throttled, last refresh was %v ago", time.Since(m.lastRefresh))
206+
return
207+
}
208+
209+
// Update the last refresh time before calling refresh to prevent concurrent calls
210+
m.lastRefresh = time.Now()
211+
m.Unlock()
212+
213+
if result, err := m.refreshFn(); err != nil {
214+
log.Debugf("API Key refresh failed: %v", err)
215+
} else if result != "" {
216+
log.Infof("API Key refresh completed: %s", result)
217+
}
218+
}
219+
165220
// sender is responsible for sending payloads to a given URL. It uses a size-limited
166221
// retry queue with a backoff mechanism in case of retriable errors.
167222
type sender struct {
168-
cfg *senderConfig
223+
cfg *senderConfig
224+
apiKeyManager *apiKeyManager
169225

170226
queue chan *payload // payload queue
171227
inflight *atomic.Int32 // inflight payloads
@@ -178,14 +234,15 @@ type sender struct {
178234
}
179235

180236
// newSender returns a new sender based on the given config cfg.
181-
func newSender(cfg *senderConfig, statsd statsd.ClientInterface) *sender {
237+
func newSender(cfg *senderConfig, apiKeyManager *apiKeyManager, statsd statsd.ClientInterface) *sender {
182238
s := sender{
183-
cfg: cfg,
184-
queue: make(chan *payload, cfg.maxQueued),
185-
inflight: atomic.NewInt32(0),
186-
maxRetries: int32(cfg.maxRetries),
187-
statsd: statsd,
188-
enabled: true,
239+
cfg: cfg,
240+
apiKeyManager: apiKeyManager,
241+
queue: make(chan *payload, cfg.maxQueued),
242+
inflight: atomic.NewInt32(0),
243+
maxRetries: int32(cfg.maxRetries),
244+
statsd: statsd,
245+
enabled: true,
189246
}
190247
for i := 0; i < cfg.maxConns; i++ {
191248
go s.loop()
@@ -388,7 +445,7 @@ const (
388445
)
389446

390447
func (s *sender) do(req *http.Request) error {
391-
req.Header.Set(headerAPIKey, s.cfg.apiKey)
448+
req.Header.Set(headerAPIKey, s.apiKeyManager.Get())
392449
req.Header.Set(headerUserAgent, s.cfg.userAgent)
393450
resp, err := s.cfg.client.Do(req)
394451
if err != nil {
@@ -404,6 +461,11 @@ func (s *sender) do(req *http.Request) error {
404461
}
405462
resp.Body.Close()
406463

464+
if resp.StatusCode == http.StatusForbidden {
465+
log.Debugf("API Key invalid (403), triggering secret refresh")
466+
s.apiKeyManager.refresh()
467+
}
468+
407469
if isRetriable(resp.StatusCode) {
408470
return &retriableError{
409471
fmt.Errorf("server responded with %q", resp.Status),
@@ -419,7 +481,8 @@ func (s *sender) do(req *http.Request) error {
419481

420482
// isRetriable reports whether the given HTTP status code should be retried.
421483
func isRetriable(code int) bool {
422-
if code == http.StatusRequestTimeout || code == http.StatusTooManyRequests {
484+
// TODO: Double check what response codes are expected from the backend when API Key is invalid
485+
if code == http.StatusRequestTimeout || code == http.StatusTooManyRequests || code == http.StatusForbidden {
423486
return true
424487
}
425488
// 5xx errors can be retried

0 commit comments

Comments
 (0)