Skip to content

Commit 80b7bd3

Browse files
author
Stan Lagun
committed
Dependency replication
This change adds ability to replicate dependency with index parameters iterated over arbitrary number of lists. For each dependency it is now possible to specify map of indexVariableName -> listExpression listExpression := range|item + [, listExpression] range := number '..' number item := STRING for example, if for "i: 1..3" the dependency will be replicated into 3 clones, each one of them having argument i set to value in range [1, 3] This also allows to consume N flow replicas by replicating the dependency that leads to the consumed flow
1 parent 6b44656 commit 80b7bd3

File tree

7 files changed

+395
-31
lines changed

7 files changed

+395
-31
lines changed

docs/flows.md

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ my-flow -> -> pod/pod-$arg
195195
```
196196
will create two pods: `pod-a` and `pod-b`.
197197
198-
## Replication of flows
198+
## Replication
199199
200200
Flow replication is an AppController feature that makes specified number of flow graph copies, each one with
201201
a unique name and then merges them into a single graph. Because each replica name may be used in some of resource
@@ -234,6 +234,96 @@ If there were 7 of them, 4 replicas would be deleted.\
234234
`kubeac run my-flow` if there are no replicas exist, create one, otherwise validate status of
235235
resources of existing replicas.
236236
237+
### Replication of dependencies
238+
239+
With commandline parameters one can create number of flow replicas. But sometimes there is a need to have flow
240+
that creates several replicas of another flow, or just several resources with the same specification that differ
241+
only in name.
242+
243+
One possible solution is to utilize technique shown above: make parameter value be part of resource name and
244+
them duplicate the dependency that leads to this resource and pass different parameter value along each of
245+
dependencies. This works well for small and fixed number of replicas. But if the number goes big, it becomes hard
246+
to manage such number of dependency objects. Moreover if the number itself is not fixed but rather passed as a
247+
parameter replicating resource by manual replication of dependencies becomes impossible.
248+
249+
Luckily, the dependencies can be automatically replicated. This is done through the `generateFor` field of the
250+
`Dependency` object. `generateFor` is a map where keys are argument names and values are list expressions. Each list
251+
expression is comma-separated list of values. If the value has a form of `number..number`, it is expended into a
252+
list of integers in the given range. For example `"1..3, 10..11, abc"` will turn into `["1", "2", "3", "10", "11", "abc"]`.
253+
Then the dependency is going to be replicated automatically with each replica getting on of the list values as an
254+
additional argument. There can be several `generateFor` arguments. In this case there is going to be one dependency
255+
for each combination of the list values. For example
256+
257+
```YAML
258+
apiVersion: appcontroller.k8s/v1alpha1
259+
kind: Dependency
260+
metadata:
261+
name: dependency
262+
parent: pod/podName
263+
child: flow/flowName-$x-$y
264+
generateFor:
265+
x: 1..2
266+
y: a, b
267+
```
268+
269+
has the same effect as
270+
271+
```YAML
272+
apiVersion: appcontroller.k8s/v1alpha1
273+
kind: Dependency
274+
metadata:
275+
name: dependency1
276+
parent: pod/podName
277+
child: flow/flowName-$x-$y
278+
args:
279+
x: 1
280+
y: a
281+
---
282+
apiVersion: appcontroller.k8s/v1alpha1
283+
kind: Dependency
284+
metadata:
285+
name: dependency2
286+
parent: pod/podName
287+
child: flow/flowName-$x-$y
288+
args:
289+
x: 2
290+
y: a
291+
---
292+
apiVersion: appcontroller.k8s/v1alpha1
293+
kind: Dependency
294+
metadata:
295+
name: dependency3
296+
parent: pod/podName
297+
child: flow/flowName-$x-$y
298+
args:
299+
x: 1
300+
y: b
301+
---
302+
apiVersion: appcontroller.k8s/v1alpha1
303+
kind: Dependency
304+
metadata:
305+
name: dependency4
306+
parent: pod/podName
307+
child: flow/flowName-$x-$y
308+
args:
309+
x: 2
310+
y: b
311+
```
312+
313+
Besides simplifying the dependency graph, dependency replication makes possible to have dynamic number of replicas
314+
by using parameter value right inside the list expressions:
315+
316+
```YAML
317+
apiVersion: appcontroller.k8s/v1alpha1
318+
kind: Dependency
319+
metadata:
320+
name: dependency
321+
parent: pod/podName
322+
child: flow/flowName-$index
323+
generateFor:
324+
index: 1..$replicaCount
325+
```
326+
237327
### Replica-spaces and contexts
238328
239329
Replica-space, is a tag that all replicas of the flow share. When new `Replica` object for the flow is created,

pkg/client/dependencies.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ type Dependency struct {
4141

4242
// Arguments passed to dependent resource
4343
Args map[string]string `json:"args,omitempty"`
44+
45+
// map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists
46+
GenerateFor map[string]string `json:"generateFor,omitempty"`
4447
}
4548

4649
// DependencyList is a k8s object representing list of dependencies

pkg/interfaces/interfaces.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ type GraphContext interface {
8282
Scheduler() Scheduler
8383
GetArg(string) string
8484
Graph() DependencyGraph
85-
Dependency() *client.Dependency
8685
}
8786

8887
// DependencyGraphOptions contains all the input required to build a dependency graph

pkg/resources/flow.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
package resources
1616

1717
import (
18-
"fmt"
1918
"log"
20-
"strings"
2119

2220
"github.com/Mirantis/k8s-AppController/pkg/client"
2321
"github.com/Mirantis/k8s-AppController/pkg/interfaces"
@@ -29,7 +27,6 @@ type flow struct {
2927
flow *client.Flow
3028
context interfaces.GraphContext
3129
originalName string
32-
instanceName string
3330
currentGraph interfaces.DependencyGraph
3431
}
3532

@@ -52,19 +49,12 @@ func (flowTemplateFactory) Kind() string {
5249
func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource {
5350
newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow)
5451

55-
dep := gc.Dependency()
56-
var depName string
57-
if dep != nil {
58-
depName = strings.Replace(dep.Name, dep.GenerateName, "", 1)
59-
}
60-
6152
return report.SimpleReporter{
6253
BaseResource: &flow{
6354
Base: Base{def.Meta},
6455
flow: newFlow,
6556
context: gc,
6657
originalName: def.Flow.Name,
67-
instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")),
6858
}}
6959
}
7060

@@ -98,7 +88,7 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D
9888
options := interfaces.DependencyGraphOptions{
9989
FlowName: f.originalName,
10090
Args: args,
101-
FlowInstanceName: f.instanceName,
91+
FlowInstanceName: f.context.GetArg("AC_ID"),
10292
ReplicaCount: replicaCount,
10393
Silent: silent,
10494
FixedNumberOfReplicas: fixedNumberOfReplicas,

pkg/scheduler/dependency_graph.go

Lines changed: 126 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"log"
2121
"sort"
22+
"strconv"
2223
"strings"
2324
"time"
2425

@@ -40,12 +41,12 @@ type dependencyGraph struct {
4041
}
4142

4243
type graphContext struct {
43-
args map[string]string
44-
graph *dependencyGraph
45-
scheduler *scheduler
46-
flow *client.Flow
47-
dependency *client.Dependency
48-
replica string
44+
args map[string]string
45+
graph *dependencyGraph
46+
scheduler *scheduler
47+
flow *client.Flow
48+
id string
49+
replica string
4950
}
5051

5152
var _ interfaces.GraphContext = &graphContext{}
@@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string {
6263
return gc.replica
6364
case "AC_FLOW_NAME":
6465
return gc.flow.Name
66+
case "AC_ID":
67+
return gc.id
6568
default:
6669
val, ok := gc.args[name]
6770
if ok {
@@ -84,11 +87,6 @@ func (gc graphContext) Graph() interfaces.DependencyGraph {
8487
return gc.graph
8588
}
8689

87-
// Dependency returns Dependency for which child is the resource being created with this context
88-
func (gc graphContext) Dependency() *client.Dependency {
89-
return gc.dependency
90-
}
91-
9290
// newScheduledResourceFor returns new scheduled resource for given resource in init state
9391
func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource {
9492
return &ScheduledResource{
@@ -159,7 +157,9 @@ func groupDependencies(dependencies []client.Dependency,
159157
defaultFlow = []client.Dependency{}
160158
addResource := func(name string) {
161159
if !strings.HasPrefix(name, "flow/") && !isDependant[name] {
162-
defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name})
160+
dep := client.Dependency{Parent: defaultFlowName, Child: name}
161+
dep.Name = name
162+
defaultFlow = append(defaultFlow, dep)
163163
isDependant[name] = true
164164
}
165165
}
@@ -328,11 +328,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string {
328328

329329
func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext {
330330
context := &graphContext{
331-
scheduler: sched,
332-
graph: parentContext.graph,
333-
flow: parentContext.flow,
334-
replica: replica,
335-
dependency: dependency,
331+
scheduler: sched,
332+
graph: parentContext.graph,
333+
flow: parentContext.flow,
334+
replica: replica,
335+
id: getVertexID(dependency, replica),
336336
}
337337

338338
context.args = make(map[string]string)
@@ -344,6 +344,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *
344344
return context
345345
}
346346

347+
func getVertexID(dependency *client.Dependency, replica string) string {
348+
var depName string
349+
if dependency != nil {
350+
depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1)
351+
}
352+
depName += replica
353+
return depName
354+
}
355+
347356
func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) {
348357
for key, value := range dependency.Args {
349358
context.args[key] = copier.EvaluateString(value, parentContext.GetArg)
@@ -661,6 +670,105 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
661670
return depGraph, nil
662671
}
663672

673+
func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow,
674+
useDestructionSelector bool, context *graphContext) []client.Dependency {
675+
676+
deps := filterDependencies(dependencies, parent, flow, useDestructionSelector)
677+
var result []client.Dependency
678+
for _, dep := range deps {
679+
if len(dep.GenerateFor) == 0 {
680+
result = append(result, dep)
681+
continue
682+
}
683+
684+
var keys []string
685+
for k := range dep.GenerateFor {
686+
keys = append(keys, k)
687+
}
688+
sort.Strings(keys)
689+
lists := make([][]string, len(dep.GenerateFor))
690+
for i, key := range keys {
691+
lists[i] = expandListExpression(copier.EvaluateString(dep.GenerateFor[key], getArgFunc(context)))
692+
}
693+
for n, combination := range permute(lists) {
694+
newArgs := make(map[string]string, len(dep.Args)+len(keys))
695+
for k, v := range dep.Args {
696+
newArgs[k] = v
697+
}
698+
for i, key := range keys {
699+
newArgs[key] = combination[i]
700+
}
701+
depCopy := dep
702+
depCopy.Args = newArgs
703+
depCopy.Name += strconv.Itoa(n + 1)
704+
result = append(result, depCopy)
705+
}
706+
}
707+
return result
708+
}
709+
710+
func permute(variants [][]string) [][]string {
711+
switch len(variants) {
712+
case 0:
713+
return variants
714+
case 1:
715+
var result [][]string
716+
for _, v := range variants[0] {
717+
result = append(result, []string{v})
718+
}
719+
return result
720+
default:
721+
var result [][]string
722+
for _, tail := range variants[len(variants)-1] {
723+
for _, p := range permute(variants[:len(variants)-1]) {
724+
result = append(result, append(p, tail))
725+
}
726+
}
727+
return result
728+
}
729+
}
730+
731+
func expandListExpression(expr string) []string {
732+
var result []string
733+
for _, part := range strings.Split(expr, ",") {
734+
part = strings.TrimSpace(part)
735+
if part == "" {
736+
continue
737+
}
738+
739+
isRange := true
740+
var from, to int
741+
742+
rangeParts := strings.SplitN(part, "..", 2)
743+
if len(rangeParts) != 2 {
744+
isRange = false
745+
}
746+
747+
var err error
748+
if isRange {
749+
from, err = strconv.Atoi(rangeParts[0])
750+
if err != nil {
751+
isRange = false
752+
}
753+
}
754+
if isRange {
755+
to, err = strconv.Atoi(rangeParts[1])
756+
if err != nil {
757+
isRange = false
758+
}
759+
}
760+
761+
if isRange {
762+
for i := from; i <= to ; i ++ {
763+
result = append(result, strconv.Itoa(i))
764+
}
765+
} else {
766+
result = append(result, part)
767+
}
768+
}
769+
return result
770+
}
771+
664772
func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
665773
resDefs map[string]client.ResourceDefinition,
666774
dependencies map[string][]client.Dependency,
@@ -683,7 +791,7 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
683791
for e := queue.Front(); e != nil; e = e.Next() {
684792
parent := e.Value.(*Block)
685793

686-
deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector)
794+
deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext)
687795

688796
for _, dep := range deps {
689797
if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") {

0 commit comments

Comments
 (0)