Skip to content

Commit 3ba9be3

Browse files
committed
Preferred topology
1 parent 374f23c commit 3ba9be3

File tree

2 files changed

+281
-48
lines changed

2 files changed

+281
-48
lines changed

pkg/controller/topology.go

Lines changed: 86 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,30 +72,56 @@ func GenerateAccessibilityRequirements(
7272
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
7373
requirement := &csi.TopologyRequirement{}
7474

75-
var topologyTerms []topologyTerm
75+
/* Requisite */
76+
var requisiteTerms []topologyTerm
7677
if len(allowedTopologies) == 0 {
7778
// Aggregate existing topologies in nodes across the entire cluster.
7879
var err error
79-
topologyTerms, err = aggregateTopologies(kubeClient, csiAPIClient, driverName, selectedNode)
80+
requisiteTerms, err = aggregateTopologies(kubeClient, csiAPIClient, driverName, selectedNode)
8081
if err != nil {
8182
return nil, err
8283
}
8384
} else {
84-
topologyTerms = flatten(allowedTopologies)
85+
// Distribute out one of the OR layers in allowedTopologies
86+
requisiteTerms = flatten(allowedTopologies)
8587
}
8688

87-
if len(topologyTerms) == 0 {
89+
if len(requisiteTerms) == 0 {
8890
return nil, nil
8991
}
9092

91-
topologyTerms = deduplicate(topologyTerms)
93+
requisiteTerms = deduplicate(requisiteTerms)
9294
// TODO (verult) reduce subset duplicate terms (advanced reduction)
9395

94-
var requisite []*csi.Topology
95-
for _, term := range topologyTerms {
96-
requisite = append(requisite, &csi.Topology{Segments: term})
96+
requirement.Requisite = toCSITopology(requisiteTerms)
97+
98+
/* Preferred */
99+
if selectedNode != nil {
100+
// TODO (verult) reuse selected node info from aggregateTopologies
101+
// TODO (verult) retry
102+
nodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
103+
if err != nil {
104+
return nil, fmt.Errorf("error getting node info for selected node: %v", err)
105+
}
106+
107+
topologyKeys := getTopologyKeys(nodeInfo, driverName)
108+
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
109+
if isMissingKey {
110+
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINodeInfo %v", selectedNode.Labels, topologyKeys)
111+
}
112+
113+
preferredTerms := sortAndShift(requisiteTerms, selectedTopology)
114+
if preferredTerms == nil {
115+
// Topology from selected node is not in requisite. This case should never be hit:
116+
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
117+
// constraint.
118+
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
119+
// selected node.
120+
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite", selectedTopology, selectedNode.Name)
121+
}
122+
123+
requirement.Preferred = toCSITopology(preferredTerms)
97124
}
98-
requirement.Requisite = requisite
99125

100126
return requirement, nil
101127
}
@@ -122,7 +148,7 @@ func aggregateTopologies(
122148

123149
// Pick the first node with topology keys
124150
for _, nodeInfo := range nodeInfos.Items {
125-
topologyKeys = getTopologyKeysFromNodeInfo(&nodeInfo, driverName)
151+
topologyKeys = getTopologyKeys(&nodeInfo, driverName)
126152
if topologyKeys != nil {
127153
break
128154
}
@@ -135,7 +161,7 @@ func aggregateTopologies(
135161
glog.Warningf("error getting CSINodeInfo for selected node %q: %v; proceeding to provision without topology information", selectedNode.Name, err)
136162
return nil, nil
137163
}
138-
topologyKeys = getTopologyKeysFromNodeInfo(selectedNodeInfo, driverName)
164+
topologyKeys = getTopologyKeys(selectedNodeInfo, driverName)
139165
}
140166

141167
if len(topologyKeys) == 0 {
@@ -155,7 +181,13 @@ func aggregateTopologies(
155181
return nil, fmt.Errorf("error listing nodes: %v", err)
156182
}
157183

158-
return extractTopologyFromNodes(nodes, topologyKeys), nil
184+
var terms []topologyTerm
185+
for _, node := range nodes.Items {
186+
// missingKey bool can be ignored because nodes were selected by these keys.
187+
term, _ := getTopologyFromNode(&node, topologyKeys)
188+
terms = append(terms, term)
189+
}
190+
return terms, nil
159191
}
160192

161193
// AllowedTopologies is an OR of TopologySelectorTerms.
@@ -234,7 +266,24 @@ func deduplicate(terms []topologyTerm) []topologyTerm {
234266
return dedupedTerms
235267
}
236268

237-
func getTopologyKeysFromNodeInfo(nodeInfo *csiv1alpha1.CSINodeInfo, driverName string) []string {
269+
// Sort the given terms in place,
270+
// then return a new list of terms equivalent to the sorted terms, but shifted so that the primary
271+
// term is the first in the list.
272+
func sortAndShift(terms []topologyTerm, primary topologyTerm) []topologyTerm {
273+
var preferredTerms []topologyTerm
274+
sort.Slice(terms, func(i, j int) bool {
275+
return terms[i].less(terms[j])
276+
})
277+
for i, t := range terms {
278+
if t.equal(primary) {
279+
preferredTerms = append(terms[i:], terms[:i]...)
280+
break
281+
}
282+
}
283+
return preferredTerms
284+
}
285+
286+
func getTopologyKeys(nodeInfo *csiv1alpha1.CSINodeInfo, driverName string) []string {
238287
for _, driver := range nodeInfo.CSIDrivers {
239288
if driver.Driver == driverName {
240289
return driver.TopologyKeys
@@ -243,17 +292,16 @@ func getTopologyKeysFromNodeInfo(nodeInfo *csiv1alpha1.CSINodeInfo, driverName s
243292
return nil
244293
}
245294

246-
func extractTopologyFromNodes(nodes *v1.NodeList, topologyKeys []string) []topologyTerm {
247-
var terms []topologyTerm
248-
for _, node := range nodes.Items {
249-
segments := make(map[string]string)
250-
for _, key := range topologyKeys {
251-
// Key always exists because nodes were selected by these keys.
252-
segments[key] = node.Labels[key]
295+
func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTerm, isMissingKey bool) {
296+
term = make(topologyTerm)
297+
for _, key := range topologyKeys {
298+
v, ok := node.Labels[key]
299+
if !ok {
300+
return nil, true
253301
}
254-
terms = append(terms, segments)
302+
term[key] = v
255303
}
256-
return terms
304+
return term, false
257305
}
258306

259307
func buildTopologyKeySelector(topologyKeys []string) (string, error) {
@@ -303,3 +351,19 @@ func (t topologyTerm) hash() string {
303351
sort.Strings(segments)
304352
return strings.Join(segments, ",")
305353
}
354+
355+
func (t topologyTerm) less(other topologyTerm) bool {
356+
return t.hash() < other.hash()
357+
}
358+
359+
func (t topologyTerm) equal(other topologyTerm) bool {
360+
return t.hash() == other.hash()
361+
}
362+
363+
func toCSITopology(terms []topologyTerm) []*csi.Topology {
364+
var out []*csi.Topology
365+
for _, term := range terms {
366+
out = append(out, &csi.Topology{Segments: term})
367+
}
368+
return out
369+
}

0 commit comments

Comments
 (0)