
Commit 7a836bc

Merge pull request #798 from ffromani/nrt-log-generation
nrt: log: introduce and use "generation" for cache
2 parents 0744b26 + 0dae3ec

10 files changed: +68, -36 lines changed


pkg/noderesourcetopology/cache/cache.go

Lines changed: 16 additions & 4 deletions
@@ -24,17 +24,29 @@ import (
 	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
 )
 
+type CachedNRTInfo struct {
+	// Generation is akin to the object resourceVersion and represents
+	// the observed state in the cache. It's an opaque monotonically increasing number which can only compared for equality
+	// and which is only increased in the resync loop. It is used to cross correlate resync attempts with observed state
+	// with cache content. Used only in logging. If the cache implementation has no concept of caching nor generation,
+	// it should always return 0 (zero).
+	Generation uint64
+
+	// Fresh signals the caller if the NRT data is fresh.
+	// If true, the data is fresh and ready to be consumed.
+	// If false, the data is stale and the caller need to wait for a future refresh.
+	Fresh bool
+}
+
 type Interface interface {
 	// GetCachedNRTCopy retrieves a NRT copy from cache, and then deducts over-reserved resources if necessary.
 	// It will be used as the source of truth across the Pod's scheduling cycle.
 	// Over-reserved resources are the resources consumed by pods scheduled to that node after the last update
 	// of NRT pertaining to the same node, pessimistically overallocated on ALL the NUMA zones of the node.
 	// The pod argument is used only for logging purposes.
 	// Returns nil if there is no NRT data available for the node named `nodeName`.
-	// Returns a boolean to signal the caller if the NRT data is fresh.
-	// If true, the data is fresh and ready to be consumed.
-	// If false, the data is stale and the caller need to wait for a future refresh.
-	GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
+	// Returns a CachedNRTInfo describing the NRT data returned. Meaningful only if `nrt` != nil.
+	GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo)
 
 	// NodeMaybeOverReserved declares a node was filtered out for not enough resources available.
 	// This means this node is eligible for a resync. When a node is marked discarded (dirty), it matters not
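The interface change ripples through every caller: freshness now lives in info.Fresh, while info.Generation exists only to enrich log lines so a scheduling-cycle log can be cross-correlated with the resync that produced the cached data. Below is a minimal caller sketch of that pattern; the lookupTopology helper and the sigs.k8s.io/scheduler-plugins import paths are assumptions for illustration, the actual call sites are the filter.go and score.go hunks further down.

// Hypothetical caller sketch (not part of this commit); import paths assume
// the sigs.k8s.io/scheduler-plugins module layout.
package example

import (
	"context"

	"github.com/go-logr/logr"
	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
	corev1 "k8s.io/api/core/v1"

	nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
)

func lookupTopology(ctx context.Context, lh logr.Logger, cache nrtcache.Interface, nodeName string, pod *corev1.Pod) *topologyv1alpha2.NodeResourceTopology {
	nrt, info := cache.GetCachedNRTCopy(ctx, nodeName, pod)
	// Attach the observed cache generation to all subsequent log lines; the
	// value is opaque and is meant only for cross-correlation with resync logs.
	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
	if !info.Fresh {
		// Stale data: the caller has to wait for a future refresh.
		lh.V(2).Info("invalid topology data")
		return nil
	}
	return nrt
}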

pkg/noderesourcetopology/cache/cache_test.go

Lines changed: 3 additions & 3 deletions
@@ -100,10 +100,10 @@ func checkGetCachedNRTCopy(t *testing.T, makeCache func(client ctrlclient.WithWa
 		nrtCache.NodeHasForeignPods(tc.nodeName, pod)
 	}
 
-	gotNRT, gotOK := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)
+	gotNRT, gotInfo := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)
 
-	if gotOK != tc.expectedOK {
-		t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotOK, tc.expectedOK)
+	if gotInfo.Fresh != tc.expectedOK {
+		t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotInfo.Fresh, tc.expectedOK)
 	}
 	if gotNRT != nil && tc.expectedNRT == nil {
 		t.Fatalf("object from cache not nil but expected nil")

pkg/noderesourcetopology/cache/discardreserved.go

Lines changed: 5 additions & 4 deletions
@@ -58,20 +58,21 @@ func NewDiscardReserved(lh logr.Logger, client ctrlclient.Client) Interface {
 	}
 }
 
-func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
 	pt.rMutex.RLock()
 	defer pt.rMutex.RUnlock()
 	if t, ok := pt.reservationMap[nodeName]; ok {
 		if len(t) > 0 {
-			return nil, false
+			return nil, CachedNRTInfo{}
 		}
 	}
 
+	info := CachedNRTInfo{Fresh: true}
 	nrt := &topologyv1alpha2.NodeResourceTopology{}
 	if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
-		return nil, true
+		return nil, info
 	}
-	return nrt, true
+	return nrt, info
 }
 
 func (pt *DiscardReserved) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}

pkg/noderesourcetopology/cache/discardreserved_test.go

Lines changed: 2 additions & 2 deletions
@@ -66,8 +66,8 @@ func TestDiscardReservedNodesGetNRTCopyFails(t *testing.T) {
 		},
 	}
 
-	nrtObj, ok := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
-	if ok {
+	nrtObj, nrtInfo := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
+	if nrtInfo.Fresh {
 		t.Fatal("expected false\ngot true\n")
 	}
 	if nrtObj != nil {

pkg/noderesourcetopology/cache/overreserve.go

Lines changed: 29 additions & 7 deletions
@@ -45,6 +45,7 @@ type OverReserve struct {
 	lh literal logr.Logger
 	client ctrlclient.Reader
 	lock sync.Mutex
+	generation uint64
 	nrts *nrtStore
 	assumedResources map[string]*resourceStore // nodeName -> resourceStore
 	// nodesMaybeOverreserved counts how many times a node is filtered out. This is used as trigger condition to try
@@ -97,30 +98,33 @@ func NewOverReserve(ctx context.Context, lh logr.Logger, cfg *apiconfig.NodeReso
 	return obj, nil
 }
 
-func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
 	ov.lock.Lock()
 	defer ov.lock.Unlock()
 	if ov.nodesWithForeignPods.IsSet(nodeName) {
-		return nil, false
+		return nil, CachedNRTInfo{}
 	}
 
+	info := CachedNRTInfo{Fresh: true}
 	nrt := ov.nrts.GetNRTCopyByNodeName(nodeName)
 	if nrt == nil {
-		return nil, true
+		return nil, info
 	}
+
+	info.Generation = ov.generation
 	nodeAssumedResources, ok := ov.assumedResources[nodeName]
 	if !ok {
-		return nrt, true
+		return nrt, info
 	}
 
 	logID := klog.KObj(pod)
-	lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
+	lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName, logging.KeyGeneration, ov.generation)
 
 	lh.V(6).Info("NRT", "fromcache", stringify.NodeResourceTopologyResources(nrt))
 	nodeAssumedResources.UpdateNRT(nrt, logging.KeyPod, logID)
 
 	lh.V(5).Info("NRT", "withassumed", stringify.NodeResourceTopologyResources(nrt))
-	return nrt, true
+	return nrt, info
 }
 
 func (ov *OverReserve) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {
@@ -176,6 +180,7 @@ func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod)
 }
 
 type DesyncedNodes struct {
+	Generation uint64
 	MaybeOverReserved []string
 	ConfigChanged []string
 }
@@ -207,6 +212,10 @@ func (rn DesyncedNodes) DirtyCount() int {
 func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
 	ov.lock.Lock()
 	defer ov.lock.Unlock()
+
+	// make sure to log the generation to be able to crosscorrelate with later logs
+	lh = lh.WithValues(logging.KeyGeneration, ov.generation)
+
 	// this is intentionally aggressive. We don't yet make any attempt to find out if the
 	// node was discarded because pessimistically overrserved (which should indeed trigger
 	// a resync) or if it was discarded because the actual resources on the node really were
@@ -229,6 +238,7 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
 		lh.V(4).Info("found dirty nodes", "foreign", foreignCount, "discarded", overreservedCount, "configChange", configChangeCount, "total", nodes.Len())
 	}
 	return DesyncedNodes{
+		Generation: ov.generation,
 		MaybeOverReserved: nodes.Keys(),
 		ConfigChanged: configChangeNodes.Keys(),
 	}
@@ -244,11 +254,14 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
 // too aggressive resync attempts, so to more, likely unnecessary, computation work on the scheduler side.
 func (ov *OverReserve) Resync() {
 	// we are not working with a specific pod, so we need a unique key to track this flow
-	lh_ := ov.lh.WithName(logging.FlowCacheSync).WithValues(logging.KeyLogID, logging.TimeLogID())
+	lh_ := ov.lh.WithName(logging.FlowCacheSync)
 	lh_.V(4).Info(logging.FlowBegin)
 	defer lh_.V(4).Info(logging.FlowEnd)
 
 	nodes := ov.GetDesyncedNodes(lh_)
+	// we start without because chicken/egg problem. This is the earliest we can use the generation value.
+	lh_ = lh_.WithValues(logging.KeyGeneration, nodes.Generation)
+
 	// avoid as much as we can unnecessary work and logs.
 	if nodes.Len() == 0 {
 		lh_.V(5).Info("no dirty nodes detected")
@@ -331,6 +344,7 @@ func (ov *OverReserve) Resync() {
 func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.NodeResourceTopology) {
 	ov.lock.Lock()
 	defer ov.lock.Unlock()
+
 	for _, nrt := range nrts {
 		lh.V(2).Info("flushing", logging.KeyNode, nrt.Name)
 		ov.nrts.Update(nrt)
@@ -339,6 +353,14 @@ func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.Node
 		ov.nodesWithForeignPods.Delete(nrt.Name)
 		ov.nodesWithAttrUpdate.Delete(nrt.Name)
 	}
+
+	if len(nrts) == 0 {
+		return
+	}
+
+	// increase only if we mutated the internal state
+	ov.generation += 1
+	lh.V(2).Info("generation", "new", ov.generation)
 }
 
 // to be used only in tests
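The OverReserve changes carry the substance of the commit: the generation counter is guarded by the same ov.lock as the cached NRT data, it is copied into both CachedNRTInfo and DesyncedNodes so readers can log exactly what they observed, and it is bumped only in FlushNodes, and only when at least one NRT object was actually flushed. A standalone sketch of that counter pattern follows; every name in it is hypothetical and it is not part of the plugin's API.

// Minimal illustrative sketch of the generation pattern adopted above:
// reads report the generation they observed, the counter moves only when
// a flush actually mutates the cached state.
package main

import (
	"fmt"
	"sync"
)

type generationalStore struct {
	mu         sync.Mutex
	generation uint64
	data       map[string]string
}

func (s *generationalStore) Get(key string) (string, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// The generation is returned only so callers can tag their logs with it.
	return s.data[key], s.generation
}

func (s *generationalStore) Flush(updates map[string]string) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range updates {
		s.data[k] = v
	}
	if len(updates) == 0 {
		return s.generation // increase only if we mutated the internal state
	}
	s.generation++
	return s.generation
}

func main() {
	s := &generationalStore{data: map[string]string{}}
	s.Flush(map[string]string{"node-0": "nrt-v1"}) // generation 0 -> 1
	s.Flush(nil)                                   // no mutation, still 1
	_, gen := s.Get("node-0")
	fmt.Println("observed generation:", gen) // prints 1
}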

pkg/noderesourcetopology/cache/overreserve_test.go

Lines changed: 2 additions & 2 deletions
@@ -720,8 +720,8 @@ func TestNodeWithForeignPods(t *testing.T) {
 		t.Errorf("unexpected dirty nodes: %v", nodes.MaybeOverReserved)
 	}
 
-	_, ok := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
-	if ok {
+	_, info := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
+	if info.Fresh {
 		t.Errorf("succesfully got node with foreign pods!")
 	}
 }

pkg/noderesourcetopology/cache/passthrough.go

Lines changed: 4 additions & 3 deletions
@@ -40,14 +40,15 @@ func NewPassthrough(lh logr.Logger, client ctrlclient.Client) Interface {
 	}
 }
 
-func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
 	pt.lh.V(5).Info("lister for NRT plugin")
+	info := CachedNRTInfo{Fresh: true}
 	nrt := &topologyv1alpha2.NodeResourceTopology{}
 	if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
 		pt.lh.V(5).Error(err, "cannot get nrts from lister")
-		return nil, true
+		return nil, info
 	}
-	return nrt, true
+	return nrt, info
 }
 
 func (pt Passthrough) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}

pkg/noderesourcetopology/filter.go

Lines changed: 3 additions & 2 deletions
@@ -196,8 +196,9 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
 	lh.V(4).Info(logging.FlowBegin)
 	defer lh.V(4).Info(logging.FlowEnd)
 
-	nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
-	if !ok {
+	nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
+	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
+	if !info.Fresh {
 		lh.V(2).Info("invalid topology data")
 		return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
 	}

pkg/noderesourcetopology/logging/logging.go

Lines changed: 1 addition & 6 deletions
@@ -17,9 +17,7 @@ limitations under the License.
 package logging
 
 import (
-	"fmt"
 	"reflect"
-	"time"
 
 	corev1 "k8s.io/api/core/v1"
 )
@@ -33,6 +31,7 @@ const (
 	KeyFlow string = "flow"
 	KeyContainer string = "container"
 	KeyContainerKind string = "kind"
+	KeyGeneration string = "generation"
 )
 
 const (
@@ -63,7 +62,3 @@ func PodUID(pod *corev1.Pod) string {
 	}
 	return string(pod.GetUID())
 }
-
-func TimeLogID() string {
-	return fmt.Sprintf("uts/%v", time.Now().UnixMilli())
-}

pkg/noderesourcetopology/score.go

Lines changed: 3 additions & 3 deletions
@@ -73,9 +73,9 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState,
 		return framework.MaxNodeScore, nil
 	}
 
-	nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
-
-	if !ok {
+	nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
+	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
+	if !info.Fresh {
 		lh.V(4).Info("noderesourcetopology is not valid for node")
 		return 0, nil
 	}
