Skip to content

Commit 775251d

Browse files
authored
Emit an error when incompatible all purpose cluster used with Python wheel tasks (#823)
## Changes Follow up for #807 to also validate configuration if existing cluster id is used. ## Tests Added unit tests
1 parent 4226c88 commit 775251d

File tree

2 files changed

+185
-2
lines changed

2 files changed

+185
-2
lines changed

bundle/python/warning.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package python
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67

78
"github.com/databricks/cli/bundle"
89
"github.com/databricks/cli/bundle/libraries"
9-
"github.com/databricks/cli/libs/cmdio"
10+
"github.com/databricks/cli/libs/log"
11+
"github.com/databricks/databricks-sdk-go"
1012
"golang.org/x/mod/semver"
1113
)
1214

@@ -19,7 +21,7 @@ func WrapperWarning() bundle.Mutator {
1921

2022
func (m *wrapperWarning) Apply(ctx context.Context, b *bundle.Bundle) error {
2123
if hasIncompatibleWheelTasks(ctx, b) {
22-
cmdio.LogString(ctx, "Python wheel tasks with local libraries require compute with DBR 13.1+. Please change your cluster configuration or set experimental 'python_wheel_wrapper' setting to 'true'")
24+
return fmt.Errorf("python wheel tasks with local libraries require compute with DBR 13.1+. Please change your cluster configuration or set experimental 'python_wheel_wrapper' setting to 'true'")
2325
}
2426
return nil
2527
}
@@ -44,6 +46,20 @@ func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
4446
}
4547
}
4648
}
49+
50+
if task.ExistingClusterId != "" {
51+
version, err := getSparkVersionForCluster(ctx, b.WorkspaceClient(), task.ExistingClusterId)
52+
53+
// If there's error getting spark version for cluster, do not mark it as incompatible
54+
if err != nil {
55+
log.Warnf(ctx, "unable to get spark version for cluster %s, err: %s", task.ExistingClusterId, err.Error())
56+
return false
57+
}
58+
59+
if lowerThanExpectedVersion(ctx, version) {
60+
return true
61+
}
62+
}
4763
}
4864

4965
return false
@@ -63,3 +79,12 @@ func lowerThanExpectedVersion(ctx context.Context, sparkVersion string) bool {
6379
func (m *wrapperWarning) Name() string {
6480
return "PythonWrapperWarning"
6581
}
82+
83+
func getSparkVersionForCluster(ctx context.Context, w *databricks.WorkspaceClient, clusterId string) (string, error) {
84+
details, err := w.Clusters.GetByClusterId(ctx, clusterId)
85+
if err != nil {
86+
return "", err
87+
}
88+
89+
return details.SparkVersion, nil
90+
}

bundle/python/warning_test.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,117 @@ import (
1212
"github.com/stretchr/testify/require"
1313
)
1414

15+
type MockClusterService struct{}
16+
17+
// ChangeOwner implements compute.ClustersService.
18+
func (MockClusterService) ChangeOwner(ctx context.Context, request compute.ChangeClusterOwner) error {
19+
panic("unimplemented")
20+
}
21+
22+
// Create implements compute.ClustersService.
23+
func (MockClusterService) Create(ctx context.Context, request compute.CreateCluster) (*compute.CreateClusterResponse, error) {
24+
panic("unimplemented")
25+
}
26+
27+
// Delete implements compute.ClustersService.
28+
func (MockClusterService) Delete(ctx context.Context, request compute.DeleteCluster) error {
29+
panic("unimplemented")
30+
}
31+
32+
// Edit implements compute.ClustersService.
33+
func (MockClusterService) Edit(ctx context.Context, request compute.EditCluster) error {
34+
panic("unimplemented")
35+
}
36+
37+
// Events implements compute.ClustersService.
38+
func (MockClusterService) Events(ctx context.Context, request compute.GetEvents) (*compute.GetEventsResponse, error) {
39+
panic("unimplemented")
40+
}
41+
42+
// Get implements compute.ClustersService.
43+
func (MockClusterService) Get(ctx context.Context, request compute.GetClusterRequest) (*compute.ClusterDetails, error) {
44+
clusterDetails := map[string]*compute.ClusterDetails{
45+
"test-key-1": {
46+
SparkVersion: "12.2.x-scala2.12",
47+
},
48+
"test-key-2": {
49+
SparkVersion: "13.2.x-scala2.12",
50+
},
51+
}
52+
53+
return clusterDetails[request.ClusterId], nil
54+
}
55+
56+
// GetPermissionLevels implements compute.ClustersService.
57+
func (MockClusterService) GetPermissionLevels(ctx context.Context, request compute.GetClusterPermissionLevelsRequest) (*compute.GetClusterPermissionLevelsResponse, error) {
58+
panic("unimplemented")
59+
}
60+
61+
// GetPermissions implements compute.ClustersService.
62+
func (MockClusterService) GetPermissions(ctx context.Context, request compute.GetClusterPermissionsRequest) (*compute.ClusterPermissions, error) {
63+
panic("unimplemented")
64+
}
65+
66+
// List implements compute.ClustersService.
67+
func (MockClusterService) List(ctx context.Context, request compute.ListClustersRequest) (*compute.ListClustersResponse, error) {
68+
panic("unimplemented")
69+
}
70+
71+
// ListNodeTypes implements compute.ClustersService.
72+
func (MockClusterService) ListNodeTypes(ctx context.Context) (*compute.ListNodeTypesResponse, error) {
73+
panic("unimplemented")
74+
}
75+
76+
// ListZones implements compute.ClustersService.
77+
func (MockClusterService) ListZones(ctx context.Context) (*compute.ListAvailableZonesResponse, error) {
78+
panic("unimplemented")
79+
}
80+
81+
// PermanentDelete implements compute.ClustersService.
82+
func (MockClusterService) PermanentDelete(ctx context.Context, request compute.PermanentDeleteCluster) error {
83+
panic("unimplemented")
84+
}
85+
86+
// Pin implements compute.ClustersService.
87+
func (MockClusterService) Pin(ctx context.Context, request compute.PinCluster) error {
88+
panic("unimplemented")
89+
}
90+
91+
// Resize implements compute.ClustersService.
92+
func (MockClusterService) Resize(ctx context.Context, request compute.ResizeCluster) error {
93+
panic("unimplemented")
94+
}
95+
96+
// Restart implements compute.ClustersService.
97+
func (MockClusterService) Restart(ctx context.Context, request compute.RestartCluster) error {
98+
panic("unimplemented")
99+
}
100+
101+
// SetPermissions implements compute.ClustersService.
102+
func (MockClusterService) SetPermissions(ctx context.Context, request compute.ClusterPermissionsRequest) (*compute.ClusterPermissions, error) {
103+
panic("unimplemented")
104+
}
105+
106+
// SparkVersions implements compute.ClustersService.
107+
func (MockClusterService) SparkVersions(ctx context.Context) (*compute.GetSparkVersionsResponse, error) {
108+
panic("unimplemented")
109+
}
110+
111+
// Start implements compute.ClustersService.
112+
func (MockClusterService) Start(ctx context.Context, request compute.StartCluster) error {
113+
panic("unimplemented")
114+
}
115+
116+
// Unpin implements compute.ClustersService.
117+
func (MockClusterService) Unpin(ctx context.Context, request compute.UnpinCluster) error {
118+
panic("unimplemented")
119+
}
120+
121+
// UpdatePermissions implements compute.ClustersService.
122+
func (MockClusterService) UpdatePermissions(ctx context.Context, request compute.ClusterPermissionsRequest) (*compute.ClusterPermissions, error) {
123+
panic("unimplemented")
124+
}
125+
15126
func TestIncompatibleWheelTasksWithNewCluster(t *testing.T) {
16127
b := &bundle.Bundle{
17128
Config: config.Root{
@@ -100,6 +211,43 @@ func TestIncompatibleWheelTasksWithJobClusterKey(t *testing.T) {
100211
require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
101212
}
102213

214+
func TestIncompatibleWheelTasksWithExistingClusterId(t *testing.T) {
215+
b := &bundle.Bundle{
216+
Config: config.Root{
217+
Resources: config.Resources{
218+
Jobs: map[string]*resources.Job{
219+
"job1": {
220+
JobSettings: &jobs.JobSettings{
221+
Tasks: []jobs.Task{
222+
{
223+
TaskKey: "key1",
224+
PythonWheelTask: &jobs.PythonWheelTask{},
225+
ExistingClusterId: "test-key-1",
226+
Libraries: []compute.Library{
227+
{Whl: "./dist/test.whl"},
228+
},
229+
},
230+
{
231+
TaskKey: "key2",
232+
PythonWheelTask: &jobs.PythonWheelTask{},
233+
ExistingClusterId: "test-key-2",
234+
Libraries: []compute.Library{
235+
{Whl: "./dist/test.whl"},
236+
},
237+
},
238+
},
239+
},
240+
},
241+
},
242+
},
243+
},
244+
}
245+
246+
b.WorkspaceClient().Clusters.WithImpl(MockClusterService{})
247+
248+
require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
249+
}
250+
103251
func TestNoIncompatibleWheelTasks(t *testing.T) {
104252
b := &bundle.Bundle{
105253
Config: config.Root{
@@ -168,6 +316,14 @@ func TestNoIncompatibleWheelTasks(t *testing.T) {
168316
{Whl: "./dist/test.whl"},
169317
},
170318
},
319+
{
320+
TaskKey: "key6",
321+
PythonWheelTask: &jobs.PythonWheelTask{},
322+
ExistingClusterId: "test-key-2",
323+
Libraries: []compute.Library{
324+
{Whl: "./dist/test.whl"},
325+
},
326+
},
171327
},
172328
},
173329
},
@@ -176,6 +332,8 @@ func TestNoIncompatibleWheelTasks(t *testing.T) {
176332
},
177333
}
178334

335+
b.WorkspaceClient().Clusters.WithImpl(MockClusterService{})
336+
179337
require.False(t, hasIncompatibleWheelTasks(context.Background(), b))
180338
}
181339

0 commit comments

Comments
 (0)