Skip to content

Commit 2d2a875

Browse files
author
Stan Lagun
committed
Dependency replication
This change adds ability to replicate dependency with index parameters iterated over arbitrary number of lists. For each dependency it is now possible to specify map of indexVariableName -> listExpression listExpression := range|item + [, listExpression] range := number '..' number item := STRING for example, if for "i: 1..3" the dependency will be replicated into 3 clones, each one of them having argument i set to value in range [1, 3] This also allows to consume N flow replicas by replicating the dependency that leads to the consumed flow
1 parent 977106b commit 2d2a875

File tree

6 files changed

+275
-28
lines changed

6 files changed

+275
-28
lines changed

pkg/client/dependencies.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type Dependency struct {
4141

4242
// Arguments passed to dependent resource
4343
Args map[string]string `json:"args,omitempty"`
44+
45+
// map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists
46+
GenerateFor map[string]string `json:"generateFor,omitempty"`
4447
}
4548

4649
// DependencyList is a k8s object representing list of dependencies

pkg/interfaces/interfaces.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ type GraphContext interface {
8282
Scheduler() Scheduler
8383
GetArg(string) string
8484
Graph() DependencyGraph
85-
Dependency() *client.Dependency
8685
}
8786

8887
// DependencyGraphOptions contains all the input required to build a dependency graph

pkg/resources/flow.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
package resources
1616

1717
import (
18-
"fmt"
1918
"log"
20-
"strings"
2119

2220
"github.com/Mirantis/k8s-AppController/pkg/client"
2321
"github.com/Mirantis/k8s-AppController/pkg/interfaces"
@@ -52,19 +50,13 @@ func (flowTemplateFactory) Kind() string {
5250
func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource {
5351
newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow)
5452

55-
dep := gc.Dependency()
56-
var depName string
57-
if dep != nil {
58-
depName = strings.Replace(dep.Name, dep.GenerateName, "", 1)
59-
}
60-
6153
return report.SimpleReporter{
6254
BaseResource: &flow{
6355
Base: Base{def.Meta},
6456
flow: newFlow,
6557
context: gc,
6658
originalName: def.Flow.Name,
67-
instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")),
59+
instanceName: gc.GetArg("AC_ID"),
6860
}}
6961
}
7062

pkg/scheduler/dependency_graph.go

Lines changed: 131 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"log"
2121
"sort"
22+
"strconv"
2223
"strings"
2324
"time"
2425

@@ -40,12 +41,12 @@ type dependencyGraph struct {
4041
}
4142

4243
type graphContext struct {
43-
args map[string]string
44-
graph *dependencyGraph
45-
scheduler *scheduler
46-
flow *client.Flow
47-
dependency *client.Dependency
48-
replica string
44+
args map[string]string
45+
graph *dependencyGraph
46+
scheduler *scheduler
47+
flow *client.Flow
48+
id string
49+
replica string
4950
}
5051

5152
var _ interfaces.GraphContext = &graphContext{}
@@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string {
6263
return gc.replica
6364
case "AC_FLOW_NAME":
6465
return gc.flow.Name
66+
case "AC_ID":
67+
return gc.id
6568
default:
6669
val, ok := gc.args[name]
6770
if ok {
@@ -84,11 +87,6 @@ func (gc graphContext) Graph() interfaces.DependencyGraph {
8487
return gc.graph
8588
}
8689

87-
// Dependency returns Dependency for which child is the resource being created with this context
88-
func (gc graphContext) Dependency() *client.Dependency {
89-
return gc.dependency
90-
}
91-
9290
// newScheduledResourceFor returns new scheduled resource for given resource in init state
9391
func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource {
9492
return &ScheduledResource{
@@ -159,7 +157,9 @@ func groupDependencies(dependencies []client.Dependency,
159157
defaultFlow = []client.Dependency{}
160158
addResource := func(name string) {
161159
if !strings.HasPrefix(name, "flow/") && !isDependant[name] {
162-
defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name})
160+
dep := client.Dependency{Parent: defaultFlowName, Child: name}
161+
dep.Name = name
162+
defaultFlow = append(defaultFlow, dep)
163163
isDependant[name] = true
164164
}
165165
}
@@ -328,11 +328,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string {
328328

329329
func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext {
330330
context := &graphContext{
331-
scheduler: sched,
332-
graph: parentContext.graph,
333-
flow: parentContext.flow,
334-
replica: replica,
335-
dependency: dependency,
331+
scheduler: sched,
332+
graph: parentContext.graph,
333+
flow: parentContext.flow,
334+
replica: replica,
335+
id: getVertexID(dependency, replica),
336336
}
337337

338338
context.args = make(map[string]string)
@@ -344,6 +344,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *
344344
return context
345345
}
346346

347+
func getVertexID(dependency *client.Dependency, replica string) string {
348+
var depName string
349+
if dependency != nil {
350+
depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1)
351+
}
352+
depName += replica
353+
return depName
354+
}
355+
347356
func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) {
348357
for key, value := range dependency.Args {
349358
context.args[key] = copier.EvaluateString(value, parentContext.GetArg)
@@ -661,6 +670,110 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
661670
return depGraph, nil
662671
}
663672

673+
func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow,
674+
useDestructionSelector bool) []client.Dependency {
675+
676+
deps := filterDependencies(dependencies, parent, flow, useDestructionSelector)
677+
var result []client.Dependency
678+
for _, dep := range deps {
679+
if len(dep.GenerateFor) == 0 {
680+
result = append(result, dep)
681+
continue
682+
}
683+
684+
var keys []string
685+
for k := range dep.GenerateFor {
686+
keys = append(keys, k)
687+
}
688+
sort.Strings(keys)
689+
lists := make([][]string, len(dep.GenerateFor))
690+
for i, key := range keys {
691+
lists[i] = expandListExpression(dep.GenerateFor[key])
692+
}
693+
for n, combination := range permute(lists) {
694+
newArgs := make(map[string]string, len(dep.Args)+len(keys))
695+
for k, v := range dep.Args {
696+
newArgs[k] = v
697+
}
698+
for i, key := range keys {
699+
newArgs[key] = combination[i]
700+
}
701+
depCopy := dep
702+
depCopy.Args = newArgs
703+
depCopy.Name += strconv.Itoa(n + 1)
704+
result = append(result, depCopy)
705+
}
706+
}
707+
return result
708+
}
709+
710+
func permute(variants [][]string) [][]string {
711+
switch len(variants) {
712+
case 0:
713+
return variants
714+
case 1:
715+
var result [][]string
716+
for _, v := range variants[0] {
717+
result = append(result, []string{v})
718+
}
719+
return result
720+
default:
721+
var result [][]string
722+
for _, tail := range variants[len(variants)-1] {
723+
for _, p := range permute(variants[:len(variants)-1]) {
724+
result = append(result, append(p, tail))
725+
}
726+
}
727+
return result
728+
}
729+
}
730+
731+
func expandListExpression(expr string) []string {
732+
var result []string
733+
734+
for _, part := range strings.Split(expr, ",") {
735+
part = strings.TrimSpace(part)
736+
737+
isRange := true
738+
var from, to int
739+
740+
rangeParts := strings.SplitN(part, "..", 2)
741+
if len(rangeParts) != 2 {
742+
isRange = false
743+
}
744+
745+
var err error
746+
if isRange {
747+
from, err = strconv.Atoi(rangeParts[0])
748+
if err != nil {
749+
isRange = false
750+
}
751+
}
752+
if isRange {
753+
to, err = strconv.Atoi(rangeParts[1])
754+
if err != nil {
755+
isRange = false
756+
}
757+
}
758+
759+
if isRange {
760+
step := 1
761+
if to < from {
762+
step = -1
763+
}
764+
for i := from; ; i += step {
765+
result = append(result, strconv.Itoa(i))
766+
if i == to {
767+
break
768+
}
769+
}
770+
} else {
771+
result = append(result, part)
772+
}
773+
}
774+
return result
775+
}
776+
664777
func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
665778
resDefs map[string]client.ResourceDefinition,
666779
dependencies map[string][]client.Dependency,
@@ -683,7 +796,7 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
683796
for e := queue.Front(); e != nil; e = e.Next() {
684797
parent := e.Value.(*Block)
685798

686-
deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector)
799+
deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector)
687800

688801
for _, dep := range deps {
689802
if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") {

pkg/scheduler/dependency_graph_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package scheduler
1616

1717
import (
18+
"strings"
1819
"testing"
1920

2021
"github.com/Mirantis/k8s-AppController/pkg/client"
@@ -319,3 +320,72 @@ func TestDependencyToFlowMatching(t *testing.T) {
319320
}
320321
}
321322
}
323+
324+
// TestPermute tests permute function
325+
func TestPermute(t *testing.T) {
326+
alphabets := [][]string{
327+
{"1", "2", "3"},
328+
{"+", "-"},
329+
{"a", "b"},
330+
{"="},
331+
}
332+
333+
expected := map[string]bool{
334+
"1+a=": true,
335+
"1+b=": true,
336+
"1-a=": true,
337+
"1-b=": true,
338+
"2+a=": true,
339+
"2+b=": true,
340+
"2-a=": true,
341+
"2-b=": true,
342+
"3+a=": true,
343+
"3+b=": true,
344+
"3-a=": true,
345+
"3-b=": true,
346+
}
347+
permutations := permute(alphabets)
348+
for _, combination := range permutations {
349+
combinationStr := strings.Join(combination, "")
350+
if !expected[combinationStr] {
351+
t.Errorf("unexpected combination %s", combinationStr)
352+
} else {
353+
delete(expected, combinationStr)
354+
}
355+
}
356+
if len(expected) != 0 {
357+
t.Error("not all combinations were generated")
358+
}
359+
360+
alphabets = append(alphabets, make([]string, 0))
361+
if len(permute(alphabets)) != 0 {
362+
t.Error("empty alphabet didin't result in empty permutation list")
363+
}
364+
}
365+
366+
// TestExpendListExpression tests list expression translation to list of strings
367+
func TestExpendListExpression(t *testing.T) {
368+
table := map[string][]string{
369+
"1": {"1"},
370+
"1..5": {"1", "2", "3", "4", "5"},
371+
"2..-1": {"2", "1", "0", "-1"},
372+
"a, b": {"a", "b"},
373+
"a, b, 2..4": {"a", "b", "2", "3", "4"},
374+
"-1..1, 2..4, x": {"-1", "0", "1", "2", "3", "4", "x"},
375+
"a..b": {"a..b"},
376+
"a..b, 1..3": {"a..b", "1", "2", "3"},
377+
"a..b, c..d": {"a..b", "c..d"},
378+
}
379+
for expr, expected := range table {
380+
result := expandListExpression(expr)
381+
if len(result) != len(expected) {
382+
t.Errorf("unexpected result length for expression %s: %d != %d", expr, len(result), len(expected))
383+
} else {
384+
for i := range expected {
385+
if expected[i] != result[i] {
386+
t.Errorf("invalid entry %d for expression %s: %s != %s", i, expr, expected[i], result[i])
387+
}
388+
}
389+
}
390+
}
391+
}

0 commit comments

Comments
 (0)