Skip to content

Commit 767958d

Browse files
authored
direct: do not process nodes if dependency failed (#3441)
## Changes

- Update dagrun to allow the callback to return success/failure and to receive failedNode in the callback.
- Use this functionality during plan/deploy to skip and log resources that could not be deployed because a dependency was not deployed. Each skipped resource is logged like this: `Error: cannot create jobs.bar: dependency failed: jobs.foo`.

## Tests

Unit tests and a new acceptance test. Takes advantage of error injection in tests (#3437) and EnvVaryOutput (#3438).
1 parent 009dc5c commit 767958d

File tree

11 files changed

+402
-74
lines changed

11 files changed

+402
-74
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
resources:
2+
jobs:
3+
foo:
4+
name: foo
5+
description: INJECT_ERROR
6+
bar:
7+
name: bar
8+
description: depends on foo id ${resources.jobs.foo.id}
9+
baz:
10+
name: baz
11+
description: depends on bar id ${resources.jobs.bar.id}
12+
independent:
13+
name: independent
14+
description: should be deployed anyway
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
2+
Deploying resources...
3+
Error: cannot create jobs.foo: INJECTED (500 UNKNOWN)
4+
5+
Endpoint: POST [DATABRICKS_URL]/api/2.2/jobs/create
6+
HTTP Status: 500 Internal Server Error
7+
API error_code: UNKNOWN
8+
API message: INJECTED
9+
10+
Error: cannot create jobs.bar: dependency failed: jobs.foo
11+
12+
Error: cannot create jobs.baz: dependency failed: jobs.foo
13+
14+
Updating deployment state...
15+
16+
Exit code (musterr): 1
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
2+
Deploying resources...
3+
Error: terraform apply: exit status 1
4+
5+
Error: cannot create job: INJECTED
6+
7+
with databricks_job.foo,
8+
on bundle.tf.json line 56, in resource.databricks_job.foo:
9+
56: },
10+
11+
12+
13+
Updating deployment state...
14+
15+
Exit code (musterr): 1
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Local = true
2+
Cloud = false
3+
4+
[EnvMatrix]
5+
DATABRICKS_CLI_DEPLOYMENT = ["terraform", "direct-exp"]
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
2+
>>> [CLI] bundle plan
3+
create jobs.bar
4+
create jobs.baz
5+
create jobs.foo
6+
create jobs.independent
7+
8+
>>> print_sorted_requests
9+
{
10+
"body": {
11+
"deployment": {
12+
"kind": "BUNDLE",
13+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
14+
},
15+
"description": "INJECT_ERROR",
16+
"edit_mode": "UI_LOCKED",
17+
"format": "MULTI_TASK",
18+
"max_concurrent_runs": 1,
19+
"name": "foo",
20+
"queue": {
21+
"enabled": true
22+
}
23+
},
24+
"method": "POST",
25+
"path": "/api/2.2/jobs/create"
26+
}
27+
{
28+
"body": {
29+
"deployment": {
30+
"kind": "BUNDLE",
31+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
32+
},
33+
"description": "should be deployed anyway",
34+
"edit_mode": "UI_LOCKED",
35+
"format": "MULTI_TASK",
36+
"max_concurrent_runs": 1,
37+
"name": "independent",
38+
"queue": {
39+
"enabled": true
40+
}
41+
},
42+
"method": "POST",
43+
"path": "/api/2.2/jobs/create"
44+
}
45+
46+
>>> [CLI] bundle summary
47+
Name: test-bundle
48+
Target: default
49+
Workspace:
50+
User: [USERNAME]
51+
Path: /Workspace/Users/[USERNAME]/.bundle/test-bundle/default
52+
Resources:
53+
Jobs:
54+
bar:
55+
Name: bar
56+
URL: (not deployed)
57+
baz:
58+
Name: baz
59+
URL: (not deployed)
60+
foo:
61+
Name: foo
62+
URL: (not deployed)
63+
independent:
64+
Name: independent
65+
URL: [DATABRICKS_URL]/jobs/[IND_ID]?o=[NUMID]
66+
67+
=== Plan should still contain foo and bar
68+
>>> [CLI] bundle plan
69+
create jobs.bar
70+
create jobs.baz
71+
create jobs.foo
72+
73+
=== Expecting no difference in the output between first and second deploy
74+
>>> print_requests
75+
{
76+
"body": {
77+
"deployment": {
78+
"kind": "BUNDLE",
79+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
80+
},
81+
"description": "INJECT_ERROR",
82+
"edit_mode": "UI_LOCKED",
83+
"format": "MULTI_TASK",
84+
"max_concurrent_runs": 1,
85+
"name": "foo",
86+
"queue": {
87+
"enabled": true
88+
}
89+
},
90+
"method": "POST",
91+
"path": "/api/2.2/jobs/create"
92+
}
93+
94+
>>> [CLI] bundle destroy --auto-approve
95+
The following resources will be deleted:
96+
delete job independent
97+
98+
All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/test-bundle/default
99+
100+
Deleting files...
101+
Destroy complete!
102+
103+
>>> print_requests
104+
{
105+
"body": {
106+
"job_id": [IND_ID]
107+
},
108+
"method": "POST",
109+
"path": "/api/2.2/jobs/delete"
110+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
print_requests() {
2+
jq --sort-keys 'select(.method != "GET" and (.path | contains("/jobs")))' < out.requests.txt
3+
rm out.requests.txt
4+
}
5+
6+
print_sorted_requests() {
7+
jq -c < out.requests.txt | sort | jq --sort-keys 'select(.method != "GET" and (.path | contains("/jobs")))'
8+
rm out.requests.txt
9+
}
10+
11+
echo "*" > .gitignore
12+
# In direct, plan is sorted by deployment order. In TF it comes back sorted alphabetically.
13+
# I think deployment order is better, sorting here to remove the mismatch
14+
trace $CLI bundle plan 2>&1 | sort
15+
musterr $CLI bundle deploy &> out.deploy.$DATABRICKS_CLI_DEPLOYMENT.txt
16+
trace print_sorted_requests
17+
18+
trace $CLI bundle summary
19+
20+
ind_id=`read_id.py jobs independent`
21+
echo "$ind_id:IND_ID" >> ACC_REPLS
22+
23+
title "Plan should still contain foo and bar"
24+
trace $CLI bundle plan 2>&1 | sort
25+
26+
rm out.requests.txt
27+
musterr $CLI bundle deploy &> out.deploy2.$DATABRICKS_CLI_DEPLOYMENT.txt
28+
title "Expecting no difference in the output between first and second deploy"
29+
diff.py out.deploy.$DATABRICKS_CLI_DEPLOYMENT.txt out.deploy2.$DATABRICKS_CLI_DEPLOYMENT.txt
30+
rm out.deploy2.$DATABRICKS_CLI_DEPLOYMENT.txt
31+
trace print_requests
32+
33+
trace $CLI bundle destroy --auto-approve
34+
trace print_requests

acceptance/bundle/resources/jobs/create-error/output.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Warning: required field "new_cluster" is not set
66

77
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files...
88
Deploying resources...
9-
Error: deploying jobs.foo: creating: Shared job cluster feature is only supported in multi-task jobs. (400 INVALID_PARAMETER_VALUE)
9+
Error: cannot create jobs.foo: Shared job cluster feature is only supported in multi-task jobs. (400 INVALID_PARAMETER_VALUE)
1010

1111
Endpoint: POST [DATABRICKS_URL]/api/2.2/jobs/create
1212
HTTP Status: 400 Bad Request

bundle/terranova/apply.go

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (m *terranovaApplyMutator) Apply(ctx context.Context, b *bundle.Bundle) dia
4646

4747
for _, action := range b.Plan.Actions {
4848
node := nodeKey{action.Group, action.Name}
49-
plannedActionsMap[nodeKey{action.Group, action.Name}] = action.ActionType
49+
plannedActionsMap[node] = action.ActionType
5050
if !g.HasNode(node) {
5151
if action.ActionType == deployplan.ActionTypeDelete {
5252
// it is expected that this node is not seen by makeResourceGraph
@@ -69,21 +69,29 @@ func (m *terranovaApplyMutator) Apply(ctx context.Context, b *bundle.Bundle) dia
6969

7070
client := b.WorkspaceClient()
7171

72-
g.Run(defaultParallelism, func(node nodeKey) {
73-
// TODO: if a given node fails, all downstream nodes should not be run. We should report those nodes.
72+
g.Run(defaultParallelism, func(node nodeKey, failedDependency *nodeKey) bool {
73+
actionType := plannedActionsMap[node]
74+
75+
errorPrefix := fmt.Sprintf("cannot %s %s.%s", actionType.String(), node.Group, node.Name)
76+
77+
// If a dependency failed, report and skip execution for this node by returning false
78+
if failedDependency != nil {
79+
logdiag.LogError(ctx, fmt.Errorf("%s: dependency failed: %s", errorPrefix, failedDependency.String()))
80+
return false
81+
}
82+
7483
// TODO: ensure that config for this node is fully resolved at this point.
7584

7685
settings, ok := SupportedResources[node.Group]
7786
if !ok {
78-
return
87+
// Unexpected, this should be filtered at plan.
88+
return false
7989
}
8090

81-
actionType := plannedActionsMap[node]
82-
8391
// The way plan currently works, is that it does not add resources with Noop action, turning them into Unset.
8492
// So we skip both, although at this point we will not see Noop here.
8593
if actionType == deployplan.ActionTypeUnset || actionType == deployplan.ActionTypeNoop {
86-
return
94+
return true
8795
}
8896

8997
d := Deployer{
@@ -97,55 +105,59 @@ func (m *terranovaApplyMutator) Apply(ctx context.Context, b *bundle.Bundle) dia
97105
if actionType == deployplan.ActionTypeDelete {
98106
err = d.destroy(ctx)
99107
if err != nil {
100-
logdiag.LogError(ctx, fmt.Errorf("destroying %s.%s: %w", d.group, d.resourceName, err))
108+
logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err))
109+
return false
101110
}
102-
return
111+
return true
103112
}
104113

105114
config, ok := b.GetResourceConfig(node.Group, node.Name)
106115
if !ok {
107-
logdiag.LogError(ctx, fmt.Errorf("internal error when reading config for %s.%s", node.Group, node.Name))
108-
return
116+
logdiag.LogError(ctx, fmt.Errorf("%s: internal error when reading config", errorPrefix))
117+
return false
109118
}
110119

111120
// Fetch the references to ensure all are resolved
112121
myReferences, err := extractReferences(b.Config.Value(), node)
113122
if err != nil {
114-
logdiag.LogError(ctx, err)
115-
return
123+
logdiag.LogError(ctx, fmt.Errorf("%s: reading references from config: %w", errorPrefix, err))
124+
return false
116125
}
117126

118127
// At this point it's an error to have unresolved deps
119128
if len(myReferences) > 0 {
120129
// TODO: include the deps themselves in the message
121-
logdiag.LogError(ctx, fmt.Errorf("cannot deploy %s.%s due to unresolved deps", node.Group, node.Name))
122-
return
130+
logdiag.LogError(ctx, fmt.Errorf("%s: unresolved deps", errorPrefix))
131+
return false
123132
}
124133

125-
// TODO: redo plan to downgrade planned action if possible (?)
134+
// TODO: redo calcDiff to downgrade planned action if possible (?)
126135

127136
err = d.Deploy(ctx, config, actionType)
128137
if err != nil {
129-
logdiag.LogError(ctx, err)
130-
return
138+
logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err))
139+
return false
131140
}
132141

142+
// Update resources.id after successful deploy so that future ${resources...id} refs are replaced
133143
if isReferenced[node] {
134-
err = resolveIDReference(ctx, b, d.group, d.resourceName)
144+
err = resolveIDReference(ctx, b, node.Group, node.Name)
135145
if err != nil {
136-
logdiag.LogError(ctx, fmt.Errorf("failed to replace ref to resources.%s.%s.id: %w", d.group, d.resourceName, err))
137-
return
146+
// not using errorPrefix because resource was deployed
147+
logdiag.LogError(ctx, fmt.Errorf("failed to replace ref to resources.%s.%s.id: %w", node.Group, node.Name, err))
148+
return false
138149
}
139150
}
151+
152+
return true
140153
})
141154

155+
// This must run even if deploy failed:
142156
err = b.ResourceDatabase.Finalize()
143157
if err != nil {
144158
logdiag.LogError(ctx, err)
145159
}
146160

147-
// TODO: check if all planned actions were performed
148-
149161
return nil
150162
}
151163

@@ -157,18 +169,10 @@ type Deployer struct {
157169
settings ResourceSettings
158170
}
159171

160-
func (d *Deployer) Deploy(ctx context.Context, inputConfig any, actionType deployplan.ActionType) error {
161-
err := d.deploy(ctx, inputConfig, actionType)
162-
if err != nil {
163-
return fmt.Errorf("deploying %s.%s: %w", d.group, d.resourceName, err)
164-
}
165-
return nil
166-
}
167-
168172
func (d *Deployer) destroy(ctx context.Context) error {
169173
entry, hasEntry := d.db.GetResourceEntry(d.group, d.resourceName)
170174
if !hasEntry {
171-
log.Infof(ctx, "%s.%s: Cannot delete, missing from state", d.group, d.resourceName)
175+
log.Infof(ctx, "Cannot delete %s.%s: missing from state", d.group, d.resourceName)
172176
return nil
173177
}
174178

@@ -184,7 +188,7 @@ func (d *Deployer) destroy(ctx context.Context) error {
184188
return nil
185189
}
186190

187-
func (d *Deployer) deploy(ctx context.Context, inputConfig any, actionType deployplan.ActionType) error {
191+
func (d *Deployer) Deploy(ctx context.Context, inputConfig any, actionType deployplan.ActionType) error {
188192
resource, _, err := New(d.client, d.group, d.resourceName, inputConfig)
189193
if err != nil {
190194
return err
@@ -218,14 +222,15 @@ func (d *Deployer) deploy(ctx context.Context, inputConfig any, actionType deplo
218222
}
219223
return d.UpdateWithID(ctx, resource, updater, oldID, config)
220224
default:
221-
return fmt.Errorf("internal error: unexpected plan: %#v", actionType)
225+
return fmt.Errorf("internal error: unexpected actionType: %#v", actionType)
222226
}
223227
}
224228

225229
func (d *Deployer) Create(ctx context.Context, resource IResource, config any) error {
226230
newID, err := resource.DoCreate(ctx)
227231
if err != nil {
228-
return fmt.Errorf("creating: %w", err)
232+
// No need to prefix error, there is no ambiguity (only one operation - DoCreate) and no additional context (like id)
233+
return err
229234
}
230235

231236
log.Infof(ctx, "Created %s.%s id=%#v", d.group, d.resourceName, newID)
@@ -260,7 +265,7 @@ func (d *Deployer) Recreate(ctx context.Context, resource IResource, oldID strin
260265
}
261266

262267
// TODO: This should be at notice level (info < notice < warn) and it should be visible by default,
263-
// but to match terraform output today, we hide it.
268+
// but to match terraform output today, we hide it (and also we don't have notice level)
264269
log.Infof(ctx, "Recreated %s.%s id=%#v (previously %#v)", d.group, d.resourceName, newID, oldID)
265270
err = d.db.SaveState(d.group, d.resourceName, newID, config)
266271
if err != nil {

0 commit comments

Comments
 (0)