Skip to content

Commit 23b4690

Browse files
committed
fix: correct deduplication of watch events
This is extracted from #633 as a separate fix. The idea is that if we merge/deduplicate watch events, we should do this by immutable attributes of the resource: namespace, type, id. The previous implementation might split two events on e.g. finalizers being empty, so two events which report finalizers empty/finalizers non-empty might be delivered in the wrong order: first empty, next non-empty. It might not trigger a real bug due to the way controllers work, but still is not consistent and correct. Signed-off-by: Andrey Smirnov <[email protected]>
1 parent 40d59e1 commit 23b4690

File tree

2 files changed

+38
-12
lines changed

2 files changed

+38
-12
lines changed

pkg/controller/runtime/internal/reduced/reduced.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,42 @@ package reduced
77

88
import "github.com/cosi-project/runtime/pkg/resource"
99

10-
// Metadata is _comparable_, so that it can be a map key.
10+
// Metadata reduces resource metadata for deduplication.
11+
//
12+
// It consists of two parts:
13+
// - a comparable Key which is used for deduplication.
14+
// - a Value which is reduced for duplicate keys to the last observed value.
1115
type Metadata struct {
12-
Namespace resource.Namespace
13-
Typ resource.Type
14-
ID resource.ID
16+
Key
17+
Value
18+
}
19+
20+
// Key is a comparable representation of deduplication entry.
21+
type Key struct {
22+
Namespace resource.Namespace
23+
Typ resource.Type
24+
ID resource.ID
25+
}
26+
27+
// Value is a reduced representation of resource metadata.
28+
type Value struct {
29+
Labels *resource.Labels
1530
Phase resource.Phase
1631
FinalizersEmpty bool
1732
}
1833

1934
// NewMetadata creates a new reduced Metadata from a resource.Metadata.
2035
func NewMetadata(md *resource.Metadata) Metadata {
2136
return Metadata{
22-
Namespace: md.Namespace(),
23-
Typ: md.Type(),
24-
ID: md.ID(),
25-
Phase: md.Phase(),
26-
FinalizersEmpty: md.Finalizers().Empty(),
37+
Key: Key{
38+
Namespace: md.Namespace(),
39+
Typ: md.Type(),
40+
ID: md.ID(),
41+
},
42+
Value: Value{
43+
Phase: md.Phase(),
44+
FinalizersEmpty: md.Finalizers().Empty(),
45+
Labels: md.Labels(),
46+
},
2747
}
2848
}

pkg/controller/runtime/runtime.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,18 @@ func (runtime *Runtime) watch(resourceNamespace resource.Namespace, resourceType
266266
return runtime.state.WatchKindAggregated(runtime.runCtx, kind, runtime.watchCh, state.WithBootstrapBookmark(true))
267267
}
268268

269-
type dedup map[reduced.Metadata]struct{}
269+
type dedup map[reduced.Key]reduced.Value
270270

271271
func (d dedup) takeOne() reduced.Metadata {
272272
for k := range d {
273+
md := reduced.Metadata{
274+
Key: k,
275+
Value: d[k],
276+
}
277+
273278
delete(d, k)
274279

275-
return k
280+
return md
276281
}
277282

278283
panic("dedup is empty")
@@ -353,7 +358,8 @@ eventLoop:
353358
}
354359
}
355360

356-
m[reduced.NewMetadata(e.Resource.Metadata())] = struct{}{}
361+
reducedMD := reduced.NewMetadata(e.Resource.Metadata())
362+
m[reducedMD.Key] = reducedMD.Value
357363
}
358364

359365
return true

0 commit comments

Comments
 (0)