Skip to content

Commit 07cfe9c

Browse files
Merge pull request #32 from DataDog/vr/delta-resource
Support snapshot resources in linear cache
2 parents 0b418af + ea48895 commit 07cfe9c

File tree

11 files changed

+430
-84
lines changed

11 files changed

+430
-84
lines changed

envoy/config/cluster/v3/cluster.pb.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cache/internal/cached_resource.go

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package internal
33
import (
44
"crypto/sha256"
55
"encoding/hex"
6+
"errors"
67
"fmt"
78
"sync"
89
"time"
@@ -29,16 +30,15 @@ type ResourceWithTTL struct {
2930
// It contains the resource itself and its associated version (currently in two different modes).
3031
// It should not be altered once created, to allow concurrent access.
3132
type CachedResource struct {
32-
Name string
33-
typeURL string
33+
Name string
3434

3535
resource Resource
3636
ttl *time.Duration
3737

3838
// cacheVersion is the version of the cache at the time of last update, used in sotw.
3939
cacheVersion string
4040

41-
marshalFunc func() ([]byte, error)
41+
marshalFunc func() (*anypb.Any, error)
4242
computeResourceVersionFunc func() (string, error)
4343
}
4444

@@ -49,23 +49,29 @@ func WithCacheVersion(version string) CachedResourceOption {
4949
return func(r *CachedResource) { r.cacheVersion = version }
5050
}
5151

52-
// WithMarshaledResource enables the user to provide the already marshaled bytes if they have them.
53-
// Those bytes should strive at being consistent if the object has not changed (beware protobuf non-deterministic marshaling)
52+
// WithMarshaledResource enables the user to provide the already marshaled resource if they have them.
53+
// This serialization should strive at being consistent if the object has not changed (beware protobuf non-deterministic marshaling through anypb.New)
5454
// or alternatively the resource version should also then be set.
5555
// By default it is computed by performing a deterministic protobuf marshaling.
56-
func WithMarshaledResource(bytes []byte) CachedResourceOption {
57-
if len(bytes) == 0 {
58-
return func(*CachedResource) {}
56+
func WithMarshaledResource(res *anypb.Any) CachedResourceOption {
57+
if res == nil {
58+
return func(r *CachedResource) {
59+
r.marshalFunc = nil
60+
}
61+
}
62+
return func(r *CachedResource) {
63+
r.marshalFunc = func() (*anypb.Any, error) { return res, nil }
5964
}
60-
return func(r *CachedResource) { r.marshalFunc = func() ([]byte, error) { return bytes, nil } }
6165
}
6266

6367
// WithResourceVersion enables the user to provide the resource version to be used.
6468
// This version should be constant if the object has not changed to avoid needlessly sending resources to clients.
6569
// By default it is computed by hashing the serialized version of the resource.
6670
func WithResourceVersion(version string) CachedResourceOption {
6771
if version == "" {
68-
return func(*CachedResource) {}
72+
return func(r *CachedResource) {
73+
r.computeResourceVersionFunc = nil
74+
}
6975
}
7076
return func(r *CachedResource) { r.computeResourceVersionFunc = func() (string, error) { return version, nil } }
7177
}
@@ -75,17 +81,17 @@ func WithResourceTTL(ttl *time.Duration) CachedResourceOption {
7581
return func(r *CachedResource) { r.ttl = ttl }
7682
}
7783

78-
func NewCachedResource(name, typeURL string, res Resource, opts ...CachedResourceOption) *CachedResource {
84+
func NewCachedResource(name string, res Resource, opts ...CachedResourceOption) *CachedResource {
7985
cachedRes := &CachedResource{
8086
Name: name,
81-
typeURL: typeURL,
8287
resource: res,
8388
}
8489
for _, opt := range opts {
8590
opt(cachedRes)
8691
}
92+
8793
if cachedRes.marshalFunc == nil {
88-
cachedRes.marshalFunc = sync.OnceValues(func() ([]byte, error) {
94+
cachedRes.marshalFunc = sync.OnceValues(func() (*anypb.Any, error) {
8995
return marshalResource(res)
9096
})
9197
}
@@ -95,7 +101,7 @@ func NewCachedResource(name, typeURL string, res Resource, opts ...CachedResourc
95101
if err != nil {
96102
return "", fmt.Errorf("marshaling resource: %w", err)
97103
}
98-
return hashResource(marshaled), nil
104+
return hashResource(marshaled.Value), nil
99105
})
100106
}
101107
return cachedRes
@@ -114,7 +120,7 @@ func (c *CachedResource) HasTTL() bool {
114120
}
115121

116122
// getMarshaledResource lazily marshals the resource and returns the bytes.
117-
func (c *CachedResource) getMarshaledResource() ([]byte, error) {
123+
func (c *CachedResource) getMarshaledResource() (*anypb.Any, error) {
118124
return c.marshalFunc()
119125
}
120126

@@ -143,50 +149,36 @@ func (c *CachedResource) GetRawResource() ResourceWithTTL {
143149
}
144150
}
145151

146-
var deltaResourceTypeURL = "type.googleapis.com/" + string(proto.MessageName(&discovery.Resource{}))
147-
148-
// getResourceVersion lazily hashes the resource and returns the stable hash used to track version changes.
152+
// GetSotwResource returns the resource as is to be wrapped in a sotw response (i.e. DiscoveryResponse).
153+
// If the response is a heartbeat the underlying resource is not included.
149154
func (c *CachedResource) GetSotwResource(isHeartbeat bool) (*anypb.Any, error) {
150-
buildResource := func() (*anypb.Any, error) {
151-
marshaled, err := c.getMarshaledResource()
152-
if err != nil {
153-
return nil, fmt.Errorf("marshaling: %w", err)
154-
}
155-
return &anypb.Any{
156-
TypeUrl: c.typeURL,
157-
Value: marshaled,
158-
}, nil
155+
if isHeartbeat && c.ttl == nil {
156+
return nil, errors.New("heartbeat requested without ttl set")
159157
}
160158

161159
if c.ttl == nil {
162-
return buildResource()
160+
// No TTL set, directly return the anypb format of the resource.
161+
return c.getMarshaledResource()
163162
}
164163

164+
// A TTL is set, wrapped the resource into a discovery resource.
165165
wrappedResource := &discovery.Resource{
166166
Name: c.Name,
167167
Ttl: durationpb.New(*c.ttl),
168168
}
169169

170170
if !isHeartbeat {
171-
rsrc, err := buildResource()
171+
rsrc, err := c.getMarshaledResource()
172172
if err != nil {
173-
return nil, err
173+
return nil, fmt.Errorf("marshaling resource: %w", err)
174174
}
175175
wrappedResource.Resource = rsrc
176176
}
177177

178-
marshaled, err := marshalResource(wrappedResource)
179-
if err != nil {
180-
return nil, fmt.Errorf("marshaling discovery resource: %w", err)
181-
}
182-
183-
return &anypb.Any{
184-
TypeUrl: deltaResourceTypeURL,
185-
Value: marshaled,
186-
}, nil
178+
return marshalResource(wrappedResource)
187179
}
188180

189-
// getResourceVersion lazily hashes the resource and returns the stable hash used to track version changes.
181+
// GetDeltaResource returns the resource as is to be wrapped in a delta response (i.e. DeltaDiscoveryResponse).
190182
func (c *CachedResource) GetDeltaResource() (*discovery.Resource, error) {
191183
marshaled, err := c.getMarshaledResource()
192184
if err != nil {
@@ -197,12 +189,9 @@ func (c *CachedResource) GetDeltaResource() (*discovery.Resource, error) {
197189
return nil, fmt.Errorf("computing version: %w", err)
198190
}
199191
return &discovery.Resource{
200-
Name: c.Name,
201-
Resource: &anypb.Any{
202-
TypeUrl: c.typeURL,
203-
Value: marshaled,
204-
},
205-
Version: version,
192+
Name: c.Name,
193+
Resource: marshaled,
194+
Version: version,
206195
}, nil
207196
}
208197

@@ -213,7 +202,9 @@ func hashResource(resource []byte) string {
213202
return hex.EncodeToString(hasher.Sum(nil))
214203
}
215204

216-
// marshalResource converts the Resource to MarshaledResource.
217-
func marshalResource(resource Resource) ([]byte, error) {
218-
return proto.MarshalOptions{Deterministic: true}.Marshal(resource)
205+
// marshalResource performs the same operation as anypb.New but using deterministic marshaling.
206+
func marshalResource(resource Resource) (*anypb.Any, error) {
207+
ret := new(anypb.Any)
208+
err := anypb.MarshalFrom(ret, resource, proto.MarshalOptions{Deterministic: true})
209+
return ret, err
219210
}

0 commit comments

Comments
 (0)