Skip to content

Commit b5d25a3

Browse files
authored
[APMLP-876] Add bucketTagResolver implementation for gradual rollout (#45951)
### What does this PR do?

Add a new `imageresolver.Resolver` implementation called `bucketTagResolver`. A few changes in the other files make it possible to mock the HTTP calls in the resolver tests (namely, the addition of an `http.RoundTripper` parameter to the `newHTTPDigestFetcher(...)` constructor).

NOTE: This does **NOT** change existing gradual rollout behavior. The new resolver implementation is additive, and making it the default will be its own separate PR.

### Motivation

This is specifically for the tag-based gradual rollout initiative. It allows APM tracer library and APM Injector releases to be rolled out gradually to subsets of customers by generating a bucket ID from 0 to 9 based on the API key configured on the cluster agent, which keeps the distribution across the gradual rollout buckets as even as possible.

This change also uses the new `imageresolver.cache` (implemented in #45451) to proactively promote the use of newer releases based on a configurable cache TTL (configurability will be implemented in a subsequent PR; the TTL is currently hard-coded to 1 hour). If a tag -> digest mapping has expired according to the TTL, or a tag entry is not available in the cache, we fetch the digest using the new `imageresolver.httpDigestFetcher` (#45653). This is essentially our own implementation of `crane.Digest` (see source code [here](https://github.com/google/go-containerregistry/blob/main/pkg/crane/digest.go)).

### Describe how you validated your changes

- Addition of new unit tests in `resolver_test.go`
- Manual E2E testing with `injector-dev`

### Additional Notes

- (WIP) E2E testing integrated into CI (we will have a separate PR to add these when we actually enable the tag-based gradual rollout in favor of the current remote configuration-based gradual rollout).

Co-authored-by: erika.yasuda <erika.yasuda@datadoghq.com>
1 parent e996792 commit b5d25a3

File tree

8 files changed

+321
-38
lines changed

8 files changed

+321
-38
lines changed

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/cache.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package imageresolver
99

1010
import (
11+
"net/http"
1112
"sync"
1213
"time"
1314
)
@@ -27,17 +28,17 @@ type httpDigestCache struct {
2728
fetcher *httpDigestFetcher
2829
}
2930

30-
func (c *httpDigestCache) get(registry string, repository string, tag string) (string, bool) {
31+
func (c *httpDigestCache) get(registry string, repository string, tag string) (string, error) {
3132
if digest := c.checkCache(registry, repository, tag); digest != "" {
32-
return digest, true
33+
return digest, nil
3334
}
3435

3536
digest, err := c.fetcher.digest(registry + "/" + repository + ":" + tag)
3637
if err != nil {
37-
return "", false
38+
return "", err
3839
}
3940

40-
return c.store(registry, repository, tag, digest), true
41+
return c.store(registry, repository, tag, digest), nil
4142
}
4243

4344
func (c *httpDigestCache) checkCache(registry, repository, tag string) string {
@@ -75,3 +76,16 @@ func (c *httpDigestCache) store(registry, repository, tag, digest string) string
7576
}
7677
return digest
7778
}
79+
80+
func newHTTPDigestCache(ttl time.Duration, ddRegistries map[string]struct{}, rt http.RoundTripper) *httpDigestCache {
81+
cache := make(registryCache)
82+
for registry := range ddRegistries {
83+
cache[registry] = make(repositoryCache)
84+
}
85+
86+
return &httpDigestCache{
87+
cache: cache,
88+
ttl: ttl,
89+
fetcher: newHTTPDigestFetcher(rt),
90+
}
91+
}

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/cache_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ func TestHttpDigestCache_Get_Success(t *testing.T) {
145145
tt.setupCache(cc)
146146
tt.setupMock(transport)
147147

148-
resolved, ok := cc.get("test-registry", tt.repository, tt.tag)
148+
resolved, err := cc.get("test-registry", tt.repository, tt.tag)
149149

150-
require.True(t, ok, "Expected successful get")
150+
require.NoError(t, err, "Expected successful get")
151151
require.NotNil(t, resolved, "Expected non-nil resolved image")
152152
require.Equal(t, tt.expectedDigest, resolved)
153153
require.Equal(t, tt.expectedCallCount, transport.CallCount())
@@ -157,9 +157,9 @@ func TestHttpDigestCache_Get_Success(t *testing.T) {
157157

158158
func TestHttpDigestCache_Get_Failure(t *testing.T) {
159159
cc, transport := mockHTTPDigestCache(1 * time.Minute)
160-
resolved, ok := cc.get("test-registry", "dd-lib-python-init", "v1")
160+
resolved, err := cc.get("test-registry", "dd-lib-python-init", "v1")
161161

162-
require.False(t, ok, "Expected failed get")
162+
require.Error(t, err, "Expected failed get")
163163
require.Empty(t, resolved, "Expected empty digest")
164164
require.Equal(t, 1, transport.CallCount())
165165
}
@@ -169,11 +169,11 @@ func TestHttpDigestCache_Get_MultipleRepositories(t *testing.T) {
169169
transport.addImage("registry1", "dd-lib-python-init", "v1", "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
170170
transport.addImage("registry2", "dd-lib-java-init", "v2", "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
171171

172-
resolved1, ok1 := cc.get("registry1", "dd-lib-python-init", "v1")
173-
resolved2, ok2 := cc.get("registry2", "dd-lib-java-init", "v2")
172+
resolved1, err1 := cc.get("registry1", "dd-lib-python-init", "v1")
173+
resolved2, err2 := cc.get("registry2", "dd-lib-java-init", "v2")
174174

175-
require.True(t, ok1, "Should fetch python lib")
176-
require.True(t, ok2, "Should fetch java lib")
175+
require.NoError(t, err1, "Should fetch python lib")
176+
require.NoError(t, err2, "Should fetch java lib")
177177
require.Equal(t, "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", resolved1)
178178
require.Equal(t, "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", resolved2)
179179
require.Equal(t, 2, transport.CallCount(), "Should have fetched digest twice")
@@ -185,13 +185,13 @@ func TestHttpDigestCache_Get_SameRepoMultipleTags(t *testing.T) {
185185
transport.addImage("registry", "dd-lib-python-init", "v2", "sha256:2222222222222222222222222222222222222222222222222222222222222222")
186186
transport.addImage("registry", "dd-lib-python-init", "latest", "sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
187187

188-
resolved1, ok1 := cc.get("registry", "dd-lib-python-init", "v1")
189-
resolved2, ok2 := cc.get("registry", "dd-lib-python-init", "v2")
190-
resolved3, ok3 := cc.get("registry", "dd-lib-python-init", "latest")
188+
resolved1, err1 := cc.get("registry", "dd-lib-python-init", "v1")
189+
resolved2, err2 := cc.get("registry", "dd-lib-python-init", "v2")
190+
resolved3, err3 := cc.get("registry", "dd-lib-python-init", "latest")
191191

192-
require.True(t, ok1)
193-
require.True(t, ok2)
194-
require.True(t, ok3)
192+
require.NoError(t, err1)
193+
require.NoError(t, err2)
194+
require.NoError(t, err3)
195195
require.Equal(t, "sha256:1111111111111111111111111111111111111111111111111111111111111111", resolved1)
196196
require.Equal(t, "sha256:2222222222222222222222222222222222222222222222222222222222222222", resolved2)
197197
require.Equal(t, "sha256:ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", resolved3)
@@ -202,8 +202,8 @@ func TestHttpDigestCache_Get_ConcurrentCacheHit(t *testing.T) {
202202
cc, transport := mockHTTPDigestCache(5 * time.Minute)
203203
transport.addImage("registry", "repo", "v1", "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890")
204204

205-
resolved, ok := cc.get("registry", "repo", "v1")
206-
require.True(t, ok)
205+
resolved, err := cc.get("registry", "repo", "v1")
206+
require.NoError(t, err)
207207
require.NotNil(t, resolved)
208208
require.Equal(t, 1, transport.CallCount(), "Should have made exactly one fetch to warm cache")
209209

@@ -213,8 +213,8 @@ func TestHttpDigestCache_Get_ConcurrentCacheHit(t *testing.T) {
213213
wg.Add(1)
214214
go func() {
215215
defer wg.Done()
216-
resolved, ok := cc.get("registry", "repo", "v1")
217-
require.True(t, ok)
216+
resolved, err := cc.get("registry", "repo", "v1")
217+
require.NoError(t, err)
218218
require.NotNil(t, resolved)
219219
require.Equal(t, "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890", resolved)
220220
}()

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Config struct {
3737
MaxInitRetries int
3838
InitRetryDelay time.Duration
3939
BucketID string
40+
DigestCacheTTL time.Duration
4041
}
4142

4243
func calculateRolloutBucket(apiKey string) string {
@@ -46,7 +47,6 @@ func calculateRolloutBucket(apiKey string) string {
4647
return strconv.Itoa(int(hashInt % rolloutBucketCount))
4748
}
4849

49-
// NewConfig creates a new Config
5050
func NewConfig(cfg config.Component, rcClient RemoteConfigClient) Config {
5151
return Config{
5252
Site: cfg.GetString("site"),
@@ -55,5 +55,6 @@ func NewConfig(cfg config.Component, rcClient RemoteConfigClient) Config {
5555
MaxInitRetries: 5,
5656
InitRetryDelay: 1 * time.Second,
5757
BucketID: calculateRolloutBucket(cfg.GetString("api_key")),
58+
DigestCacheTTL: 1 * time.Hour, // DEV: Make this configurable
5859
}
5960
}

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/config_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func TestNewConfig(t *testing.T) {
3838
MaxInitRetries: 5,
3939
InitRetryDelay: 1 * time.Second,
4040
BucketID: "2",
41+
DigestCacheTTL: 1 * time.Hour,
4142
},
4243
},
4344
{
@@ -55,6 +56,7 @@ func TestNewConfig(t *testing.T) {
5556
MaxInitRetries: 5,
5657
InitRetryDelay: 1 * time.Second,
5758
BucketID: "2",
59+
DigestCacheTTL: 1 * time.Hour,
5860
},
5961
},
6062
{
@@ -71,6 +73,7 @@ func TestNewConfig(t *testing.T) {
7173
MaxInitRetries: 5,
7274
InitRetryDelay: 1 * time.Second,
7375
BucketID: "2",
76+
DigestCacheTTL: 1 * time.Hour,
7477
},
7578
},
7679
{
@@ -88,6 +91,7 @@ func TestNewConfig(t *testing.T) {
8891
MaxInitRetries: 5,
8992
InitRetryDelay: 1 * time.Second,
9093
BucketID: "0",
94+
DigestCacheTTL: 1 * time.Hour,
9195
},
9296
},
9397
}

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/digest_fetcher.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,14 @@ func (h *httpDigestFetcher) digest(ref string) (string, error) {
101101
return digest, nil
102102
}
103103

104-
func newHTTPDigestFetcher() *httpDigestFetcher {
105-
transport := http.DefaultTransport.(*http.Transport).Clone()
104+
func newHTTPDigestFetcher(rt http.RoundTripper) *httpDigestFetcher {
105+
if rt == nil {
106+
rt = http.DefaultTransport.(*http.Transport).Clone()
107+
}
106108
return &httpDigestFetcher{
107109
client: &http.Client{
108110
Timeout: requestTimeout,
109-
Transport: transport,
111+
Transport: rt,
110112
},
111113
}
112114
}

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/digest_fetcher_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ func makeTestImageRef(server *httptest.Server) string {
3030
}
3131

3232
func TestHttpDigestFetcher_buildManifestRequest_Success(t *testing.T) {
33-
f := newHTTPDigestFetcher()
33+
transport := &mockRoundTripper{
34+
responses: make(map[string]*http.Response),
35+
}
36+
f := newHTTPDigestFetcher(transport)
3437
tests := []struct {
3538
name string
3639
imageRef string
@@ -94,7 +97,10 @@ func TestHttpDigestFetcher_buildManifestRequest_Success(t *testing.T) {
9497
}
9598

9699
func TestHttpDigestFetcher_buildManifestRequest_Error(t *testing.T) {
97-
f := newHTTPDigestFetcher()
100+
transport := &mockRoundTripper{
101+
responses: make(map[string]*http.Response),
102+
}
103+
f := newHTTPDigestFetcher(transport)
98104
tests := []struct {
99105
name string
100106
imageRef string

pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/image_resolver.go renamed to pkg/clusteragent/admission/mutate/autoinstrumentation/imageresolver/resolver.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package imageresolver
99

1010
import (
1111
"fmt"
12+
"net/http"
1213
"reflect"
1314
"strings"
1415
"sync"
@@ -31,6 +32,8 @@ type Resolver interface {
3132
// This is used when no remote config client is available.
3233
type noOpResolver struct{}
3334

35+
var _ Resolver = (*noOpResolver)(nil)
36+
3437
// NewNoOpResolver creates a new noOpImageResolver.
3538
// This is useful for testing or when image resolution is not needed.
3639
func NewNoOpResolver() Resolver {
@@ -58,6 +61,8 @@ type rcResolver struct {
5861
retryDelay time.Duration
5962
}
6063

64+
var _ Resolver = (*rcResolver)(nil)
65+
6166
func newRcResolver(cfg Config) Resolver {
6267
resolver := &rcResolver{
6368
rcClient: cfg.RCClient,
@@ -211,13 +216,68 @@ func (r *rcResolver) processUpdate(update map[string]state.RawConfig, applyState
211216
}
212217
}
213218

219+
// bucketTagResolver resolves image references using bucket tags.
220+
// It maintains a cache of image digests and resolves bucket tags to digests if possible.
221+
// Defaults to returning the original tag if resolution fails for any reason.
222+
type bucketTagResolver struct {
223+
cache *httpDigestCache
224+
bucketID string
225+
datadoghqRegistries map[string]struct{}
226+
}
227+
228+
var _ Resolver = (*bucketTagResolver)(nil)
229+
230+
func (r *bucketTagResolver) createBucketTag(tag string) string {
231+
normalizedTag := strings.TrimPrefix(tag, "v")
232+
233+
// DEV: Only create bucket tag for major versions (single number like "1" or "v1")
234+
if !strings.Contains(normalizedTag, ".") {
235+
return fmt.Sprintf("%s-gr%s", normalizedTag, r.bucketID)
236+
}
237+
return normalizedTag
238+
}
239+
240+
func (r *bucketTagResolver) Resolve(registry string, repository string, tag string) (*ResolvedImage, bool) {
241+
var result = tag
242+
var resolvedTag = tag
243+
defer func() {
244+
metrics.ImageResolutionAttempts.Inc(repository, resolvedTag, result)
245+
}()
246+
if !isDatadoghqRegistry(registry, r.datadoghqRegistries) {
247+
log.Debugf("%s is not a Datadoghq registry, not opting into gradual rollout", registry)
248+
return nil, false
249+
}
250+
251+
bucketTag := r.createBucketTag(tag)
252+
253+
digest, err := r.cache.get(registry, repository, bucketTag)
254+
resolvedTag = bucketTag
255+
if err != nil {
256+
log.Debugf("Failed to resolve %s/%s:%s for gradual rollout - %v", registry, repository, bucketTag, err)
257+
return nil, false
258+
}
259+
result = digest
260+
return &ResolvedImage{
261+
FullImageRef: registry + "/" + repository + "@" + digest,
262+
CanonicalVersion: tag, // DEV: This is the customer provided tag
263+
}, true
264+
}
265+
266+
func newBucketTagResolver(cfg Config) *bucketTagResolver {
267+
rt := http.DefaultTransport.(*http.Transport).Clone()
268+
return &bucketTagResolver{
269+
cache: newHTTPDigestCache(cfg.DigestCacheTTL, cfg.DDRegistries, rt),
270+
bucketID: cfg.BucketID,
271+
datadoghqRegistries: cfg.DDRegistries,
272+
}
273+
}
274+
214275
// New creates the appropriate Resolver based on whether
215276
// a remote config client is available.
216277
func New(cfg Config) Resolver {
217278
if cfg.RCClient == nil || reflect.ValueOf(cfg.RCClient).IsNil() {
218279
log.Debugf("No remote config client available")
219280
return NewNoOpResolver()
220281
}
221-
222282
return newRcResolver(cfg)
223283
}

0 commit comments

Comments
 (0)