Skip to content

Commit 977106b

Browse files
author
Stan Lagun
committed
Ability to parametrize resource key in dependency
Currently, flow resource name is auto-generated from replica name of the outer (consuming) graph and dependency name, whose child is the flow resource. As a result, if we want a flow call to depend on two parent resources, we will get two flow calls after each parent becomes ready, rather than one call after both turn ready. If we make flow name generation depend on replica name only, the problem above will be solved, however it will still not be possible to have one flow call for entire graph rather than for each replica. Also in some cases we might want it to be called for each dependency, if there are different arguments passed along each of them. If we remove name generation at all, the only way, how we'll be able to achieve behavior other, than one call per graph will be to make the flow name by dynamic, i.e. include things like $AC_NAME or $arg1. Though technically it solves the problem, it has its own major cons: * Names of the flows will become ugly and flow name is a user facing thing because it will propagate into CLI commandlines * Flows were supposed to be reusable. Flow author might not know how it will be used and not include needed variable in flow name * Even if he did, one flow may still be used in different ways. For example, what if the flow name is "flow-$AC_NAME" and I want to call it just once for all replicas of my flow. There is no way to pass $AC_NAME explicitly to the flow to make it be a fixed value. This commit proposes another solution to the problem. There is no need to use variables in flow names. Instead, it can be parametrized on the caller side with new dependency syntax. Now I can add optional suffix (3rd component of the name) to parent/child names in dependencies. For example: `child: flow/my-flow/$AC_NAME` Resource definition is looked up by first 2 components (flow/my-flow), but resources from different replicas are merged only if the whole 3-component key is the same. So we get: * Ability to call flows by simple names * It is now possible to call flow several times within another flow. Without this change it is impossible, because node names are unique in the graph and if we include several flow calls, we will get cycle in the graph. However, with this change node identity is identified by 3-part string, while resource definition by just two, so we can have several usage of the same resource that differ by 3-rd component only * Similar to flows, it becomes possible to include the same resource more than once in a graph. For example it will become possible to create resource, do something and then update the same resource. Or delete it.
1 parent e97fbc8 commit 977106b

File tree

9 files changed

+239
-72
lines changed

9 files changed

+239
-72
lines changed

e2e/flows_test.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package integration
1616

1717
import (
1818
"strings"
19+
"time"
1920

2021
testutils "github.com/Mirantis/k8s-AppController/e2e/utils"
2122
"github.com/Mirantis/k8s-AppController/pkg/interfaces"
@@ -33,56 +34,56 @@ var _ = Describe("Flows Suite", func() {
3334
framework.CreateRunAndVerify("flows", interfaces.DependencyGraphOptions{MinReplicaCount: 1})
3435
Eventually(func() int {
3536
return framework.countJobs("a-job-", false)
36-
}).Should(Equal(1), "1 a-job* should have been created")
37+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 a-job* should have been created")
3738
Eventually(func() int {
3839
return framework.countJobs("b-job-", false)
39-
}).Should(Equal(1), "1 a-job* should have been created")
40+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 a-job* should have been created")
4041
Eventually(func() int {
4142
return framework.countJobs("test-job", true)
42-
}).Should(Equal(1), "1 test-job should have been created")
43+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 test-job should have been created")
4344
Eventually(func() int {
4445
return framework.countPods("a-pod-", false)
45-
}).Should(Equal(1), "1 a-pod* should have been created")
46+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 a-pod* should have been created")
4647
Eventually(func() int {
4748
return framework.countPods("b-pod-", false)
48-
}).Should(Equal(1), "1 a-pod* should have been created")
49+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 a-pod* should have been created")
4950
Eventually(func() int {
5051
return framework.countPods("test-pod", true)
51-
}).Should(Equal(1), "1 test-pod should have been created")
52+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 test-pod should have been created")
5253
Eventually(func() int {
5354
return framework.countReplicas("test-flow", true)
54-
}).Should(Equal(2), "2 test-flow replicas should have been created")
55+
}, 300*time.Second, 5*time.Second).Should(Equal(2), "2 test-flow replicas should have been created")
5556
Eventually(func() int {
5657
return framework.countReplicas("DEFAULT", true)
57-
}).Should(Equal(1), "1 DEFAULT flow replica should have been created")
58+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 DEFAULT flow replica should have been created")
5859
})
5960

6061
It("Example 'flows' with replication should finish and create two replicas", func() {
6162
framework.CreateRunAndVerify("flows", interfaces.DependencyGraphOptions{ReplicaCount: 2})
6263
Eventually(func() int {
6364
return framework.countJobs("a-job-", false)
64-
}).Should(Equal(2), "1 a-job* should have been created")
65+
}, 300*time.Second, 5*time.Second).Should(Equal(2), "1 a-job* should have been created")
6566
Eventually(func() int {
6667
return framework.countJobs("b-job-", false)
67-
}).Should(Equal(2), "1 a-job* should have been created")
68+
}, 300*time.Second, 5*time.Second).Should(Equal(2), "1 a-job* should have been created")
6869
Eventually(func() int {
6970
return framework.countJobs("test-job", true)
70-
}).Should(Equal(1), "1 test-job should have been created")
71+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 test-job should have been created")
7172
Eventually(func() int {
7273
return framework.countPods("a-pod-", false)
73-
}).Should(Equal(2), "1 a-pod* should have been created")
74+
}, 300*time.Second, 5*time.Second).Should(Equal(2), "1 a-pod* should have been created")
7475
Eventually(func() int {
7576
return framework.countPods("b-pod-", false)
76-
}).Should(Equal(2), "1 a-pod* should have been created")
77+
}, 300*time.Second, 5*time.Second).Should(Equal(2), "1 a-pod* should have been created")
7778
Eventually(func() int {
7879
return framework.countPods("test-pod", true)
79-
}).Should(Equal(1), "1 test-pod should have been created")
80+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "1 test-pod should have been created")
8081
Eventually(func() int {
8182
return framework.countReplicas("test-flow", true)
82-
}).Should(Equal(4), "2 test-flow replicas should have been created")
83+
}, 300*time.Second, 5*time.Second).Should(Equal(4), "2 test-flow replicas should have been created")
8384
Eventually(func() int {
8485
return framework.countReplicas("DEFAULT", true)
85-
}).Should(Equal(2), "1 DEFAULT flow replica should have been created")
86+
}, 300*time.Second, 5*time.Second).Should(Equal(2), "1 DEFAULT flow replica should have been created")
8687
})
8788

8889
It("Example 'flows' should cleanup after itself", func() {
@@ -96,13 +97,13 @@ var _ = Describe("Flows Suite", func() {
9697

9798
Eventually(func() int {
9899
return framework.countReplicas("", false)
99-
}).Should(Equal(0), "0 replicas should remain")
100+
}, 300*time.Second, 5*time.Second).Should(Equal(0), "0 replicas should remain")
100101
Eventually(func() int {
101102
return framework.countJobs("", false)
102-
}).Should(Equal(0), "0 jobs should remain")
103+
}, 300*time.Second, 5*time.Second).Should(Equal(0), "0 jobs should remain")
103104
Eventually(func() int {
104105
return framework.countPods("", false)
105-
}).Should(Equal(1), "only AC pod should remain")
106+
}, 300*time.Second, 5*time.Second).Should(Equal(1), "only AC pod should remain")
106107
})
107108
})
108109

examples/flows/create.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ cat job2.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_N
1515
cat pod.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_NAME create -f -
1616
cat pod2.yaml | $KUBECTL_NAME exec -i k8s-appcontroller kubeac wrap | $KUBECTL_NAME create -f -
1717

18-
$KUBECTL_NAME exec k8s-appcontroller run
18+
$KUBECTL_NAME exec k8s-appcontroller kubeac run
1919

2020
$KUBECTL_NAME logs -f k8s-appcontroller

examples/flows/deps.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ kind: Dependency
33
metadata:
44
generateName: dependency-
55
parent: flow/DEFAULT
6-
child: flow/test-flow
6+
child: flow/test-flow/$prefix$AC_NAME
77
args:
88
prefix: a
99
---
@@ -19,17 +19,18 @@ kind: Dependency
1919
metadata:
2020
generateName: dependency-
2121
parent: pod/test-pod
22-
child: flow/test-flow
22+
child: flow/test-flow/$prefix$AC_NAME
2323
args:
2424
prefix: b
2525
---
2626
apiVersion: appcontroller.k8s/v1alpha1
2727
kind: Dependency
2828
metadata:
2929
generateName: dependency-
30-
parent: flow/test-flow
30+
parent: flow/test-flow/$prefix$AC_NAME
3131
child: job/test-job
3232
---
33+
3334
apiVersion: appcontroller.k8s/v1alpha1
3435
kind: Dependency
3536
metadata:

pkg/interfaces/interfaces.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type GraphContext interface {
8282
Scheduler() Scheduler
8383
GetArg(string) string
8484
Graph() DependencyGraph
85-
Dependencies() []client.Dependency
85+
Dependency() *client.Dependency
8686
}
8787

8888
// DependencyGraphOptions contains all the input required to build a dependency graph

pkg/resources/flow.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626

2727
type flow struct {
2828
Base
29-
flow *client.Flow
30-
context interfaces.GraphContext
31-
generatedName string
32-
originalName string
33-
currentGraph interfaces.DependencyGraph
29+
flow *client.Flow
30+
context interfaces.GraphContext
31+
originalName string
32+
instanceName string
33+
currentGraph interfaces.DependencyGraph
3434
}
3535

3636
type flowTemplateFactory struct{}
@@ -52,19 +52,19 @@ func (flowTemplateFactory) Kind() string {
5252
func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource {
5353
newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow)
5454

55-
deps := gc.Dependencies()
55+
dep := gc.Dependency()
5656
var depName string
57-
if len(deps) > 0 {
58-
depName = strings.Replace(deps[0].Name, deps[0].GenerateName, "", 1)
57+
if dep != nil {
58+
depName = strings.Replace(dep.Name, dep.GenerateName, "", 1)
5959
}
6060

6161
return report.SimpleReporter{
6262
BaseResource: &flow{
63-
Base: Base{def.Meta},
64-
flow: newFlow,
65-
context: gc,
66-
generatedName: fmt.Sprintf("%s-%s%s", newFlow.Name, depName, gc.GetArg("AC_NAME")),
67-
originalName: def.Flow.Name,
63+
Base: Base{def.Meta},
64+
flow: newFlow,
65+
context: gc,
66+
originalName: def.Flow.Name,
67+
instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")),
6868
}}
6969
}
7070

@@ -77,7 +77,7 @@ func (flowTemplateFactory) NewExisting(name string, c client.Interface, gc inter
7777

7878
// Key return Flow identifier
7979
func (f flow) Key() string {
80-
return "flow/" + f.generatedName
80+
return "flow/" + f.flow.Name
8181
}
8282

8383
func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.DependencyGraph, error) {
@@ -95,7 +95,7 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D
9595
options := interfaces.DependencyGraphOptions{
9696
FlowName: f.originalName,
9797
Args: args,
98-
FlowInstanceName: f.generatedName,
98+
FlowInstanceName: f.instanceName,
9999
ReplicaCount: replicaCount,
100100
Silent: silent,
101101
FixedNumberOfReplicas: fixedNumberOfReplicas,

pkg/scheduler/dependency_graph.go

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ type dependencyGraph struct {
4040
}
4141

4242
type graphContext struct {
43-
args map[string]string
44-
graph *dependencyGraph
45-
scheduler *scheduler
46-
flow *client.Flow
47-
replica string
48-
dependencies []client.Dependency
43+
args map[string]string
44+
graph *dependencyGraph
45+
scheduler *scheduler
46+
flow *client.Flow
47+
dependency *client.Dependency
48+
replica string
4949
}
5050

5151
var _ interfaces.GraphContext = &graphContext{}
@@ -84,13 +84,13 @@ func (gc graphContext) Graph() interfaces.DependencyGraph {
8484
return gc.graph
8585
}
8686

87-
// Dependencies method returns list of incoming dependencies for the node on which operation is performed
88-
func (gc graphContext) Dependencies() []client.Dependency {
89-
return gc.dependencies
87+
// Dependency returns Dependency for which child is the resource being created with this context
88+
func (gc graphContext) Dependency() *client.Dependency {
89+
return gc.dependency
9090
}
9191

9292
// newScheduledResourceFor returns new scheduled resource for given resource in init state
93-
func newScheduledResourceFor(r interfaces.Resource, context *graphContext, existing bool) *ScheduledResource {
93+
func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource {
9494
return &ScheduledResource{
9595
Started: false,
9696
Ignored: false,
@@ -99,6 +99,7 @@ func newScheduledResourceFor(r interfaces.Resource, context *graphContext, exist
9999
Meta: map[string]map[string]string{},
100100
context: context,
101101
Existing: existing,
102+
suffix: copier.EvaluateString(suffix, getArgFunc(context)),
102103
}
103104
}
104105

@@ -252,7 +253,7 @@ func isMapContainedIn(contained, containing map[string]string) bool {
252253
}
253254

254255
// newScheduledResource is a constructor for ScheduledResource
255-
func (sched scheduler) newScheduledResource(kind, name string, resDefs map[string]client.ResourceDefinition,
256+
func (sched scheduler) newScheduledResource(kind, name, suffix string, resDefs map[string]client.ResourceDefinition,
256257
gc *graphContext, silent bool) (*ScheduledResource, error) {
257258
var r interfaces.Resource
258259

@@ -266,7 +267,7 @@ func (sched scheduler) newScheduledResource(kind, name string, resDefs map[strin
266267
return nil, err
267268
}
268269

269-
return newScheduledResourceFor(r, gc, existing), nil
270+
return newScheduledResourceFor(r, suffix, gc, existing), nil
270271
}
271272

272273
// newResource returns creates a resource controller for a given resources name and factory.
@@ -294,14 +295,17 @@ func (sched scheduler) newResource(kind, name string, resDefs map[string]client.
294295
return r, true, nil
295296
}
296297

297-
func keyParts(key string) (kind, name string, err error) {
298-
parts := strings.Split(key, "/")
298+
func keyParts(key string) (kind, name, suffix string, err error) {
299+
parts := strings.SplitN(key, "/", 31)
299300

300301
if len(parts) < 2 {
301-
return "", "", fmt.Errorf("not a proper resource key: %s. Expected KIND/NAME", key)
302+
return "", "", "", fmt.Errorf("not a proper resource key: %s. Expected KIND/NAME", key)
303+
}
304+
if len(parts) == 3 {
305+
suffix = parts[2]
302306
}
303307

304-
return parts[0], parts[1], nil
308+
return parts[0], parts[1], suffix, nil
305309
}
306310

307311
func newDependencyGraph(sched *scheduler, options interfaces.DependencyGraphOptions) *dependencyGraph {
@@ -322,17 +326,17 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string {
322326
}
323327
}
324328

325-
func (sched *scheduler) prepareContext(parentContext *graphContext, dependencies []client.Dependency, replica string) *graphContext {
329+
func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext {
326330
context := &graphContext{
327-
scheduler: sched,
328-
graph: parentContext.graph,
329-
flow: parentContext.flow,
330-
replica: replica,
331-
dependencies: dependencies,
331+
scheduler: sched,
332+
graph: parentContext.graph,
333+
flow: parentContext.flow,
334+
replica: replica,
335+
dependency: dependency,
332336
}
333337

334338
context.args = make(map[string]string)
335-
for _, dependency := range dependencies {
339+
if dependency != nil {
336340
for key, value := range dependency.Args {
337341
context.args[key] = copier.EvaluateString(value, getArgFunc(parentContext))
338342
}
@@ -344,7 +348,6 @@ func (sched *scheduler) updateContext(context, parentContext *graphContext, depe
344348
for key, value := range dependency.Args {
345349
context.args[key] = copier.EvaluateString(value, parentContext.GetArg)
346350
}
347-
context.dependencies = append(context.dependencies, dependency)
348351
}
349352

350353
func newDefaultFlowObject() *client.Flow {
@@ -695,10 +698,10 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
695698
parentContext = parent.scheduledResource.context
696699
}
697700

698-
kind, name, err := keyParts(dep.Child)
701+
kind, name, suffix, err := keyParts(dep.Child)
699702

700-
context := sched.prepareContext(parentContext, []client.Dependency{dep}, replicaName)
701-
sr, err := sched.newScheduledResource(kind, name, resDefs, context, silent)
703+
context := sched.prepareContext(parentContext, &dep, replicaName)
704+
sr, err := sched.newScheduledResource(kind, name, suffix, resDefs, context, silent)
702705
if err != nil {
703706
return err
704707
}

0 commit comments

Comments
 (0)