Skip to content

Commit fcc35b0

Browse files
authored
Merge pull request kubernetes#85899 from gongguan/slim_down_lister
slim down some lister expansions
2 parents 398e2bc + 7f5076d commit fcc35b0

28 files changed

+419
-351
lines changed

pkg/controller/deployment/deployment_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
231231
// getDeploymentsForReplicaSet returns a list of Deployments that potentially
232232
// match a ReplicaSet.
233233
func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment {
234-
deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
234+
deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
235235
if err != nil || len(deployments) == 0 {
236236
return nil
237237
}

pkg/controller/deployment/util/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ go_library(
1818
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
1919
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
2020
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
21+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
2122
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2223
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
2324
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
2425
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
2526
"//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
27+
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
2628
"//vendor/k8s.io/klog:go_default_library",
2729
"//vendor/k8s.io/utils/integer:go_default_library",
2830
],
@@ -46,6 +48,7 @@ go_test(
4648
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
4749
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
4850
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
51+
"//staging/src/k8s.io/client-go/informers:go_default_library",
4952
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
5053
"//staging/src/k8s.io/client-go/testing:go_default_library",
5154
],

pkg/controller/deployment/util/deployment_util.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,19 @@ import (
2424
"strings"
2525
"time"
2626

27-
"k8s.io/klog"
28-
2927
apps "k8s.io/api/apps/v1"
3028
v1 "k8s.io/api/core/v1"
3129
apiequality "k8s.io/apimachinery/pkg/api/equality"
3230
"k8s.io/apimachinery/pkg/api/meta"
3331
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/apimachinery/pkg/labels"
3433
"k8s.io/apimachinery/pkg/runtime"
3534
"k8s.io/apimachinery/pkg/types"
3635
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
3736
"k8s.io/apimachinery/pkg/util/wait"
3837
appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
38+
appslisters "k8s.io/client-go/listers/apps/v1"
39+
"k8s.io/klog"
3940
"k8s.io/kubernetes/pkg/controller"
4041
labelsutil "k8s.io/kubernetes/pkg/util/labels"
4142
"k8s.io/utils/integer"
@@ -912,3 +913,38 @@ func HasProgressDeadline(d *apps.Deployment) bool {
912913
func HasRevisionHistoryLimit(d *apps.Deployment) bool {
913914
return d.Spec.RevisionHistoryLimit != nil && *d.Spec.RevisionHistoryLimit != math.MaxInt32
914915
}
916+
917+
// GetDeploymentsForReplicaSet returns a list of Deployments that potentially
918+
// match a ReplicaSet. Only the one specified in the ReplicaSet's ControllerRef
919+
// will actually manage it.
920+
// Returns an error only if no matching Deployments are found.
921+
func GetDeploymentsForReplicaSet(deploymentLister appslisters.DeploymentLister, rs *apps.ReplicaSet) ([]*apps.Deployment, error) {
922+
if len(rs.Labels) == 0 {
923+
return nil, fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)
924+
}
925+
926+
// TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
927+
dList, err := deploymentLister.Deployments(rs.Namespace).List(labels.Everything())
928+
if err != nil {
929+
return nil, err
930+
}
931+
932+
var deployments []*apps.Deployment
933+
for _, d := range dList {
934+
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
935+
if err != nil {
936+
return nil, fmt.Errorf("invalid label selector: %v", err)
937+
}
938+
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
939+
if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) {
940+
continue
941+
}
942+
deployments = append(deployments, d)
943+
}
944+
945+
if len(deployments) == 0 {
946+
return nil, fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels)
947+
}
948+
949+
return deployments, nil
950+
}

pkg/controller/deployment/util/deployment_util_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/apimachinery/pkg/types"
3535
"k8s.io/apimachinery/pkg/util/intstr"
3636
"k8s.io/apiserver/pkg/storage/names"
37+
"k8s.io/client-go/informers"
3738
"k8s.io/client-go/kubernetes/fake"
3839
core "k8s.io/client-go/testing"
3940
"k8s.io/kubernetes/pkg/controller"
@@ -1402,3 +1403,84 @@ func TestReplicasAnnotationsNeedUpdate(t *testing.T) {
14021403
})
14031404
}
14041405
}
1406+
1407+
func TestGetDeploymentsForReplicaSet(t *testing.T) {
1408+
fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second)
1409+
var deployments []*apps.Deployment
1410+
for i := 0; i < 3; i++ {
1411+
deployment := &apps.Deployment{
1412+
ObjectMeta: metav1.ObjectMeta{
1413+
Name: fmt.Sprintf("deployment-%d", i),
1414+
Namespace: "test",
1415+
},
1416+
Spec: apps.DeploymentSpec{
1417+
Selector: &metav1.LabelSelector{
1418+
MatchLabels: map[string]string{
1419+
"app": fmt.Sprintf("test-%d", i),
1420+
},
1421+
},
1422+
},
1423+
}
1424+
deployments = append(deployments, deployment)
1425+
fakeInformerFactory.Apps().V1().Deployments().Informer().GetStore().Add(deployment)
1426+
}
1427+
var rss []*apps.ReplicaSet
1428+
for i := 0; i < 5; i++ {
1429+
rs := &apps.ReplicaSet{
1430+
ObjectMeta: metav1.ObjectMeta{
1431+
Namespace: "test",
1432+
Name: fmt.Sprintf("test-replicaSet-%d", i),
1433+
Labels: map[string]string{
1434+
"app": fmt.Sprintf("test-%d", i),
1435+
"label": fmt.Sprintf("label-%d", i),
1436+
},
1437+
},
1438+
}
1439+
rss = append(rss, rs)
1440+
}
1441+
tests := []struct {
1442+
name string
1443+
rs *apps.ReplicaSet
1444+
err error
1445+
expect []*apps.Deployment
1446+
}{
1447+
{
1448+
name: "GetDeploymentsForReplicaSet for rs-0",
1449+
rs: rss[0],
1450+
expect: []*apps.Deployment{deployments[0]},
1451+
},
1452+
{
1453+
name: "GetDeploymentsForReplicaSet for rs-1",
1454+
rs: rss[1],
1455+
expect: []*apps.Deployment{deployments[1]},
1456+
},
1457+
{
1458+
name: "GetDeploymentsForReplicaSet for rs-2",
1459+
rs: rss[2],
1460+
expect: []*apps.Deployment{deployments[2]},
1461+
},
1462+
{
1463+
name: "GetDeploymentsForReplicaSet for rs-3",
1464+
rs: rss[3],
1465+
err: fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rss[3].Name, rss[3].Namespace, rss[3].Labels),
1466+
},
1467+
{
1468+
name: "GetDeploymentsForReplicaSet for rs-4",
1469+
rs: rss[4],
1470+
err: fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rss[4].Name, rss[4].Namespace, rss[4].Labels),
1471+
},
1472+
}
1473+
for _, test := range tests {
1474+
t.Run(test.name, func(t *testing.T) {
1475+
get, err := GetDeploymentsForReplicaSet(fakeInformerFactory.Apps().V1().Deployments().Lister(), test.rs)
1476+
if err != nil {
1477+
if err.Error() != test.err.Error() {
1478+
t.Errorf("Error from GetDeploymentsForReplicaSet: %v", err)
1479+
}
1480+
} else if !reflect.DeepEqual(get, test.expect) {
1481+
t.Errorf("Expect deployments %v, but got %v", test.expect, get)
1482+
}
1483+
})
1484+
}
1485+
1486+
}

pkg/controller/service/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ go_library(
1212
deps = [
1313
"//staging/src/k8s.io/api/core/v1:go_default_library",
1414
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
1516
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
1617
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
1718
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",

pkg/controller/service/controller.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
v1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/api/errors"
29+
"k8s.io/apimachinery/pkg/labels"
2930
"k8s.io/apimachinery/pkg/util/runtime"
3031
"k8s.io/apimachinery/pkg/util/sets"
3132
"k8s.io/apimachinery/pkg/util/wait"
@@ -372,7 +373,7 @@ func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) (
372373
}
373374

374375
func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
375-
nodes, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
376+
nodes, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
376377
if err != nil {
377378
return nil, err
378379
}
@@ -601,7 +602,7 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
601602
return nodeNames(x).Equal(nodeNames(y))
602603
}
603604

604-
func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
605+
func getNodeConditionPredicate() NodeConditionPredicate {
605606
return func(node *v1.Node) bool {
606607
// We add the master to the node list, but its unschedulable. So we use this to filter
607608
// the master.
@@ -645,7 +646,7 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
645646
// nodeSyncLoop handles updating the hosts pointed to by all load
646647
// balancers whenever the set of nodes in the cluster changes.
647648
func (s *Controller) nodeSyncLoop() {
648-
newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
649+
newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
649650
if err != nil {
650651
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
651652
return
@@ -847,3 +848,24 @@ func (s *Controller) patchStatus(service *v1.Service, previousStatus, newStatus
847848
_, err := patch(s.kubeClient.CoreV1(), service, updated)
848849
return err
849850
}
851+
852+
// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
853+
// some set of criteria defined by the function.
854+
type NodeConditionPredicate func(node *v1.Node) bool
855+
856+
// listWithPredicate gets nodes that matches predicate function.
857+
func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) {
858+
nodes, err := nodeLister.List(labels.Everything())
859+
if err != nil {
860+
return nil, err
861+
}
862+
863+
var filtered []*v1.Node
864+
for i := range nodes {
865+
if predicate(nodes[i]) {
866+
filtered = append(filtered, nodes[i])
867+
}
868+
}
869+
870+
return filtered, nil
871+
}

pkg/controller/service/controller_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"reflect"
24+
"sort"
2425
"strings"
2526
"testing"
2627
"time"
@@ -1428,3 +1429,67 @@ func Test_getNodeConditionPredicate(t *testing.T) {
14281429
})
14291430
}
14301431
}
1432+
1433+
func TestListWithPredicate(t *testing.T) {
1434+
fakeInformerFactory := informers.NewSharedInformerFactory(&fake.Clientset{}, 0*time.Second)
1435+
var nodes []*v1.Node
1436+
for i := 0; i < 5; i++ {
1437+
var phase v1.NodePhase
1438+
if i%2 == 0 {
1439+
phase = v1.NodePending
1440+
} else {
1441+
phase = v1.NodeRunning
1442+
}
1443+
node := &v1.Node{
1444+
ObjectMeta: metav1.ObjectMeta{
1445+
Name: fmt.Sprintf("node-%d", i),
1446+
},
1447+
Status: v1.NodeStatus{
1448+
Phase: phase,
1449+
},
1450+
}
1451+
nodes = append(nodes, node)
1452+
fakeInformerFactory.Core().V1().Nodes().Informer().GetStore().Add(node)
1453+
}
1454+
1455+
tests := []struct {
1456+
name string
1457+
predicate NodeConditionPredicate
1458+
expect []*v1.Node
1459+
}{
1460+
{
1461+
name: "ListWithPredicate filter Running node",
1462+
predicate: func(node *v1.Node) bool {
1463+
return node.Status.Phase == v1.NodeRunning
1464+
},
1465+
expect: []*v1.Node{nodes[1], nodes[3]},
1466+
},
1467+
{
1468+
name: "ListWithPredicate filter Pending node",
1469+
predicate: func(node *v1.Node) bool {
1470+
return node.Status.Phase == v1.NodePending
1471+
},
1472+
expect: []*v1.Node{nodes[0], nodes[2], nodes[4]},
1473+
},
1474+
{
1475+
name: "ListWithPredicate filter Terminated node",
1476+
predicate: func(node *v1.Node) bool {
1477+
return node.Status.Phase == v1.NodeTerminated
1478+
},
1479+
expect: nil,
1480+
},
1481+
}
1482+
for _, test := range tests {
1483+
t.Run(test.name, func(t *testing.T) {
1484+
get, err := listWithPredicate(fakeInformerFactory.Core().V1().Nodes().Lister(), test.predicate)
1485+
sort.Slice(get, func(i, j int) bool {
1486+
return get[i].Name < get[j].Name
1487+
})
1488+
if err != nil {
1489+
t.Errorf("Error from ListWithPredicate: %v", err)
1490+
} else if !reflect.DeepEqual(get, test.expect) {
1491+
t.Errorf("Expect nodes %v, but got %v", test.expect, get)
1492+
}
1493+
})
1494+
}
1495+
}

pkg/kubelet/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ go_test(
243243
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
244244
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
245245
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
246-
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
247246
"//staging/src/k8s.io/client-go/rest:go_default_library",
248247
"//staging/src/k8s.io/client-go/testing:go_default_library",
249248
"//staging/src/k8s.io/client-go/tools/record:go_default_library",

pkg/kubelet/kubelet_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"k8s.io/apimachinery/pkg/util/sets"
4040
"k8s.io/apimachinery/pkg/util/wait"
4141
"k8s.io/client-go/kubernetes/fake"
42-
corelisters "k8s.io/client-go/listers/core/v1"
4342
"k8s.io/client-go/tools/record"
4443
"k8s.io/client-go/util/flowcontrol"
4544
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
@@ -446,10 +445,6 @@ func (nl testNodeLister) List(_ labels.Selector) (ret []*v1.Node, err error) {
446445
return nl.nodes, nil
447446
}
448447

449-
func (nl testNodeLister) ListWithPredicate(_ corelisters.NodeConditionPredicate) ([]*v1.Node, error) {
450-
return nl.nodes, nil
451-
}
452-
453448
func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) {
454449
status, found := kl.statusManager.GetPodStatus(pod.UID)
455450
require.True(t, found, "Status of pod %q is not found in the status map", pod.UID)

pkg/scheduler/algorithm/predicates/predicates.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata)
10031003
return
10041004
}
10051005
// Store services which match the pod.
1006-
matchingPodServices, err := s.serviceLister.GetPodServices(pm.pod)
1006+
matchingPodServices, err := schedulerlisters.GetPodServices(s.serviceLister, pm.pod)
10071007
if err != nil {
10081008
klog.Errorf("Error precomputing service affinity: could not list services: %v", err)
10091009
}

0 commit comments

Comments
 (0)