Skip to content

Commit fe390a0

Browse files
andrewnester authored and hectorcast-db committed
Added a warning when Python wheel wrapper needs to be used (#807)
## Changes

Added a warning when Python wheel wrapper needs to be used

## Tests

Added unit tests + manual run with different bundle configurations
1 parent a108d1c commit fe390a0

File tree

3 files changed

+266
-0
lines changed

3 files changed

+266
-0
lines changed

bundle/phases/initialize.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/databricks/cli/bundle/config/mutator"
88
"github.com/databricks/cli/bundle/config/variable"
99
"github.com/databricks/cli/bundle/deploy/terraform"
10+
"github.com/databricks/cli/bundle/python"
1011
"github.com/databricks/cli/bundle/scripts"
1112
)
1213

@@ -31,6 +32,7 @@ func Initialize() bundle.Mutator {
3132
mutator.OverrideCompute(),
3233
mutator.ProcessTargetMode(),
3334
mutator.TranslatePaths(),
35+
python.WrapperWarning(),
3436
terraform.Initialize(),
3537
scripts.Execute(config.ScriptPostInit),
3638
},

bundle/python/warning.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package python
2+
3+
import (
4+
"context"
5+
"strings"
6+
7+
"github.com/databricks/cli/bundle"
8+
"github.com/databricks/cli/bundle/libraries"
9+
"github.com/databricks/cli/libs/cmdio"
10+
"golang.org/x/mod/semver"
11+
)
12+
13+
// wrapperWarning is a bundle mutator that warns when Python wheel tasks with
// local libraries target compute running a DBR version below 13.1, which
// requires the experimental Python wheel wrapper instead.
type wrapperWarning struct {
}
15+
16+
func WrapperWarning() bundle.Mutator {
17+
return &wrapperWarning{}
18+
}
19+
20+
func (m *wrapperWarning) Apply(ctx context.Context, b *bundle.Bundle) error {
21+
if hasIncompatibleWheelTasks(ctx, b) {
22+
cmdio.LogString(ctx, "Python wheel tasks with local libraries require compute with DBR 13.1+. Please change your cluster configuration or set experimental 'python_wheel_wrapper' setting to 'true'")
23+
}
24+
return nil
25+
}
26+
27+
func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
28+
tasks := libraries.FindAllWheelTasksWithLocalLibraries(b)
29+
for _, task := range tasks {
30+
if task.NewCluster != nil {
31+
if lowerThanExpectedVersion(ctx, task.NewCluster.SparkVersion) {
32+
return true
33+
}
34+
}
35+
36+
if task.JobClusterKey != "" {
37+
for _, job := range b.Config.Resources.Jobs {
38+
for _, cluster := range job.JobClusters {
39+
if task.JobClusterKey == cluster.JobClusterKey && cluster.NewCluster != nil {
40+
if lowerThanExpectedVersion(ctx, cluster.NewCluster.SparkVersion) {
41+
return true
42+
}
43+
}
44+
}
45+
}
46+
}
47+
}
48+
49+
return false
50+
}
51+
52+
func lowerThanExpectedVersion(ctx context.Context, sparkVersion string) bool {
53+
parts := strings.Split(sparkVersion, ".")
54+
if len(parts) < 2 {
55+
return false
56+
}
57+
58+
v := "v" + parts[0] + "." + parts[1]
59+
return semver.Compare(v, "v13.1") < 0
60+
}
61+
62+
// Name implements bundle.Mutator.
63+
func (m *wrapperWarning) Name() string {
64+
return "PythonWrapperWarning"
65+
}

bundle/python/warning_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package python
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/databricks/cli/bundle"
8+
"github.com/databricks/cli/bundle/config"
9+
"github.com/databricks/cli/bundle/config/resources"
10+
"github.com/databricks/databricks-sdk-go/service/compute"
11+
"github.com/databricks/databricks-sdk-go/service/jobs"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
// TestIncompatibleWheelTasksWithNewCluster verifies that a wheel task with a
// local library on a per-task new cluster running DBR < 13.1 is flagged as
// incompatible, even when a sibling task uses a compatible version.
func TestIncompatibleWheelTasksWithNewCluster(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {
						JobSettings: &jobs.JobSettings{
							Tasks: []jobs.Task{
								{
									TaskKey:         "key1",
									PythonWheelTask: &jobs.PythonWheelTask{},
									NewCluster: &compute.ClusterSpec{
										// Below 13.1 — should trigger the incompatibility.
										SparkVersion: "12.2.x-scala2.12",
									},
									Libraries: []compute.Library{
										// Local wheel path — makes the task eligible.
										{Whl: "./dist/test.whl"},
									},
								},
								{
									TaskKey:         "key2",
									PythonWheelTask: &jobs.PythonWheelTask{},
									NewCluster: &compute.ClusterSpec{
										// Meets the 13.1 minimum — compatible on its own.
										SparkVersion: "13.1.x-scala2.12",
									},
									Libraries: []compute.Library{
										{Whl: "./dist/test.whl"},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
}
53+
54+
// TestIncompatibleWheelTasksWithJobClusterKey verifies that incompatibility
// is also detected when a wheel task references a shared job cluster (via
// JobClusterKey) whose Spark version is below 13.1.
func TestIncompatibleWheelTasksWithJobClusterKey(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {
						JobSettings: &jobs.JobSettings{
							JobClusters: []jobs.JobCluster{
								{
									JobClusterKey: "cluster1",
									NewCluster: &compute.ClusterSpec{
										// Below 13.1 — referencing task is incompatible.
										SparkVersion: "12.2.x-scala2.12",
									},
								},
								{
									JobClusterKey: "cluster2",
									NewCluster: &compute.ClusterSpec{
										// Meets the 13.1 minimum.
										SparkVersion: "13.1.x-scala2.12",
									},
								},
							},
							Tasks: []jobs.Task{
								{
									TaskKey:         "key1",
									PythonWheelTask: &jobs.PythonWheelTask{},
									JobClusterKey:   "cluster1",
									Libraries: []compute.Library{
										{Whl: "./dist/test.whl"},
									},
								},
								{
									TaskKey:         "key2",
									PythonWheelTask: &jobs.PythonWheelTask{},
									JobClusterKey:   "cluster2",
									Libraries: []compute.Library{
										{Whl: "./dist/test.whl"},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
}
102+
103+
// TestNoIncompatibleWheelTasks covers the combinations that must NOT trigger
// the warning: remote (Workspace/dbfs) wheels on old clusters, and local
// wheels on clusters — per-task or job-level — at DBR 13.1+.
func TestNoIncompatibleWheelTasks(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {
						JobSettings: &jobs.JobSettings{
							JobClusters: []jobs.JobCluster{
								{
									JobClusterKey: "cluster1",
									NewCluster: &compute.ClusterSpec{
										SparkVersion: "12.2.x-scala2.12",
									},
								},
								{
									JobClusterKey: "cluster2",
									NewCluster: &compute.ClusterSpec{
										SparkVersion: "13.1.x-scala2.12",
									},
								},
							},
							Tasks: []jobs.Task{
								{
									// Old cluster, but the wheel is a Workspace path (not local).
									TaskKey:         "key1",
									PythonWheelTask: &jobs.PythonWheelTask{},
									NewCluster: &compute.ClusterSpec{
										SparkVersion: "12.2.x-scala2.12",
									},
									Libraries: []compute.Library{
										{Whl: "/Workspace/Users/[email protected]/dist/test.whl"},
									},
								},
								{
									// Local wheel on a new-enough per-task cluster.
									TaskKey:         "key2",
									PythonWheelTask: &jobs.PythonWheelTask{},
									NewCluster: &compute.ClusterSpec{
										SparkVersion: "13.3.x-scala2.12",
									},
									Libraries: []compute.Library{
										{Whl: "./dist/test.whl"},
									},
								},
								{
									// Old cluster, but the wheel lives on DBFS (not local).
									TaskKey:         "key3",
									PythonWheelTask: &jobs.PythonWheelTask{},
									NewCluster: &compute.ClusterSpec{
										SparkVersion: "12.2.x-scala2.12",
									},
									Libraries: []compute.Library{
										{Whl: "dbfs:/dist/test.whl"},
									},
								},
								{
									// Old job cluster, but the wheel is a Workspace path.
									TaskKey:         "key4",
									PythonWheelTask: &jobs.PythonWheelTask{},
									JobClusterKey:   "cluster1",
									Libraries: []compute.Library{
										{Whl: "/Workspace/Users/[email protected]/dist/test.whl"},
									},
								},
								{
									// Local wheel on a new-enough job cluster.
									TaskKey:         "key5",
									PythonWheelTask: &jobs.PythonWheelTask{},
									JobClusterKey:   "cluster2",
									Libraries: []compute.Library{
										{Whl: "./dist/test.whl"},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	require.False(t, hasIncompatibleWheelTasks(context.Background(), b))
}
181+
182+
func TestSparkVersionLowerThanExpected(t *testing.T) {
183+
testCases := map[string]bool{
184+
"13.1.x-scala2.12": false,
185+
"13.2.x-scala2.12": false,
186+
"13.3.x-scala2.12": false,
187+
"14.0.x-scala2.12": false,
188+
"14.1.x-scala2.12": false,
189+
"10.4.x-aarch64-photon-scala2.12": true,
190+
"10.4.x-scala2.12": true,
191+
"13.0.x-scala2.12": true,
192+
"5.0.x-rc-gpu-ml-scala2.11": true,
193+
}
194+
195+
for k, v := range testCases {
196+
result := lowerThanExpectedVersion(context.Background(), k)
197+
require.Equal(t, v, result, k)
198+
}
199+
}

0 commit comments

Comments
 (0)