Skip to content

Commit f58fa40

Browse files
authored
direct: Use read/write locking inside DAG (#3745)
## Changes Fix locking inside DAG to use proper read/write lock. ## Why Handle multiple resources referencing one resource correctly. ## Tests New test to trigger the bug: `go test ../../.. -run ^TestAccept$/^bundle$/^resource_deps$/^id_star$/direct -count=50 -failfast` passes on this branch; on main it crashes with: ``` acceptance_test.go:803: Diff: --- bundle/resource_deps/id_star/output.txt +++ /var/folders/5y/9kkdnjw91p11vsqwk0cvmk200000gp/T/TestAcceptbundleresource_depsid_starDATABRICKS_BUNDLE_ENGINE331072887/001/output.txt @@ -1 +1,17 @@ +panic: internal DAG error, concurrent access to "resources.jobs.a" + +goroutine 49 [running]: +github.com/databricks/cli/bundle/deployplan.(*Plan).LockEntry(0x140004d01e0, {0x1400000e6a0, 0x10}) + /Users/denis.bilenko/work/cli-side/bundle/deployplan/plan.go:86 +0x1bc +github.com/databricks/cli/bundle/direct.(*DeploymentBundle).LookupReferenceLocal(0x1400023c828, {0x13?, 0x3?}, 0x1400061a2a0) + /Users/denis.bilenko/work/cli-side/bundle/direct/bundle_plan.go:284 +0x80 +github.com/databricks/cli/bundle/direct.(*DeploymentBundle).resolveReferences(0x1400023c828, {0x1038fec58, 0x140005c12c0}, 0x1400049e840, {0x1400004c1e0, 0x1c}, 0x1) + /Users/denis.bilenko/work/cli-side/bundle/direct/bundle_plan.go:387 +0x36c +github.com/databricks/cli/bundle/direct.(*DeploymentBundle).CalculatePlan.func1({0x140003bdff0, 0x10}, 0x0) + /Users/denis.bilenko/work/cli-side/bundle/direct/bundle_plan.go:119 +0x1b8 +github.com/databricks/cli/libs/dagrun.runWorkerLoop(0x0?, 0x140006ae310, 0x140006ae380, 0x140004ac330) + /Users/denis.bilenko/work/cli-side/libs/dagrun/dagrun.go:235 +0xb8 +created by github.com/databricks/cli/libs/dagrun.(*Graph).Run in goroutine 1 + /Users/denis.bilenko/work/cli-side/libs/dagrun/dagrun.go:184 +0x3d8 + +Exit code: 2 ```
1 parent 10fb7e8 commit f58fa40

File tree

10 files changed

+270
-26
lines changed

10 files changed

+270
-26
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
2+
resources:
3+
jobs:
4+
a:
5+
name: aa
6+
description: aa_desc
7+
b:
8+
name: bb
9+
description: prefix ${resources.jobs.a.id}
10+
c:
11+
name: cc
12+
description: prefix ${resources.jobs.a.id}
13+
d:
14+
name: dd
15+
description: prefix ${resources.jobs.a.id}
16+
e:
17+
name: ee
18+
description: prefix ${resources.jobs.a.id}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
{
2+
"plan": {
3+
"resources.jobs.a": {
4+
"action": "create",
5+
"new_state": {
6+
"config": {
7+
"deployment": {
8+
"kind": "BUNDLE",
9+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
10+
},
11+
"description": "aa_desc",
12+
"edit_mode": "UI_LOCKED",
13+
"format": "MULTI_TASK",
14+
"max_concurrent_runs": 1,
15+
"name": "aa",
16+
"queue": {
17+
"enabled": true
18+
}
19+
}
20+
}
21+
},
22+
"resources.jobs.b": {
23+
"depends_on": [
24+
{
25+
"node": "resources.jobs.a",
26+
"label": "${resources.jobs.a.id}"
27+
}
28+
],
29+
"action": "create",
30+
"new_state": {
31+
"config": {
32+
"deployment": {
33+
"kind": "BUNDLE",
34+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
35+
},
36+
"description": "prefix ${resources.jobs.a.id}",
37+
"edit_mode": "UI_LOCKED",
38+
"format": "MULTI_TASK",
39+
"max_concurrent_runs": 1,
40+
"name": "bb",
41+
"queue": {
42+
"enabled": true
43+
}
44+
},
45+
"vars": {
46+
"description": "prefix ${resources.jobs.a.id}"
47+
}
48+
}
49+
},
50+
"resources.jobs.c": {
51+
"depends_on": [
52+
{
53+
"node": "resources.jobs.a",
54+
"label": "${resources.jobs.a.id}"
55+
}
56+
],
57+
"action": "create",
58+
"new_state": {
59+
"config": {
60+
"deployment": {
61+
"kind": "BUNDLE",
62+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
63+
},
64+
"description": "prefix ${resources.jobs.a.id}",
65+
"edit_mode": "UI_LOCKED",
66+
"format": "MULTI_TASK",
67+
"max_concurrent_runs": 1,
68+
"name": "cc",
69+
"queue": {
70+
"enabled": true
71+
}
72+
},
73+
"vars": {
74+
"description": "prefix ${resources.jobs.a.id}"
75+
}
76+
}
77+
},
78+
"resources.jobs.d": {
79+
"depends_on": [
80+
{
81+
"node": "resources.jobs.a",
82+
"label": "${resources.jobs.a.id}"
83+
}
84+
],
85+
"action": "create",
86+
"new_state": {
87+
"config": {
88+
"deployment": {
89+
"kind": "BUNDLE",
90+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
91+
},
92+
"description": "prefix ${resources.jobs.a.id}",
93+
"edit_mode": "UI_LOCKED",
94+
"format": "MULTI_TASK",
95+
"max_concurrent_runs": 1,
96+
"name": "dd",
97+
"queue": {
98+
"enabled": true
99+
}
100+
},
101+
"vars": {
102+
"description": "prefix ${resources.jobs.a.id}"
103+
}
104+
}
105+
},
106+
"resources.jobs.e": {
107+
"depends_on": [
108+
{
109+
"node": "resources.jobs.a",
110+
"label": "${resources.jobs.a.id}"
111+
}
112+
],
113+
"action": "create",
114+
"new_state": {
115+
"config": {
116+
"deployment": {
117+
"kind": "BUNDLE",
118+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/test-bundle/default/state/metadata.json"
119+
},
120+
"description": "prefix ${resources.jobs.a.id}",
121+
"edit_mode": "UI_LOCKED",
122+
"format": "MULTI_TASK",
123+
"max_concurrent_runs": 1,
124+
"name": "ee",
125+
"queue": {
126+
"enabled": true
127+
}
128+
},
129+
"vars": {
130+
"description": "prefix ${resources.jobs.a.id}"
131+
}
132+
}
133+
}
134+
}
135+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"plan": {
3+
"resources.jobs.a": {
4+
"action": "create"
5+
},
6+
"resources.jobs.b": {
7+
"action": "create"
8+
},
9+
"resources.jobs.c": {
10+
"action": "create"
11+
},
12+
"resources.jobs.d": {
13+
"action": "create"
14+
},
15+
"resources.jobs.e": {
16+
"action": "create"
17+
}
18+
}
19+
}

acceptance/bundle/resource_deps/id_star/out.test.toml

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

acceptance/bundle/resource_deps/id_star/output.txt

Whitespace-only changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
$CLI bundle debug plan > out.plan_create.$DATABRICKS_BUNDLE_ENGINE.json
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
RecordRequests = false

bundle/deployplan/plan.go

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ type Plan struct {
1919
// - Store a path to state file
2020
Plan map[string]*PlanEntry `json:"plan,omitzero"`
2121

22-
mutex sync.Mutex `json:"-"`
23-
locks map[string]bool `json:"-"`
22+
mutex sync.Mutex `json:"-"`
23+
lockmap lockmap `json:"-"`
2424
}
2525

2626
func NewPlan() *Plan {
2727
return &Plan{
28-
Plan: make(map[string]*PlanEntry),
29-
locks: make(map[string]bool),
28+
Plan: make(map[string]*PlanEntry),
29+
lockmap: newLockmap(),
3030
}
3131
}
3232

@@ -75,31 +75,79 @@ func (p *Plan) GetActions() []Action {
7575
return actions
7676
}
7777

78-
// LockEntry returns *PlanEntry; subsequent calls before UnlockEntry() with the same resourceKey will panic.
79-
func (p *Plan) LockEntry(resourceKey string) *PlanEntry {
78+
func (p *Plan) WriteLockEntry(resourceKey string) (*PlanEntry, error) {
8079
p.mutex.Lock()
8180
defer p.mutex.Unlock()
8281

83-
entry, ok := p.Plan[resourceKey]
84-
if ok {
85-
if p.locks[resourceKey] {
86-
panic(fmt.Sprintf("internal DAG error, concurrent access to %q", resourceKey))
87-
}
88-
p.locks[resourceKey] = true
89-
return entry
82+
if p.lockmap.TryLock(resourceKey) {
83+
return p.Plan[resourceKey], nil
84+
}
85+
86+
return nil, fmt.Errorf("write lock: concurrent access to %q", resourceKey)
87+
}
88+
89+
func (p *Plan) ReadLockEntry(resourceKey string) (*PlanEntry, error) {
90+
p.mutex.Lock()
91+
defer p.mutex.Unlock()
92+
93+
if p.lockmap.TryRLock(resourceKey) {
94+
return p.Plan[resourceKey], nil
9095
}
96+
return nil, fmt.Errorf("read lock: concurrent access to %q", resourceKey)
97+
}
9198

92-
return nil
99+
func (p *Plan) WriteUnlockEntry(resourceKey string) {
100+
p.mutex.Lock()
101+
defer p.mutex.Unlock()
102+
p.lockmap.Unlock(resourceKey)
93103
}
94104

95-
func (p *Plan) UnlockEntry(resourceKey string) {
105+
// ReadUnlockEntry releases one read lock on resourceKey that was obtained
// through ReadLockEntry. The lock table is updated while p.mutex is held.
func (p *Plan) ReadUnlockEntry(resourceKey string) {
	p.mutex.Lock()
	// lockmap.RUnlock only reads and decrements an existing map slot, so it
	// cannot panic and the mutex can be released explicitly.
	p.lockmap.RUnlock(resourceKey)
	p.mutex.Unlock()
}
100110

101111
// RemoveEntry deletes the plan entry stored under resourceKey, holding p.mutex
// for the duration of the map update. Removing a key that is not present is a
// no-op.
func (p *Plan) RemoveEntry(resourceKey string) {
	p.mutex.Lock()
	delete(p.Plan, resourceKey)
	p.mutex.Unlock()
}
116+
117+
// lockmap is a minimal, non-blocking reader/writer lock table keyed by
// resource key.
//
// It performs no synchronization of its own; the Plan methods in this file
// call it while holding Plan.mutex. The zero value is ready to use.
type lockmap struct {
	// state holds the lock state per key: -1 means write-locked, a positive
	// value is the number of active readers, and a missing key (or 0) means
	// unlocked. Keys are removed again once they become unlocked.
	state map[string]int
}

// newLockmap returns an empty lockmap.
func newLockmap() lockmap {
	return lockmap{state: make(map[string]int)}
}

// set stores a lock state for key, allocating the map on first use so that a
// zero-value lockmap (for example inside a Plan that was not built through
// NewPlan) does not panic on assignment to a nil map.
func (l *lockmap) set(key string, value int) {
	if l.state == nil {
		l.state = make(map[string]int)
	}
	l.state[key] = value
}

// TryLock attempts to take the exclusive lock on resourceKey. It succeeds only
// when the key has neither readers nor a writer, and reports whether the lock
// was taken.
func (l *lockmap) TryLock(resourceKey string) bool {
	if l.state[resourceKey] != 0 {
		return false
	}
	l.set(resourceKey, -1)
	return true
}

// Unlock releases the exclusive lock on resourceKey. It is a no-op when the
// key is not write-locked, so a mismatched call cannot disturb readers.
func (l *lockmap) Unlock(resourceKey string) {
	if l.state[resourceKey] == -1 {
		delete(l.state, resourceKey)
	}
}

// TryRLock attempts to add one reader to resourceKey. It succeeds unless the
// key is write-locked, and reports whether the read lock was taken.
func (l *lockmap) TryRLock(resourceKey string) bool {
	readers := l.state[resourceKey]
	if readers < 0 {
		return false
	}
	l.set(resourceKey, readers+1)
	return true
}

// RUnlock removes one reader from resourceKey. It is a no-op when the key has
// no readers (including when it is write-locked).
func (l *lockmap) RUnlock(resourceKey string) {
	switch readers := l.state[resourceKey]; {
	case readers > 1:
		l.state[resourceKey] = readers - 1
	case readers == 1:
		// Last reader gone: drop the key instead of leaving a zero behind.
		delete(l.state, resourceKey)
	}
}

bundle/direct/bundle_apply.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,19 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa
3333
}
3434

3535
g.Run(defaultParallelism, func(resourceKey string, failedDependency *string) bool {
36-
entry := plan.LockEntry(resourceKey)
37-
defer plan.UnlockEntry(resourceKey)
36+
entry, err := plan.WriteLockEntry(resourceKey)
37+
if err != nil {
38+
logdiag.LogError(ctx, fmt.Errorf("%s: internal error: %w", resourceKey, err))
39+
return false
40+
}
3841

3942
if entry == nil {
40-
logdiag.LogError(ctx, fmt.Errorf("internal error: node not in graph: %q", resourceKey))
43+
logdiag.LogError(ctx, fmt.Errorf("%s: internal error: node not in graph", resourceKey))
4144
return false
4245
}
4346

47+
defer plan.WriteUnlockEntry(resourceKey)
48+
4449
action := entry.Action
4550
errorPrefix := fmt.Sprintf("cannot %s %s", action, resourceKey)
4651

@@ -133,13 +138,17 @@ func (b *DeploymentBundle) LookupReferenceRemote(ctx context.Context, path *stru
133138
fieldPath := path.SkipPrefix(3)
134139
fieldPathS := fieldPath.String()
135140

136-
targetEntry := b.Plan.LockEntry(targetResourceKey)
137-
defer b.Plan.UnlockEntry(targetResourceKey)
141+
targetEntry, err := b.Plan.ReadLockEntry(targetResourceKey)
142+
if err != nil {
143+
return nil, err
144+
}
138145

139146
if targetEntry == nil {
140147
return nil, fmt.Errorf("internal error: %s: missing entry in the plan", targetResourceKey)
141148
}
142149

150+
defer b.Plan.ReadUnlockEntry(targetResourceKey)
151+
143152
targetAction := deployplan.ActionTypeFromString(targetEntry.Action)
144153
if targetAction == deployplan.ActionTypeUnset {
145154
return nil, fmt.Errorf("internal error: %s: missing action in the plan", targetResourceKey)

bundle/direct/bundle_plan.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,19 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks
7272
g.Run(defaultParallelism, func(resourceKey string, failedDependency *string) bool {
7373
errorPrefix := "cannot plan " + resourceKey
7474

75-
entry := plan.LockEntry(resourceKey)
76-
defer plan.UnlockEntry(resourceKey)
75+
entry, err := plan.WriteLockEntry(resourceKey)
76+
if err != nil {
77+
logdiag.LogError(ctx, fmt.Errorf("%s: internal error: %w", resourceKey, err))
78+
return false
79+
}
7780

7881
if entry == nil {
7982
logdiag.LogError(ctx, fmt.Errorf("%s: internal error: node not in graph", resourceKey))
8083
return false
8184
}
8285

86+
defer plan.WriteUnlockEntry(resourceKey)
87+
8388
if failedDependency != nil {
8489
logdiag.LogError(ctx, fmt.Errorf("%s: dependency failed: %s", errorPrefix, *failedDependency))
8590
return false
@@ -280,14 +285,17 @@ func (b *DeploymentBundle) LookupReferenceLocal(ctx context.Context, path *struc
280285
fieldPath := path.SkipPrefix(3)
281286
fieldPathS := fieldPath.String()
282287

283-
// TODO: this will panic if targetResourceKey == resourceKey
284-
targetEntry := b.Plan.LockEntry(targetResourceKey)
285-
defer b.Plan.UnlockEntry(targetResourceKey)
288+
targetEntry, err := b.Plan.ReadLockEntry(targetResourceKey)
289+
if err != nil {
290+
return nil, err
291+
}
286292

287293
if targetEntry == nil {
288294
return nil, fmt.Errorf("internal error: %s: missing entry in the plan", targetResourceKey)
289295
}
290296

297+
defer b.Plan.ReadUnlockEntry(targetResourceKey)
298+
291299
targetAction := deployplan.ActionTypeFromString(targetEntry.Action)
292300
if targetAction == deployplan.ActionTypeUnset {
293301
return nil, fmt.Errorf("internal error: %s: missing action in the plan", targetResourceKey)

0 commit comments

Comments
 (0)