Skip to content

Commit 70543c4

Browse files
authored
dagrun: split Run() and DetectCycle() (#3433)
## Why It is more explicit, allowing for better error messages. I plan to extend Run() with more features and return values, so getting this out of the way helps.
1 parent 4a0a8ab commit 70543c4

File tree

6 files changed

+69
-54
lines changed

6 files changed

+69
-54
lines changed

acceptance/bundle/resource_deps/loop_jobs/direct/output.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ Workspace:
1010
Validation OK!
1111

1212
>>> musterr [CLI] bundle plan
13-
Error: while reading resources config: cycle detected: jobs.bar refers to jobs.foo via ${resources.jobs.bar.id} which refers to jobs.bar via ${resources.jobs.foo.id}
13+
Error: cycle detected: jobs.bar refers to jobs.foo via ${resources.jobs.bar.id} which refers to jobs.bar via ${resources.jobs.foo.id}
1414

1515

1616
Exit code (musterr): 1
1717

1818
>>> musterr [CLI] bundle deploy
1919
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
20-
Error: while reading resources config: cycle detected: jobs.bar refers to jobs.foo via ${resources.jobs.bar.id} which refers to jobs.bar via ${resources.jobs.foo.id}
20+
Error: cycle detected: jobs.bar refers to jobs.foo via ${resources.jobs.bar.id} which refers to jobs.bar via ${resources.jobs.foo.id}
2121

2222

2323
Exit code (musterr): 1

acceptance/bundle/resource_deps/loop_self/direct/output.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ Workspace:
1010
Validation OK!
1111

1212
>>> musterr [CLI] bundle plan
13-
Error: while reading resources config: cycle detected: jobs.foo refers to itself via ${resources.jobs.foo.id}
13+
Error: cycle detected: jobs.foo refers to itself via ${resources.jobs.foo.id}
1414

1515

1616
Exit code (musterr): 1
1717

1818
>>> musterr [CLI] bundle deploy
1919
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
20-
Error: while reading resources config: cycle detected: jobs.foo refers to itself via ${resources.jobs.foo.id}
20+
Error: cycle detected: jobs.foo refers to itself via ${resources.jobs.foo.id}
2121

2222

2323
Exit code (musterr): 1

bundle/terranova/apply.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,14 @@ func (m *terranovaApplyMutator) Apply(ctx context.Context, b *bundle.Bundle) dia
6363
}
6464
}
6565

66+
err = g.DetectCycle()
67+
if err != nil {
68+
return diag.FromErr(err)
69+
}
70+
6671
client := b.WorkspaceClient()
6772

68-
err = g.Run(defaultParallelism, func(node nodeKey) {
73+
g.Run(defaultParallelism, func(node nodeKey) {
6974
// TODO: if a given node fails, all downstream nodes should not be run. We should report those nodes.
7075
// TODO: ensure that config for this node is fully resolved at this point.
7176

@@ -134,9 +139,6 @@ func (m *terranovaApplyMutator) Apply(ctx context.Context, b *bundle.Bundle) dia
134139
}
135140
}
136141
})
137-
if err != nil {
138-
logdiag.LogError(ctx, err)
139-
}
140142

141143
err = b.ResourceDatabase.Finalize()
142144
if err != nil {

bundle/terranova/plan.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ func CalculateDeployActions(ctx context.Context, b *bundle.Bundle) ([]deployplan
8585
return nil, fmt.Errorf("reading config: %w", err)
8686
}
8787

88+
err = g.DetectCycle()
89+
if err != nil {
90+
return nil, err
91+
}
92+
8893
// Remained in state are resources that no longer present in the config
8994
for _, group := range utils.SortedKeys(state) {
9095
groupData := state[group]
@@ -102,7 +107,7 @@ func CalculateDeployActions(ctx context.Context, b *bundle.Bundle) ([]deployplan
102107
// we might have already got rid of this reference, thus potentially downgrading actionType
103108

104109
// parallelism is set to 1, so there is no multi-threaded access there.
105-
err = g.Run(1, func(node nodeKey) {
110+
g.Run(1, func(node nodeKey) {
106111
settings, ok := SupportedResources[node.Group]
107112
if !ok {
108113
logdiag.LogError(ctx, fmt.Errorf("resource not supported on direct backend: %s", node.Group))
@@ -151,9 +156,6 @@ func CalculateDeployActions(ctx context.Context, b *bundle.Bundle) ([]deployplan
151156
ActionType: actionType,
152157
})
153158
})
154-
if err != nil {
155-
return nil, fmt.Errorf("while reading resources config: %w", err)
156-
}
157159

158160
if logdiag.HasError(ctx) {
159161
return nil, errors.New("planning failed")

libs/dagrun/dagrun.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,26 @@ func (g *Graph[N]) DetectCycle() error {
150150
return nil
151151
}
152152

153-
func (g *Graph[N]) Run(pool int, runUnit func(N)) error {
154-
if err := g.DetectCycle(); err != nil {
155-
return err
156-
}
153+
func (g *Graph[N]) Run(pool int, runUnit func(N)) {
157154
if pool <= 0 || pool > len(g.adj) {
158155
pool = len(g.adj)
159156
}
160157

161158
in := g.indegrees()
159+
160+
// Prepare initial ready nodes in stable insertion order
161+
var initial []N
162+
for _, n := range g.nodes {
163+
if in[n] == 0 {
164+
initial = append(initial, n)
165+
}
166+
}
167+
168+
// If there are nodes but no entry points, the run cannot start
169+
if len(in) > 0 && len(initial) == 0 {
170+
panic("dagrun: no entry points")
171+
}
172+
162173
ready := make(chan N, len(in))
163174
done := make(chan N, len(in))
164175

@@ -174,11 +185,8 @@ func (g *Graph[N]) Run(pool int, runUnit func(N)) error {
174185
}()
175186
}
176187

177-
// stable initial-ready order based on insertion order
178-
for _, n := range g.nodes {
179-
if in[n] == 0 {
180-
ready <- n
181-
}
188+
for _, n := range initial {
189+
ready <- n
182190
}
183191

184192
for remaining := len(in); remaining > 0; {
@@ -192,5 +200,4 @@ func (g *Graph[N]) Run(pool int, runUnit func(N)) error {
192200
}
193201
close(ready)
194202
wg.Wait()
195-
return nil
196203
}

libs/dagrun/dagrun_test.go

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,31 @@ func TestRun_VariousGraphsAndPools(t *testing.T) {
2424
pools := []int{1, 2, 3, 4}
2525

2626
tests := []struct {
27-
name string
28-
nodes []string
29-
seen []string
30-
seen_sorted []string
31-
edges []edge
32-
cycle bool
33-
msg string
27+
name string
28+
nodes []string
29+
seen []string
30+
seenSorted []string
31+
edges []edge
32+
cycle string
3433
}{
3534
// disconnected graphs
35+
{
36+
name: "empty graph",
37+
},
3638
{
3739
name: "one node",
3840
nodes: []string{"A"},
3941
seen: []string{"A"},
4042
},
4143
{
42-
name: "two nodes",
43-
nodes: []string{"A", "B"},
44-
seen_sorted: []string{"A", "B"},
44+
name: "two nodes",
45+
nodes: []string{"A", "B"},
46+
seenSorted: []string{"A", "B"},
4547
},
4648
{
47-
name: "three nodes",
48-
nodes: []string{"A", "B", "C"},
49-
seen_sorted: []string{"A", "B", "C"},
49+
name: "three nodes",
50+
nodes: []string{"A", "B", "C"},
51+
seenSorted: []string{"A", "B", "C"},
5052
},
5153
{
5254
name: "simple DAG",
@@ -61,17 +63,15 @@ func TestRun_VariousGraphsAndPools(t *testing.T) {
6163
edges: []edge{
6264
{"A", "A", "${A.id}"},
6365
},
64-
cycle: true,
65-
msg: "cycle detected: A refers to itself via ${A.id}",
66+
cycle: "cycle detected: A refers to itself via ${A.id}",
6667
},
6768
{
6869
name: "two-node cycle",
6970
edges: []edge{
7071
{"A", "B", "${A.id}"},
7172
{"B", "A", "${B.id}"},
7273
},
73-
cycle: true,
74-
msg: "cycle detected: A refers to B via ${A.id} which refers to A via ${B.id}",
74+
cycle: "cycle detected: A refers to B via ${A.id} which refers to A via ${B.id}",
7575
},
7676
{
7777
name: "three-node cycle",
@@ -80,7 +80,7 @@ func TestRun_VariousGraphsAndPools(t *testing.T) {
8080
{"Y", "Z", "e2"},
8181
{"Z", "X", "e3"},
8282
},
83-
cycle: true,
83+
cycle: "cycle detected: X refers to Y via e1 Y refers to Z via e2 which refers to X via e3",
8484
},
8585
}
8686

@@ -95,30 +95,34 @@ func TestRun_VariousGraphsAndPools(t *testing.T) {
9595
g.AddDirectedEdge(stringWrapper{e.from}, stringWrapper{e.to}, e.name)
9696
}
9797

98+
err := g.DetectCycle()
99+
if tc.cycle != "" {
100+
require.Error(t, err, "expected cycle, got none")
101+
require.Equal(t, tc.cycle, err.Error())
102+
innerCalled := 0
103+
require.Panics(t, func() {
104+
g.Run(p, func(n stringWrapper) {
105+
innerCalled += 1
106+
})
107+
})
108+
require.Zero(t, innerCalled)
109+
return
110+
}
111+
require.NoError(t, err)
112+
98113
var mu sync.Mutex
99114
var seen []string
100-
err := g.Run(p, func(n stringWrapper) {
115+
g.Run(p, func(n stringWrapper) {
101116
mu.Lock()
102117
seen = append(seen, n.Value)
103118
mu.Unlock()
104119
})
105120

106-
if tc.cycle {
107-
if err == nil {
108-
t.Fatalf("expected cycle, got none")
109-
}
110-
if tc.msg != "" && err.Error() != tc.msg {
111-
t.Fatalf("wrong msg:\n got %q\nwant %q", err, tc.msg)
112-
}
113-
} else {
114-
require.NoError(t, err)
115-
}
116-
117121
if tc.seen != nil {
118122
assert.Equal(t, tc.seen, seen)
119-
} else if tc.seen_sorted != nil {
123+
} else if tc.seenSorted != nil {
120124
sort.Strings(seen)
121-
assert.Equal(t, tc.seen_sorted, seen)
125+
assert.Equal(t, tc.seenSorted, seen)
122126
} else {
123127
assert.Empty(t, seen)
124128
}

0 commit comments

Comments
 (0)