Skip to content

Commit a28876f

Browse files
committed
linear: keep streamState to respond resources
Signed-off-by: Fu-Sheng <[email protected]>
1 parent 5270cdc commit a28876f

File tree

2 files changed

+42
-11
lines changed

2 files changed

+42
-11
lines changed

pkg/cache/v3/linear.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2828
)
2929

30-
type watches = map[chan Response]struct{}
30+
type watches = map[chan Response]stream.StreamState
3131

3232
// LinearCache supports collections of opaque resources. This cache has a
3333
// single collection indexed by resource names and manages resource versions
@@ -114,24 +114,30 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
114114
}
115115

116116
func (cache *LinearCache) respond(value chan Response, staleResources []string) {
117-
var resources []types.ResourceWithTTL
117+
var (
118+
resources []types.ResourceWithTTL
119+
respondResourceNames []string
120+
)
121+
118122
// TODO: optimize the resources slice creations across different clients
119123
if len(staleResources) == 0 {
120124
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
121-
for _, resource := range cache.resources {
125+
for name, resource := range cache.resources {
122126
resources = append(resources, types.ResourceWithTTL{Resource: resource})
127+
respondResourceNames = append(respondResourceNames, name)
123128
}
124129
} else {
125130
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
126131
for _, name := range staleResources {
127132
resource := cache.resources[name]
128133
if resource != nil {
129134
resources = append(resources, types.ResourceWithTTL{Resource: resource})
135+
respondResourceNames = append(respondResourceNames, name)
130136
}
131137
}
132138
}
133139
value <- &RawResponse{
134-
Request: &Request{TypeUrl: cache.typeURL},
140+
Request: &Request{TypeUrl: cache.typeURL, ResourceNames: respondResourceNames},
135141
Resources: resources,
136142
Version: cache.getVersion(),
137143
}
@@ -141,11 +147,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
141147
// de-duplicate watches that need to be responded
142148
notifyList := make(map[chan Response][]string)
143149
for name := range modified {
144-
for watch := range cache.watches[name] {
145-
notifyList[watch] = append(notifyList[watch], name)
150+
for watch, streamState := range cache.watches[name] {
151+
resourceNames := streamState.GetKnownResourceNames(cache.typeURL)
152+
modifiedNameInResourceName := false
153+
for resourceName := range resourceNames {
154+
if !modifiedNameInResourceName && resourceName == name {
155+
modifiedNameInResourceName = true
156+
}
157+
// To avoid the stale in notifyList becomes empty slice.
158+
// Don't skip resource name that has been deleted here.
159+
// It would be filtered out in respond because the corresponding resource has been deleted.
160+
notifyList[watch] = append(notifyList[watch], resourceName)
161+
}
162+
if !modifiedNameInResourceName {
163+
notifyList[watch] = append(notifyList[watch], name)
164+
}
146165
}
147166
delete(cache.watches, name)
148167
}
168+
149169
for value, stale := range notifyList {
150170
cache.respond(value, stale)
151171
}
@@ -293,10 +313,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
293313
stale = lastVersion != cache.version
294314
} else {
295315
for _, name := range request.ResourceNames {
316+
_, has := streamState.GetKnownResourceNames(request.TypeUrl)[name]
317+
version, exists := cache.versionVector[name]
318+
296319
// When a resource is removed, its version defaults 0 and it is not considered stale.
297-
if lastVersion < cache.versionVector[name] {
320+
if lastVersion < version || (!has && exists) {
298321
stale = true
299-
staleResources = append(staleResources, name)
322+
323+
// Here we collect all requested names.
324+
// It would be filtered out in respond if the resource name doesn't appear in cache.
325+
staleResources = request.ResourceNames
300326
}
301327
}
302328
}
@@ -306,7 +332,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
306332
}
307333
// Create open watches since versions are up to date.
308334
if len(request.ResourceNames) == 0 {
309-
cache.watchAll[value] = struct{}{}
335+
cache.watchAll[value] = streamState
310336
return func() {
311337
cache.mu.Lock()
312338
defer cache.mu.Unlock()
@@ -319,7 +345,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea
319345
set = make(watches)
320346
cache.watches[name] = set
321347
}
322-
set[value] = struct{}{}
348+
set[value] = streamState
323349
}
324350
return func() {
325351
cache.mu.Lock()

pkg/cache/v3/linear_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ func TestLinearSetResources(t *testing.T) {
230230

231231
// Create new resources
232232
w1 := make(chan Response, 1)
233+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
233234
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
234235
mustBlock(t, w1)
235236
w2 := make(chan Response, 1)
@@ -297,6 +298,7 @@ func TestLinearVersionPrefix(t *testing.T) {
297298
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
298299
verifyResponse(t, w, "instance1-1", 1)
299300

301+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
300302
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w)
301303
mustBlock(t, w)
302304
checkWatchCount(t, c, "a", 1)
@@ -306,6 +308,7 @@ func TestLinearDeletion(t *testing.T) {
306308
streamState := stream.NewStreamState(false, map[string]string{})
307309
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
308310
w := make(chan Response, 1)
311+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
309312
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
310313
mustBlock(t, w)
311314
checkWatchCount(t, c, "a", 1)
@@ -325,14 +328,15 @@ func TestLinearWatchTwo(t *testing.T) {
325328
streamState := stream.NewStreamState(false, map[string]string{})
326329
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
327330
w := make(chan Response, 1)
331+
streamState.SetKnownResourceNamesAsList(testType, []string{"a", "b"})
328332
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
329333
mustBlock(t, w)
330334
w1 := make(chan Response, 1)
331335
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
332336
mustBlock(t, w1)
333337
require.NoError(t, c.UpdateResource("a", testResource("aa")))
334338
// should only get the modified resource
335-
verifyResponse(t, w, "1", 1)
339+
verifyResponse(t, w, "1", 2)
336340
verifyResponse(t, w1, "1", 2)
337341
}
338342

@@ -350,6 +354,7 @@ func TestLinearCancel(t *testing.T) {
350354
checkWatchCount(t, c, "a", 0)
351355

352356
// cancel watch for "a"
357+
streamState.SetKnownResourceNamesAsList(testType, []string{"a"})
353358
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
354359
mustBlock(t, w)
355360
checkWatchCount(t, c, "a", 1)

0 commit comments

Comments
 (0)