@@ -19,9 +19,13 @@ package controller
1919import (
2020 "fmt"
2121 "github.com/container-storage-interface/spec/lib/go/csi/v0"
22+ "github.com/golang/glog"
2223 "k8s.io/api/core/v1"
24+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2325 "k8s.io/client-go/kubernetes"
26+ csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
2427 csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
28+ "math/rand"
2529 "sort"
2630 "strings"
2731)
@@ -63,20 +67,95 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo
6367func GenerateAccessibilityRequirements (
6468 kubeClient kubernetes.Interface ,
6569 csiAPIClient csiclientset.Interface ,
66- allowedTopologies []v1.TopologySelectorTerm ) (* csi.TopologyRequirement , error ) {
70+ driverName string ,
71+ allowedTopologies []v1.TopologySelectorTerm ,
72+ selectedNode * v1.Node ) (* csi.TopologyRequirement , error ) {
73+ requirement := & csi.TopologyRequirement {}
74+
75+ var topologyTerms []topologyTerm
6776 if len (allowedTopologies ) == 0 {
68- return nil , fmt .Errorf ("topology aggregation not implemented" )
77+ // Aggregate existing topologies in nodes across the entire cluster.
78+ var err error
79+ topologyTerms , err = aggregateTopologies (kubeClient , csiAPIClient , driverName , selectedNode )
80+ if err != nil {
81+ return nil , err
82+ }
6983 } else {
70- topologyTerms := flatten (allowedTopologies )
71- topologyTerms = deduplicate (topologyTerms )
72- // TODO (verult) reduce subset duplicate terms (advanced reduction)
84+ topologyTerms = flatten (allowedTopologies )
85+ }
86+
87+ if len (topologyTerms ) == 0 {
88+ return nil , nil
89+ }
90+
91+ topologyTerms = deduplicate (topologyTerms )
92+ // TODO (verult) reduce subset duplicate terms (advanced reduction)
93+
94+ var requisite []* csi.Topology
95+ for _ , term := range topologyTerms {
96+ requisite = append (requisite , & csi.Topology {Segments : term })
97+ }
98+ requirement .Requisite = requisite
99+
100+ return requirement , nil
101+ }
102+
103+ func aggregateTopologies (
104+ kubeClient kubernetes.Interface ,
105+ csiAPIClient csiclientset.Interface ,
106+ driverName string ,
107+ selectedNode * v1.Node ) ([]topologyTerm , error ) {
73108
74- var requisite []* csi.Topology
75- for _ , term := range topologyTerms {
76- requisite = append (requisite , & csi.Topology {Segments : term })
109+ var topologyKeys []string
110+ if selectedNode == nil {
111+ // TODO (verult) retry
112+ nodeInfos , err := csiAPIClient .CsiV1alpha1 ().CSINodeInfos ().List (metav1.ListOptions {})
113+ if err != nil {
114+ // We must support provisioning if CSINodeInfo is missing, for backward compatibility.
115+ glog .Warningf ("error listing CSINodeInfos: %v; proceeding to provision without topology information" , err )
116+ return nil , nil
77117 }
78- return & csi.TopologyRequirement {Requisite : requisite }, nil
118+
119+ rand .Shuffle (len (nodeInfos .Items ), func (i , j int ) {
120+ nodeInfos .Items [i ], nodeInfos .Items [j ] = nodeInfos .Items [j ], nodeInfos .Items [i ]
121+ })
122+
123+ // Pick the first node with topology keys
124+ for _ , nodeInfo := range nodeInfos .Items {
125+ topologyKeys = getTopologyKeysFromNodeInfo (& nodeInfo , driverName )
126+ if topologyKeys != nil {
127+ break
128+ }
129+ }
130+ } else {
131+ // TODO (verult) retry
132+ selectedNodeInfo , err := csiAPIClient .CsiV1alpha1 ().CSINodeInfos ().Get (selectedNode .Name , metav1.GetOptions {})
133+ if err != nil {
134+ // We must support provisioning if CSINodeInfo is missing, for backward compatibility.
135+ glog .Warningf ("error getting CSINodeInfo for selected node %q: %v; proceeding to provision without topology information" , selectedNode .Name , err )
136+ return nil , nil
137+ }
138+ topologyKeys = getTopologyKeysFromNodeInfo (selectedNodeInfo , driverName )
139+ }
140+
141+ if len (topologyKeys ) == 0 {
142+ // Assuming the external provisioner is never running during node driver upgrades.
143+ // If selectedNode != nil, the scheduler selected a node with no topology information.
144+ // If selectedNode == nil, all nodes in the cluster are missing topology information.
145+ // In either case, provisioning needs to be allowed to proceed.
146+ return nil , nil
79147 }
148+
149+ selector , err := buildTopologyKeySelector (topologyKeys )
150+ if err != nil {
151+ return nil , err
152+ }
153+ nodes , err := kubeClient .CoreV1 ().Nodes ().List (metav1.ListOptions {LabelSelector : selector })
154+ if err != nil {
155+ return nil , fmt .Errorf ("error listing nodes: %v" , err )
156+ }
157+
158+ return extractTopologyFromNodes (nodes , topologyKeys ), nil
80159}
81160
82161// AllowedTopologies is an OR of TopologySelectorTerms.
@@ -155,6 +234,49 @@ func deduplicate(terms []topologyTerm) []topologyTerm {
155234 return dedupedTerms
156235}
157236
237+ func getTopologyKeysFromNodeInfo (nodeInfo * csiv1alpha1.CSINodeInfo , driverName string ) []string {
238+ for _ , driver := range nodeInfo .CSIDrivers {
239+ if driver .Driver == driverName {
240+ return driver .TopologyKeys
241+ }
242+ }
243+ return nil
244+ }
245+
246+ func extractTopologyFromNodes (nodes * v1.NodeList , topologyKeys []string ) []topologyTerm {
247+ var terms []topologyTerm
248+ for _ , node := range nodes .Items {
249+ segments := make (map [string ]string )
250+ for _ , key := range topologyKeys {
251+ // Key always exists because nodes were selected by these keys.
252+ segments [key ] = node .Labels [key ]
253+ }
254+ terms = append (terms , segments )
255+ }
256+ return terms
257+ }
258+
259+ func buildTopologyKeySelector (topologyKeys []string ) (string , error ) {
260+ var expr []metav1.LabelSelectorRequirement
261+ for _ , key := range topologyKeys {
262+ expr = append (expr , metav1.LabelSelectorRequirement {
263+ Key : key ,
264+ Operator : metav1 .LabelSelectorOpExists ,
265+ })
266+ }
267+
268+ labelSelector := metav1.LabelSelector {
269+ MatchExpressions : expr ,
270+ }
271+
272+ selector , err := metav1 .LabelSelectorAsSelector (& labelSelector )
273+ if err != nil {
274+ return "" , fmt .Errorf ("error parsing topology keys selector: %v" , err )
275+ }
276+
277+ return selector .String (), nil
278+ }
279+
158280func (t topologyTerm ) clone () topologyTerm {
159281 ret := make (topologyTerm )
160282 for k , v := range t {
0 commit comments