
Commit 3647766

Merge pull request kubernetes#93938 from alculquicondor/revert-node-delete
Keep track of remaining pods when a node is deleted
2 parents b497fa1 + dfe9e41 commit 3647766

12 files changed: +72 −78 lines changed


pkg/scheduler/BUILD

Lines changed: 0 additions & 1 deletion
@@ -84,7 +84,6 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",

pkg/scheduler/framework/v1alpha1/types.go

Lines changed: 6 additions & 0 deletions
@@ -625,6 +625,12 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 	return nil
 }
 
+// RemoveNode removes the node object, leaving all other tracking information.
+func (n *NodeInfo) RemoveNode() {
+	n.node = nil
+	n.Generation = nextGeneration()
+}
+
 // FilterOutPods receives a list of pods and filters out those whose node names
 // are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
 //
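
Note: the new NodeInfo.RemoveNode only nils out the embedded *v1.Node and bumps the generation; pods and aggregated resource tracking stay in place, which is what lets the cache keep a "ghost" entry for a deleted node (see cache.go below). A minimal sketch of how a caller observes this, assuming the framework package at this revision; the fixture node and the main wrapper are illustrative only, not code from this commit:

// nodeinfo_sketch.go — illustrative only; assumes a k8s.io/kubernetes checkout
// that already contains this commit.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

func main() {
	info := framework.NewNodeInfo()
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	if err := info.SetNode(node); err != nil {
		panic(err)
	}
	info.RemoveNode()
	// The node object is gone, but the NodeInfo itself (pods, requested
	// resources, generation) is still usable.
	fmt.Println(info.Node() == nil) // true: this is how ghost nodes are detected
}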

pkg/scheduler/internal/cache/BUILD

Lines changed: 0 additions & 1 deletion
@@ -16,7 +16,6 @@ go_library(
         "//pkg/scheduler/metrics:go_default_library",
         "//pkg/util/node:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",

pkg/scheduler/internal/cache/cache.go

Lines changed: 32 additions & 41 deletions
@@ -22,7 +22,6 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -315,7 +314,9 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
 	}
 }
 
-func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) {
+// PodCount returns the number of pods in the cache (including those from deleted nodes).
+// DO NOT use outside of tests.
+func (cache *schedulerCache) PodCount() (int, error) {
 	cache.mu.RLock()
 	defer cache.mu.RUnlock()
 	// podFilter is expected to return true for most or all of the pods. We
@@ -325,15 +326,11 @@ func (cache *schedulerCache) ListPods(selector labels.Selector) ([]*v1.Pod, error) {
 	for _, n := range cache.nodes {
 		maxSize += len(n.info.Pods)
 	}
-	pods := make([]*v1.Pod, 0, maxSize)
+	count := 0
 	for _, n := range cache.nodes {
-		for _, p := range n.info.Pods {
-			if selector.Matches(labels.Set(p.Pod.Labels)) {
-				pods = append(pods, p.Pod)
-			}
-		}
+		count += len(n.info.Pods)
 	}
-	return pods, nil
+	return count, nil
 }
 
 func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
@@ -423,13 +420,6 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
 
 // Assumes that lock is already acquired.
 func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
-	if _, ok := cache.nodes[newPod.Spec.NodeName]; !ok {
-		// The node might have been deleted already.
-		// This is not a problem in the case where a pod update arrives before the
-		// node creation, because we will always have a create pod event before
-		// that, which will create the placeholder node item.
-		return nil
-	}
 	if err := cache.removePod(oldPod); err != nil {
 		return err
 	}
@@ -438,18 +428,23 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
 }
 
 // Assumes that lock is already acquired.
-// Removes a pod from the cached node info. When a node is removed, some pod
-// deletion events might arrive later. This is not a problem, as the pods in
-// the node are assumed to be removed already.
+// Removes a pod from the cached node info. If the node information was already
+// removed and there are no more pods left in the node, cleans up the node from
+// the cache.
 func (cache *schedulerCache) removePod(pod *v1.Pod) error {
 	n, ok := cache.nodes[pod.Spec.NodeName]
 	if !ok {
+		klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
 		return nil
 	}
 	if err := n.info.RemovePod(pod); err != nil {
 		return err
 	}
-	cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	if len(n.info.Pods) == 0 && n.info.Node() == nil {
+		cache.removeNodeInfoFromList(pod.Spec.NodeName)
+	} else {
+		cache.moveNodeInfoToHead(pod.Spec.NodeName)
+	}
 	return nil
 }
 
@@ -619,21 +614,30 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	return n.info.SetNode(newNode)
 }
 
-// RemoveNode removes a node from the cache.
-// Some nodes might still have pods because their deletion events didn't arrive
-// yet. For most intents and purposes, those pods are removed from the cache,
-// having it's source of truth in the cached nodes.
-// However, some information on pods (assumedPods, podStates) persist. These
-// caches will be eventually consistent as pod deletion events arrive.
+// RemoveNode removes a node from the cache's tree.
+// The node might still have pods because their deletion events didn't arrive
+// yet. Those pods are considered removed from the cache, being the node tree
+// the source of truth.
+// However, we keep a ghost node with the list of pods until all pod deletion
+// events have arrived. A ghost node is skipped from snapshots.
 func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	_, ok := cache.nodes[node.Name]
+	n, ok := cache.nodes[node.Name]
 	if !ok {
 		return fmt.Errorf("node %v is not found", node.Name)
 	}
-	cache.removeNodeInfoFromList(node.Name)
+	n.info.RemoveNode()
+	// We remove NodeInfo for this node only if there aren't any pods on this node.
+	// We can't do it unconditionally, because notifications about pods are delivered
+	// in a different watch, and thus can potentially be observed later, even though
+	// they happened before node removal.
+	if len(n.info.Pods) == 0 {
+		cache.removeNodeInfoFromList(node.Name)
+	} else {
+		cache.moveNodeInfoToHead(node.Name)
+	}
 	if err := cache.nodeTree.removeNode(node); err != nil {
 		return err
 	}
@@ -736,19 +740,6 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
 	return nil
 }
 
-// GetNodeInfo returns cached data for the node name.
-func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
-	cache.mu.RLock()
-	defer cache.mu.RUnlock()
-
-	n, ok := cache.nodes[nodeName]
-	if !ok {
-		return nil, fmt.Errorf("node %q not found in cache", nodeName)
-	}
-
-	return n.info.Node(), nil
-}
-
 // updateMetrics updates cache size metric values for pods, assumed pods, and nodes
 func (cache *schedulerCache) updateMetrics() {
 	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
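
Taken together, RemoveNode and removePod above form a small state machine: deleting a node turns its cache entry into a ghost (node object nil, pods kept), and the entry is dropped only when the last straggling pod deletion event arrives; a pod deletion for an unknown node is just logged. The toy model below mirrors that bookkeeping in a self-contained form; it is a sketch for illustration only, and every name in it (ghostCache, nodeEntry, and so on) is invented here rather than taken from the scheduler:

// ghost_cache_sketch.go — a toy model of the ghost-node lifecycle above.
package main

import "fmt"

type nodeEntry struct {
	hasNode bool     // models n.info.Node() != nil
	pods    []string // pods still tracked for this node
}

type ghostCache struct {
	nodes map[string]*nodeEntry
}

// removeNode mirrors schedulerCache.RemoveNode: drop the node object but keep
// the entry while pod deletion events are still outstanding.
func (c *ghostCache) removeNode(name string) {
	n, ok := c.nodes[name]
	if !ok {
		return
	}
	n.hasNode = false
	if len(n.pods) == 0 {
		delete(c.nodes, name)
	}
}

// removePod mirrors schedulerCache.removePod: once the last pod of an already
// deleted node goes away, the ghost entry is cleaned up too.
func (c *ghostCache) removePod(nodeName, pod string) {
	n, ok := c.nodes[nodeName]
	if !ok {
		fmt.Printf("node %v not found when trying to remove pod %v\n", nodeName, pod)
		return
	}
	kept := n.pods[:0]
	for _, p := range n.pods {
		if p != pod {
			kept = append(kept, p)
		}
	}
	n.pods = kept
	if len(n.pods) == 0 && !n.hasNode {
		delete(c.nodes, nodeName)
	}
}

func main() {
	c := &ghostCache{nodes: map[string]*nodeEntry{
		"node-1": {hasNode: true, pods: []string{"pod-a"}},
	}}
	c.removeNode("node-1")         // node deleted first; pod-a's event not here yet
	fmt.Println(len(c.nodes))      // 1: ghost entry kept
	c.removePod("node-1", "pod-a") // late pod deletion event arrives
	fmt.Println(len(c.nodes))      // 0: ghost entry cleaned up
}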

pkg/scheduler/internal/cache/cache_test.go

Lines changed: 20 additions & 5 deletions
@@ -268,7 +268,7 @@ func TestExpirePod(t *testing.T) {
 			{pod: testPods[0], finishBind: true, assumedTime: now},
 		},
 		cleanupTime: now.Add(2 * ttl),
-		wNodeInfo:   framework.NewNodeInfo(),
+		wNodeInfo:   nil,
 	}, { // first one would expire, second and third would not.
 		pods: []*testExpirePodStruct{
 			{pod: testPods[0], finishBind: true, assumedTime: now},
@@ -1142,10 +1142,12 @@ func TestNodeOperators(t *testing.T) {
 		if err := cache.RemoveNode(node); err != nil {
 			t.Error(err)
 		}
-		if _, err := cache.GetNodeInfo(node.Name); err == nil {
-			t.Errorf("The node %v should be removed.", node.Name)
+		if n, err := cache.getNodeInfo(node.Name); err != nil {
+			t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
+		} else if n != nil {
+			t.Errorf("The node object for %v should be nil", node.Name)
 		}
-		// Check node is removed from nodeTree as well.
+		// Check node is removed from nodeTree.
 		if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
 			t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
 		}
@@ -1466,7 +1468,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
 		var i int
 		// Check that cache is in the expected state.
 		for node := cache.headNode; node != nil; node = node.next {
-			if node.info.Node().Name != test.expected[i].Name {
+			if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
 				t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
 			}
 			i++
@@ -1798,3 +1800,16 @@ func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
 	}
 	return nil
 }
+
+// getNodeInfo returns cached data for the node name.
+func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
+	cache.mu.RLock()
+	defer cache.mu.RUnlock()
+
+	n, ok := cache.nodes[nodeName]
+	if !ok {
+		return nil, fmt.Errorf("node %q not found in cache", nodeName)
+	}
+
+	return n.info.Node(), nil
+}

pkg/scheduler/internal/cache/debugger/dumper.go

Lines changed: 6 additions & 6 deletions
@@ -45,8 +45,8 @@ func (d *CacheDumper) DumpAll() {
 func (d *CacheDumper) dumpNodes() {
 	dump := d.cache.Dump()
 	klog.Info("Dump of cached NodeInfo")
-	for _, nodeInfo := range dump.Nodes {
-		klog.Info(d.printNodeInfo(nodeInfo))
+	for name, nodeInfo := range dump.Nodes {
+		klog.Info(d.printNodeInfo(name, nodeInfo))
 	}
 }
 
@@ -61,16 +61,16 @@ func (d *CacheDumper) dumpSchedulingQueue() {
 }
 
 // printNodeInfo writes parts of NodeInfo to a string.
-func (d *CacheDumper) printNodeInfo(n *framework.NodeInfo) string {
+func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string {
 	var nodeData strings.Builder
-	nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
-		n.Node().Name, n.Requested, n.Allocatable, len(n.Pods)))
+	nodeData.WriteString(fmt.Sprintf("\nNode name: %s\nDeleted: %t\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
+		name, n.Node() == nil, n.Requested, n.Allocatable, len(n.Pods)))
 	// Dumping Pod Info
 	for _, p := range n.Pods {
 		nodeData.WriteString(printPod(p.Pod))
 	}
 	// Dumping nominated pods info on the node
-	nominatedPods := d.podQueue.NominatedPodsForNode(n.Node().Name)
+	nominatedPods := d.podQueue.NominatedPodsForNode(name)
 	if len(nominatedPods) != 0 {
 		nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPods)))
 		for _, p := range nominatedPods {

pkg/scheduler/internal/cache/fake/BUILD

Lines changed: 0 additions & 1 deletion
@@ -8,7 +8,6 @@ go_library(
     deps = [
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
     ],
 )
 

pkg/scheduler/internal/cache/fake/fake_cache.go

Lines changed: 2 additions & 13 deletions
@@ -18,7 +18,6 @@ package fake
 
 import (
 	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
 
@@ -78,20 +77,10 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
 	return nil
 }
 
-// ListPods is a fake method for testing.
-func (c *Cache) ListPods(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
+// PodCount is a fake method for testing.
+func (c *Cache) PodCount() (int, error) { return 0, nil }
 
 // Dump is a fake method for testing.
 func (c *Cache) Dump() *internalcache.Dump {
 	return &internalcache.Dump{}
 }
-
-// GetNodeInfo is a fake method for testing.
-func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
-	return nil, nil
-}
-
-// ListNodes is a fake method for testing.
-func (c *Cache) ListNodes() []*v1.Node {
-	return nil
-}

pkg/scheduler/internal/cache/interface.go

Lines changed: 2 additions & 3 deletions
@@ -18,7 +18,6 @@ package cache
 
 import (
 	"k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 )
 
@@ -57,8 +56,8 @@ import (
 // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
 //   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
 type Cache interface {
-	// ListPods lists all pods in the cache.
-	ListPods(selector labels.Selector) ([]*v1.Pod, error)
+	// PodCount returns the number of pods in the cache (including those from deleted nodes).
+	PodCount() (int, error)
 
 	// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
 	// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
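
With ListPods gone, the Cache interface no longer hands out pod objects; callers that only needed a count now use PodCount, which deliberately includes pods still attached to ghost nodes. A short hedged fragment of the polling pattern this supports (the scache variable and the surrounding retry loop are assumed context, mirroring the test change below):

	// Inside a retry loop: wait until every pod has expired from the cache,
	// counting pods on ghost nodes as still present.
	count, err := scache.PodCount()
	if err != nil {
		return fmt.Errorf("cache.PodCount failed: %v", err)
	}
	if count == 0 {
		// All pods, including those on ghost nodes, are gone.
	}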

pkg/scheduler/scheduler_test.go

Lines changed: 2 additions & 3 deletions
@@ -36,7 +36,6 @@ import (
 	eventsv1 "k8s.io/api/events/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -570,12 +569,12 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
 			return
 		default:
 		}
-		pods, err := scache.ListPods(labels.Everything())
+		pods, err := scache.PodCount()
 		if err != nil {
 			errChan <- fmt.Errorf("cache.List failed: %v", err)
 			return
 		}
-		if len(pods) == 0 {
+		if pods == 0 {
 			close(waitPodExpireChan)
 			return
 		}

0 commit comments
