Skip to content

Commit 65e4940

Browse files
committed
linear: keep streamState to respond resources
Signed-off-by: Fu-Sheng <[email protected]>
1 parent ff00532 commit 65e4940

File tree

2 files changed

+42
-11
lines changed

2 files changed

+42
-11
lines changed

pkg/cache/v3/linear.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2828
)
2929

30-
type watches = map[chan Response]struct{}
30+
type watches = map[chan Response]stream.StreamState
3131

3232
// LinearCache supports collections of opaque resources. This cache has a
3333
// single collection indexed by resource names and manages resource versions
@@ -114,24 +114,30 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
114114
}
115115

116116
func (cache *LinearCache) respond(value chan Response, staleResources []string) {
117-
var resources []types.ResourceWithTTL
117+
var (
118+
resources []types.ResourceWithTTL
119+
respondResourceNames []string
120+
)
121+
118122
// TODO: optimize the resources slice creations across different clients
119123
if len(staleResources) == 0 {
120124
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
121-
for _, resource := range cache.resources {
125+
for name, resource := range cache.resources {
122126
resources = append(resources, types.ResourceWithTTL{Resource: resource})
127+
respondResourceNames = append(respondResourceNames, name)
123128
}
124129
} else {
125130
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
126131
for _, name := range staleResources {
127132
resource := cache.resources[name]
128133
if resource != nil {
129134
resources = append(resources, types.ResourceWithTTL{Resource: resource})
135+
respondResourceNames = append(respondResourceNames, name)
130136
}
131137
}
132138
}
133139
value <- &RawResponse{
134-
Request: &Request{TypeUrl: cache.typeURL},
140+
Request: &Request{TypeUrl: cache.typeURL, ResourceNames: respondResourceNames},
135141
Resources: resources,
136142
Version: cache.getVersion(),
137143
Ctx: context.Background(),
@@ -142,11 +148,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
142148
// de-duplicate watches that need to be responded
143149
notifyList := make(map[chan Response][]string)
144150
for name := range modified {
145-
for watch := range cache.watches[name] {
146-
notifyList[watch] = append(notifyList[watch], name)
151+
for watch, streamState := range cache.watches[name] {
152+
resourceNames := streamState.GetKnownResourceNames(cache.typeURL)
153+
modifiedNameInResourceName := false
154+
for resourceName := range resourceNames {
155+
if !modifiedNameInResourceName && resourceName == name {
156+
modifiedNameInResourceName = true
157+
}
158+
// To avoid the stale in notifyList becomes empty slice.
159+
// Don't skip resource name that has been deleted here.
160+
// It would be filtered out in respond because the corresponding resource has been deleted.
161+
notifyList[watch] = append(notifyList[watch], resourceName)
162+
}
163+
if !modifiedNameInResourceName {
164+
notifyList[watch] = append(notifyList[watch], name)
165+
}
147166
}
148167
delete(cache.watches, name)
149168
}
169+
150170
for value, stale := range notifyList {
151171
cache.respond(value, stale)
152172
}
@@ -328,10 +348,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
328348
stale = lastVersion != cache.version
329349
} else {
330350
for _, name := range request.ResourceNames {
351+
_, has := streamState.GetKnownResourceNames(request.TypeUrl)[name]
352+
version, exists := cache.versionVector[name]
353+
331354
// When a resource is removed, its version defaults 0 and it is not considered stale.
332-
if lastVersion < cache.versionVector[name] {
355+
if lastVersion < version || (!has && exists) {
333356
stale = true
334-
staleResources = append(staleResources, name)
357+
358+
// Here we collect all requested names.
359+
// It would be filtered out in respond if the resource name doesn't appear in cache.
360+
staleResources = request.ResourceNames
335361
}
336362
}
337363
}
@@ -341,7 +367,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
341367
}
342368
// Create open watches since versions are up to date.
343369
if len(request.ResourceNames) == 0 {
344-
cache.watchAll[value] = struct{}{}
370+
cache.watchAll[value] = streamState
345371
return func() {
346372
cache.mu.Lock()
347373
defer cache.mu.Unlock()
@@ -354,7 +380,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
354380
set = make(watches)
355381
cache.watches[name] = set
356382
}
357-
set[value] = struct{}{}
383+
set[value] = streamState
358384
}
359385
return func() {
360386
cache.mu.Lock()

pkg/cache/v3/linear_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ func TestLinearSetResources(t *testing.T) {
274274

275275
// Create new resources
276276
w1 := make(chan Response, 1)
277+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
277278
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
278279
mustBlock(t, w1)
279280
w2 := make(chan Response, 1)
@@ -341,6 +342,7 @@ func TestLinearVersionPrefix(t *testing.T) {
341342
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
342343
verifyResponse(t, w, "instance1-1", 1)
343344

345+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
344346
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w)
345347
mustBlock(t, w)
346348
checkWatchCount(t, c, "a", 1)
@@ -350,6 +352,7 @@ func TestLinearDeletion(t *testing.T) {
350352
streamState := stream.NewStreamState(false, map[string]string{})
351353
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
352354
w := make(chan Response, 1)
355+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
353356
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
354357
mustBlock(t, w)
355358
checkWatchCount(t, c, "a", 1)
@@ -369,14 +372,15 @@ func TestLinearWatchTwo(t *testing.T) {
369372
streamState := stream.NewStreamState(false, map[string]string{})
370373
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
371374
w := make(chan Response, 1)
375+
streamState.SetKnownResourceNamesAsList(testType, []string{"a", "b"})
372376
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
373377
mustBlock(t, w)
374378
w1 := make(chan Response, 1)
375379
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
376380
mustBlock(t, w1)
377381
require.NoError(t, c.UpdateResource("a", testResource("aa")))
378382
// should only get the modified resource
379-
verifyResponse(t, w, "1", 1)
383+
verifyResponse(t, w, "1", 2)
380384
verifyResponse(t, w1, "1", 2)
381385
}
382386

@@ -394,6 +398,7 @@ func TestLinearCancel(t *testing.T) {
394398
checkWatchCount(t, c, "a", 0)
395399

396400
// cancel watch for "a"
401+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
397402
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
398403
mustBlock(t, w)
399404
checkWatchCount(t, c, "a", 1)

0 commit comments

Comments
 (0)