Skip to content

Commit c24388e

Browse files
Migrate library read to use go-sdk (#2985)
* update * update * update
1 parent f035814 commit c24388e

File tree

3 files changed

+234
-5
lines changed

3 files changed

+234
-5
lines changed

clusters/resource_library.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@ func ResourceLibrary() *schema.Resource {
2020
}
2121
return m
2222
})
23+
libraySdkSchema := common.StructToSchema(compute.Library{}, func(m map[string]*schema.Schema) map[string]*schema.Schema {
24+
m["cluster_id"] = &schema.Schema{
25+
Type: schema.TypeString,
26+
Required: true,
27+
}
28+
return m
29+
})
2330
parseId := func(id string) (string, string) {
2431
split := strings.SplitN(id, "/", 2)
2532
if len(split) != 2 {
@@ -28,7 +35,7 @@ func ResourceLibrary() *schema.Resource {
2835
return split[0], split[1]
2936
}
3037
return common.Resource{
31-
Schema: s,
38+
Schema: libraySdkSchema,
3239
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
3340
clusterID := d.Get("cluster_id").(string)
3441
err := NewClustersAPI(ctx, c).Start(clusterID)
@@ -58,18 +65,21 @@ func ResourceLibrary() *schema.Resource {
5865
},
5966
Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
6067
clusterID, libraryRep := parseId(d.Id())
61-
cll, err := libraries.NewLibrariesAPI(ctx, c).WaitForLibrariesInstalled(libraries.Wait{
68+
w, err := c.WorkspaceClient()
69+
if err != nil {
70+
return err
71+
}
72+
cll, err := libraries.WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
6273
ClusterID: clusterID,
63-
Timeout: d.Timeout(schema.TimeoutRead),
6474
IsRefresh: true,
65-
})
75+
}, d.Timeout(schema.TimeoutRead))
6676
if err != nil {
6777
return err
6878
}
6979
for _, v := range cll.LibraryStatuses {
7080
thisRep := v.Library.String()
7181
if thisRep == libraryRep {
72-
common.StructToData(v.Library, s, d)
82+
common.StructToData(v.Library, libraySdkSchema, d)
7383
d.Set("cluster_id", clusterID)
7484
return nil
7585
}

libraries/libraries_api_sdk.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package libraries
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"strings"
9+
"time"
10+
11+
"github.com/databricks/databricks-sdk-go"
12+
"github.com/databricks/databricks-sdk-go/apierr"
13+
"github.com/databricks/databricks-sdk-go/service/compute"
14+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
15+
)
16+
17+
// Given a compute.Wait struct, returns library statuses based on the input parameter.
18+
// If wait.IsRunning is set to true, this function will wait until all of the libraries are installed to return. Otherwise, it will directly return the list of libraries.
19+
func WaitForLibrariesInstalledSdk(ctx context.Context, w *databricks.WorkspaceClient, wait compute.Wait, timeout time.Duration) (result *compute.ClusterLibraryStatuses, err error) {
20+
err = resource.RetryContext(ctx, timeout, func() *resource.RetryError {
21+
libsClusterStatus, err := w.Libraries.ClusterStatusByClusterId(ctx, wait.ClusterID)
22+
if err != nil {
23+
var apiErr *apierr.APIError
24+
if !errors.As(err, &apiErr) {
25+
return resource.NonRetryableError(err)
26+
}
27+
if apiErr.StatusCode != 404 && strings.Contains(apiErr.Message,
28+
fmt.Sprintf("Cluster %s does not exist", wait.ClusterID)) {
29+
apiErr.StatusCode = 404
30+
}
31+
return resource.NonRetryableError(apiErr)
32+
}
33+
if !wait.IsRunning {
34+
log.Printf("[INFO] We don't have to wait until the libraries are installed, so just returning list of %d libraries",
35+
len(libsClusterStatus.LibraryStatuses))
36+
result = libsClusterStatus
37+
return nil
38+
}
39+
retry, err := libsClusterStatus.IsRetryNeeded(wait)
40+
if retry {
41+
return resource.RetryableError(err)
42+
}
43+
if err != nil {
44+
return resource.NonRetryableError(err)
45+
}
46+
result = libsClusterStatus
47+
return nil
48+
})
49+
if err != nil {
50+
return
51+
}
52+
if wait.IsRunning {
53+
installed := []compute.LibraryFullStatus{}
54+
cleanup := compute.UninstallLibraries{
55+
ClusterId: wait.ClusterID,
56+
Libraries: []compute.Library{},
57+
}
58+
// cleanup libraries that failed to install
59+
for _, v := range result.LibraryStatuses {
60+
if v.Status == "FAILED" {
61+
log.Printf("[WARN] Removing failed library %s from %s", v.Library, wait.ClusterID)
62+
cleanup.Libraries = append(cleanup.Libraries, *v.Library)
63+
continue
64+
}
65+
installed = append(installed, v)
66+
}
67+
// and result contains only the libraries that were successfully installed
68+
result.LibraryStatuses = installed
69+
if len(cleanup.Libraries) > 0 {
70+
w.Libraries.Uninstall(ctx, cleanup)
71+
if err != nil {
72+
err = fmt.Errorf("cannot cleanup libraries: %w", err)
73+
}
74+
}
75+
}
76+
return
77+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package libraries
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/databricks/databricks-sdk-go/apierr"
10+
"github.com/databricks/databricks-sdk-go/service/compute"
11+
"github.com/databricks/terraform-provider-databricks/common"
12+
"github.com/databricks/terraform-provider-databricks/qa"
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
func TestWaitForLibrariesInstalledSdk(t *testing.T) {
17+
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
18+
{
19+
Method: "GET",
20+
Resource: "/api/2.0/libraries/cluster-status?cluster_id=missing",
21+
ReuseRequest: true,
22+
Status: 404,
23+
Response: apierr.NotFound("missing"),
24+
},
25+
{
26+
Method: "GET",
27+
Resource: "/api/2.0/libraries/cluster-status?cluster_id=error",
28+
ReuseRequest: true,
29+
Status: 500,
30+
Response: apierr.APIError{
31+
Message: "internal error",
32+
},
33+
},
34+
{
35+
Method: "GET",
36+
Resource: "/api/2.0/libraries/cluster-status?cluster_id=1005-abcd",
37+
ReuseRequest: true,
38+
Status: 400,
39+
Response: apierr.APIError{
40+
Message: "Cluster 1005-abcd does not exist",
41+
},
42+
},
43+
{
44+
Method: "GET",
45+
Resource: "/api/2.0/libraries/cluster-status?cluster_id=still-installing",
46+
ReuseRequest: true,
47+
Response: ClusterLibraryStatuses{
48+
ClusterID: "still-installing",
49+
LibraryStatuses: []LibraryStatus{
50+
{
51+
Status: "PENDING",
52+
Library: &Library{
53+
Jar: "a.jar",
54+
},
55+
},
56+
},
57+
},
58+
},
59+
{
60+
Method: "GET",
61+
Resource: "/api/2.0/libraries/cluster-status?cluster_id=failed-wheel",
62+
ReuseRequest: true,
63+
Response: ClusterLibraryStatuses{
64+
ClusterID: "still-installing",
65+
LibraryStatuses: []LibraryStatus{
66+
{
67+
Status: "FAILED",
68+
Messages: []string{"does not compute"},
69+
Library: &Library{
70+
Whl: "b.whl",
71+
},
72+
},
73+
{
74+
Status: "INSTALLED",
75+
Library: &Library{
76+
Jar: "a.jar",
77+
},
78+
},
79+
},
80+
},
81+
},
82+
{
83+
Method: "POST",
84+
Resource: "/api/2.0/libraries/uninstall",
85+
ExpectedRequest: ClusterLibraryList{
86+
ClusterID: "failed-wheel",
87+
Libraries: []Library{
88+
{
89+
Whl: "b.whl",
90+
},
91+
},
92+
},
93+
},
94+
}, func(ctx context.Context, client *common.DatabricksClient) {
95+
w, err := client.WorkspaceClient()
96+
if err != nil {
97+
panic(err)
98+
}
99+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
100+
ClusterID: "missing", Libraries: []compute.Library{}, IsRunning: true, IsRefresh: false,
101+
}, 50*time.Millisecond)
102+
assert.EqualError(t, err, "missing")
103+
104+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
105+
ClusterID: "error", Libraries: []compute.Library{}, IsRunning: true, IsRefresh: false,
106+
}, 50*time.Millisecond)
107+
assert.EqualError(t, err, "internal error")
108+
109+
// cluster is not running
110+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
111+
ClusterID: "still-installing", Libraries: []compute.Library{}, IsRunning: false, IsRefresh: false,
112+
}, 50*time.Millisecond)
113+
assert.NoError(t, err)
114+
115+
// cluster is running
116+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
117+
ClusterID: "still-installing", Libraries: []compute.Library{}, IsRunning: true, IsRefresh: false,
118+
}, 50*time.Millisecond)
119+
assert.EqualError(t, err, "0 libraries are ready, but there are still 1 pending")
120+
121+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
122+
ClusterID: "failed-wheel", Libraries: []compute.Library{}, IsRunning: true, IsRefresh: false,
123+
}, 50*time.Millisecond)
124+
assert.EqualError(t, err, "whl:b.whl failed: does not compute")
125+
126+
// uninstall b.whl and continue executing
127+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
128+
ClusterID: "failed-wheel", Libraries: []compute.Library{}, IsRunning: true, IsRefresh: true,
129+
}, 50*time.Millisecond)
130+
assert.NoError(t, err, "library should have been uninstalled and work proceeded")
131+
132+
// Cluster not available or doesn't exist
133+
_, err = WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
134+
ClusterID: "1005-abcd", Libraries: []compute.Library{}, IsRunning: false, IsRefresh: false,
135+
}, 50*time.Millisecond)
136+
137+
var ae *apierr.APIError
138+
assert.True(t, errors.As(err, &ae))
139+
assert.Equal(t, 404, ae.StatusCode)
140+
assert.Equal(t, "Cluster 1005-abcd does not exist", ae.Message)
141+
})
142+
}

0 commit comments

Comments
 (0)