Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion comp/trace/agent/impl/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,18 @@ func NewAgent(deps dependencies) (traceagent.Component, error) {

prepGoRuntime(tracecfg)

tracecfg.SecretsRefreshFn = func() bool {
if deps.Secrets == nil {
log.Error("Secrets component not available, cannot trigger refresh")
return false
}
_, _ = deps.Secrets.Refresh(false)
return true
}

c.Agent = pkgagent.NewAgent(
ctx,
c.config.Object(),
tracecfg,
c.telemetryCollector,
statsdCl,
deps.Compressor,
Expand Down
5 changes: 5 additions & 0 deletions pkg/trace/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,11 @@ type AgentConfig struct {

// APMMode specifies whether using "edge" APM mode. May support other modes in the future. If unset, it has no impact.
APMMode string

// SecretsRefreshFn is called when a 403 response is received to trigger
// API key refresh from the secrets backend. It returns true if the refresh
// was triggered, false if throttled or unavailable.
SecretsRefreshFn func() bool `json:"-"`
}

// RemoteClient client is used to APM Sampling Updates from a remote source.
Expand Down
35 changes: 24 additions & 11 deletions pkg/trace/writer/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ func newSenders(cfg *config.AgentConfig, r eventRecorder, path string, climit, q
os.Exit(1)
}
senders[i] = newSender(&senderConfig{
client: cfg.NewHTTPClient(),
maxConns: int(maxConns),
maxQueued: qsize,
maxRetries: cfg.MaxSenderRetries,
url: url,
apiKey: endpoint.APIKey,
recorder: r,
userAgent: fmt.Sprintf("Datadog Trace Agent/%s/%s", cfg.AgentVersion, cfg.GitCommit),
isMRF: endpoint.IsMRF,
MRFFailoverAPM: cfg.MRFFailoverAPM,
client: cfg.NewHTTPClient(),
maxConns: int(maxConns),
maxQueued: qsize,
maxRetries: cfg.MaxSenderRetries,
url: url,
apiKey: endpoint.APIKey,
recorder: r,
userAgent: fmt.Sprintf("Datadog Trace Agent/%s/%s", cfg.AgentVersion, cfg.GitCommit),
isMRF: endpoint.IsMRF,
MRFFailoverAPM: cfg.MRFFailoverAPM,
secretsRefreshFn: cfg.SecretsRefreshFn,
}, statsd)
}
return senders
Expand Down Expand Up @@ -160,6 +161,10 @@ type senderConfig struct {
isMRF bool
// MRFFailoverAPM determines whether APM data should be failed over to the secondary (MRF) DC.
MRFFailoverAPM func() bool
// secretsRefreshFn is called when a 403 response is received to trigger
// API key refresh from the secrets backend. It returns true if the refresh
// was triggered, false if throttled or unavailable.
secretsRefreshFn func() bool
}

// sender is responsible for sending payloads to a given URL. It uses a size-limited
Expand Down Expand Up @@ -404,6 +409,13 @@ func (s *sender) do(req *http.Request) error {
}
resp.Body.Close()

if resp.StatusCode == http.StatusForbidden {
log.Debugf("API Key invalid (403), triggering secret refresh")
if s.cfg.secretsRefreshFn != nil {
s.cfg.secretsRefreshFn()
}
}

if isRetriable(resp.StatusCode) {
return &retriableError{
fmt.Errorf("server responded with %q", resp.Status),
Expand All @@ -419,7 +431,8 @@ func (s *sender) do(req *http.Request) error {

// isRetriable reports whether the give HTTP status code should be retried.
func isRetriable(code int) bool {
if code == http.StatusRequestTimeout || code == http.StatusTooManyRequests {
// TODO: Double check what response codes are expected from the backend when API Key is invalid
if code == http.StatusRequestTimeout || code == http.StatusTooManyRequests || code == http.StatusForbidden {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern I have here is what do we do when the Forbidden status code is legit / the API key has been rotated and the customer is not using RC / api key refresh? I suppose it's probably not a big deal since if the API key is bad then we're going to drop things anyways, but it does mean we'll retry requests more than we probably should? I'm wondering if there's a way we can gate these retries so that they only retry when we know we can try to get a new api key (And then only retry after we have a new api key?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good question. I initially approached the issue from that angle, but quickly saw that keeping track of invalid keys and blocking the sender had it's own tradeoffs: more complex api key changes propagation, refactor goroutines to avoid race conditions when reading/writing the key... and the sender will still be unable to send other payloads if the api key is invalid. This concern you raise is also the reason I'm not 100% fond of the proposed solution 🤔

return true
}
// 5xx errors can be retried
Expand Down
67 changes: 62 additions & 5 deletions pkg/trace/writer/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestMaxConns(t *testing.T) {
func TestIsRetriable(t *testing.T) {
for code, want := range map[int]bool{
400: false,
403: true,
404: false,
408: true,
409: false,
Expand Down Expand Up @@ -187,7 +188,7 @@ func TestSender(t *testing.T) {
s := newSender(testSenderConfig(server.URL), statsd)
s.Push(expectResponses(503, 503, 200))
for i := 0; i < 20; i++ {
s.Push(expectResponses(403))
s.Push(expectResponses(404))
}

s.Stop()
Expand Down Expand Up @@ -231,7 +232,7 @@ func TestSender(t *testing.T) {
s.Push(expectResponses(200))
s.Push(expectResponses(200))
for i := 0; i < 4; i++ {
s.Push(expectResponses(403))
s.Push(expectResponses(404))
}
s.Stop()

Expand All @@ -247,7 +248,7 @@ func TestSender(t *testing.T) {
sent := recorder.data(eventTypeSent)
assert.Equal(3, len(sent))
for i := 0; i < 3; i++ {
assert.True(sent[i].bytes > len("|403"))
assert.True(sent[i].bytes > len("|404"))
assert.NoError(sent[i].err)
assert.Equal(1, sent[i].count)
assert.True(time.Since(start)-sent[i].duration < time.Second)
Expand All @@ -256,8 +257,8 @@ func TestSender(t *testing.T) {
failed := recorder.data(eventTypeRejected)
assert.Equal(4, len(failed))
for i := 0; i < 4; i++ {
assert.True(failed[i].bytes > len("|403"))
assert.Equal("403 Forbidden", failed[i].err.Error())
assert.True(failed[i].bytes > len("|404"))
assert.Equal("404 Not Found", failed[i].err.Error())
assert.Equal(1, failed[i].count)
assert.True(time.Since(start)-failed[i].duration < time.Second)
}
Expand Down Expand Up @@ -314,6 +315,62 @@ func TestSender(t *testing.T) {
assert.Equal(0, servers[2].Retried(), "retry")
assert.Equal(20, servers[2].Failed(), "failed")
})

t.Run("403_secrets_refresh_fn", func(t *testing.T) {
assert := assert.New(t)
server := newTestServer()
defer server.Close()

callbackInvoked := false
cfg := testSenderConfig(server.URL)
cfg.secretsRefreshFn = func() bool {
callbackInvoked = true
return true
}

s := newSender(cfg, statsd)
s.Push(expectResponses(403))
s.Stop()

assert.True(callbackInvoked, "secrets refresh callback should have been invoked on 403")
})
t.Run("403_secrets_refresh_fn_nil", func(t *testing.T) {
assert := assert.New(t)
server := newTestServer()
defer server.Close()

cfg := testSenderConfig(server.URL)
cfg.secretsRefreshFn = nil

s := newSender(cfg, statsd)
assert.NotPanics(func() {
s.Push(expectResponses(403))
s.Stop()
})
})

t.Run("403_retries_with_backoff", func(t *testing.T) {
assert := assert.New(t)
server := newTestServer()
defer server.Close()
defer useBackoffDuration(time.Nanosecond)()

callbackInvoked := false
cfg := testSenderConfig(server.URL)
cfg.secretsRefreshFn = func() bool {
callbackInvoked = true
return true
}

s := newSender(cfg, statsd)
s.Push(expectResponses(403, 403, 200))
s.Stop()

assert.Equal(3, server.Total(), "should have made 3 requests")
assert.Equal(2, server.Retried(), "should have retried twice")
assert.Equal(1, server.Accepted(), "should have succeeded once")
assert.True(callbackInvoked, "secrets refresh callback should have been invoked on 403")
})
}

func TestPayload(t *testing.T) {
Expand Down
Loading