Skip to content

Commit c4d9759

Browse files
committed
Verify workspace reachability through HTTP client
* `WorkspacesAPI.WaitForRunning` used to check if workspace is reachable through `net.Dial` on TCP level, though this scenario ignored HTTPS proxies, which are common for enterprise setup. * `mws.Workspace` now gets `workspace_url` computed in accordance to Accounts API endpoint used from a single place in `WorkspacesAPI.Read`, if there's no `workspace_url` returned in the response, which allows fully unit testing the verification flow. * added `DatabricksClient.ClientForHost` to talk to just created workspace with the same authentication parameters. This fixes #874
1 parent b1976a3 commit c4d9759

File tree

4 files changed

+131
-46
lines changed

4 files changed

+131
-46
lines changed

common/client.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,3 +436,29 @@ func (c *DatabricksClient) FormatURL(strs ...string) string {
436436
data := append([]string{host}, strs...)
437437
return strings.Join(data, "")
438438
}
439+
440+
// ClientForHost creates a new DatabricksClient instance with the same auth parameters,
441+
// but for the given host. Authentication has to be reinitialized, as Google OIDC has
442+
// different authorizers, depending if it's workspace or Accounts API we're talking to.
443+
func (c *DatabricksClient) ClientForHost(url string) *DatabricksClient {
444+
return &DatabricksClient{
445+
Host: url,
446+
Username: c.Username,
447+
Password: c.Password,
448+
Token: c.Token,
449+
Profile: c.Profile,
450+
ConfigFile: c.ConfigFile,
451+
GoogleServiceAccount: c.GoogleServiceAccount,
452+
AzurermEnvironment: c.AzurermEnvironment,
453+
InsecureSkipVerify: c.InsecureSkipVerify,
454+
HTTPTimeoutSeconds: c.HTTPTimeoutSeconds,
455+
DebugTruncateBytes: c.DebugTruncateBytes,
456+
DebugHeaders: c.DebugHeaders,
457+
RateLimitPerSecond: c.RateLimitPerSecond,
458+
Provider: c.Provider,
459+
rateLimiter: c.rateLimiter,
460+
httpClient: c.httpClient,
461+
configAttributesUsed: c.configAttributesUsed,
462+
commandFactory: c.commandFactory,
463+
}
464+
}

common/http.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,11 @@ func (c *DatabricksClient) parseError(resp *http.Response) APIError {
220220
// checkHTTPRetry inspects HTTP errors from the Databricks API for known transient errors on Workspace creation
221221
func (c *DatabricksClient) checkHTTPRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
222222
if ue, ok := err.(*url.Error); ok {
223-
apiError := APIError{ErrorCode: "IO_ERROR", Message: ue.Error()}
223+
apiError := APIError{
224+
ErrorCode: "IO_ERROR",
225+
StatusCode: 523,
226+
Message: ue.Error(),
227+
}
224228
return apiError.IsRetriable(), apiError
225229
}
226230
if resp == nil {

mws/resource_workspace.go

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,6 @@ func (a WorkspacesAPI) Create(ws *Workspace, timeout time.Duration) error {
5252
return nil
5353
}
5454

55-
func dial(hostAndPort, url string, timeout time.Duration) *resource.RetryError {
56-
conn, err := net.DialTimeout("tcp", hostAndPort, timeout)
57-
if err != nil {
58-
return resource.RetryableError(err)
59-
}
60-
log.Printf("[INFO] Workspace %s is ready to use", url)
61-
defer conn.Close()
62-
return nil
63-
}
64-
6555
// generateWorkspaceHostname computes the hostname for the specified workspace,
6656
// given the account console hostname.
6757
func generateWorkspaceHostname(client *common.DatabricksClient, ws Workspace) string {
@@ -85,6 +75,42 @@ func generateWorkspaceHostname(client *common.DatabricksClient, ws Workspace) st
8575
return strings.Join(chunks, ".")
8676
}
8777

78+
func (a WorkspacesAPI) verifyWorkspaceReachable(ws Workspace) *resource.RetryError {
79+
ctx, cancel := context.WithTimeout(a.context, 10*time.Second)
80+
defer cancel()
81+
// wait for DNS caches to refresh, as sometimes we cannot make
82+
// API calls to new workspaces immediately after it's created
83+
wsClient := a.client.ClientForHost(ws.WorkspaceURL)
84+
// make a request to Tokens API, just to verify there are no errors
85+
var response map[string]interface{}
86+
err := wsClient.Get(ctx, "/token/list", nil, &response)
87+
if apiError, ok := err.(common.APIError); ok {
88+
err = fmt.Errorf("workspace %s is not yet reachable: %s",
89+
ws.WorkspaceURL, apiError)
90+
log.Printf("[INFO] %s", err)
91+
// expected to retry on: dial tcp: lookup XXX: no such host
92+
return resource.RetryableError(err)
93+
}
94+
return nil
95+
}
96+
97+
func (a WorkspacesAPI) explainWorkspaceFailure(ws Workspace) error {
98+
if ws.NetworkID == "" {
99+
return fmt.Errorf(ws.WorkspaceStatusMessage)
100+
}
101+
network, nerr := NewNetworksAPI(a.context, a.client).Read(ws.AccountID, ws.NetworkID)
102+
if nerr != nil {
103+
return fmt.Errorf("failed to start workspace. Cannot read network: %s", nerr)
104+
}
105+
var strBuffer bytes.Buffer
106+
for _, networkHealth := range network.ErrorMessages {
107+
strBuffer.WriteString(fmt.Sprintf("error: %s;error_msg: %s;",
108+
networkHealth.ErrorType, networkHealth.ErrorMessage))
109+
}
110+
return fmt.Errorf("Workspace failed to create: %v, network error message: %v",
111+
ws.WorkspaceStatusMessage, strBuffer.String())
112+
}
113+
88114
// WaitForRunning will wait until workspace is running, otherwise will try to explain why it failed
89115
func (a WorkspacesAPI) WaitForRunning(ws Workspace, timeout time.Duration) error {
90116
return resource.RetryContext(a.context, timeout, func() *resource.RetryError {
@@ -94,36 +120,17 @@ func (a WorkspacesAPI) WaitForRunning(ws Workspace, timeout time.Duration) error
94120
}
95121
switch workspace.WorkspaceStatus {
96122
case WorkspaceStatusRunning:
97-
// wait for DNS caches to refresh, as sometimes we cannot make
98-
// API calls to new workspaces immediately after it's created
99-
host := generateWorkspaceHostname(a.client, ws)
100-
hostAndPort := fmt.Sprintf("%s:443", host)
101-
url := fmt.Sprintf("https://%s", host)
102123
log.Printf("[INFO] Workspace is now running")
103-
if strings.Contains(workspace.DeploymentName, "900150983cd24fb0") {
124+
if strings.Contains(ws.DeploymentName, "900150983cd24fb0") {
104125
// nobody would probably name workspace as 900150983cd24fb0,
105126
// so we'll use it as unit testing shim
106127
return nil
107128
}
108-
return dial(hostAndPort, url, 10*time.Second)
129+
return a.verifyWorkspaceReachable(workspace)
109130
case WorkspaceStatusCanceled, WorkspaceStatusFailed:
110131
log.Printf("[ERROR] Cannot start workspace: %s", workspace.WorkspaceStatusMessage)
111-
if workspace.NetworkID == "" {
112-
return resource.NonRetryableError(fmt.Errorf(workspace.WorkspaceStatusMessage))
113-
}
114-
network, nerr := NewNetworksAPI(a.context, a.client).Read(ws.AccountID, ws.NetworkID)
115-
if nerr != nil {
116-
return resource.NonRetryableError(fmt.Errorf(
117-
"failed to start workspace. Cannot read network: %s", nerr))
118-
}
119-
var strBuffer bytes.Buffer
120-
for _, networkHealth := range network.ErrorMessages {
121-
strBuffer.WriteString(fmt.Sprintf("error: %s;error_msg: %s;",
122-
networkHealth.ErrorType, networkHealth.ErrorMessage))
123-
}
124-
return resource.NonRetryableError(fmt.Errorf(
125-
"Workspace failed to create: %v, network error message: %v",
126-
workspace.WorkspaceStatusMessage, strBuffer.String()))
132+
err = a.explainWorkspaceFailure(workspace)
133+
return resource.NonRetryableError(err)
127134
default:
128135
log.Printf("[INFO] Workspace %s is %s: %s", workspace.DeploymentName,
129136
workspace.WorkspaceStatus, workspace.WorkspaceStatusMessage)
@@ -162,6 +169,11 @@ func (a WorkspacesAPI) Read(mwsAcctID, workspaceID string) (Workspace, error) {
162169
var mwsWorkspace Workspace
163170
workspacesAPIPath := fmt.Sprintf("/accounts/%s/workspaces/%s", mwsAcctID, workspaceID)
164171
err := a.client.Get(a.context, workspacesAPIPath, nil, &mwsWorkspace)
172+
if err == nil && mwsWorkspace.WorkspaceURL == "" {
173+
// generate workspace URL based on client's hostname, if response contains no URL
174+
host := generateWorkspaceHostname(a.client, mwsWorkspace)
175+
mwsWorkspace.WorkspaceURL = fmt.Sprintf("https://%s", host)
176+
}
165177
return mwsWorkspace, err
166178
}
167179

@@ -296,7 +308,6 @@ func ResourceWorkspace() *schema.Resource {
296308
// Default the value of `is_no_public_ip_enabled` because it isn't part of the GET payload.
297309
// The field is only used on creation and we therefore suppress all diffs.
298310
workspace.IsNoPublicIPEnabled = true
299-
workspace.WorkspaceURL = fmt.Sprintf("https://%s", generateWorkspaceHostname(c, workspace))
300311
if err = common.StructToData(workspace, workspaceSchema, d); err != nil {
301312
return err
302313
}

mws/resource_workspace_test.go

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ package mws
33
import (
44
"context"
55
"fmt"
6-
"net/http"
7-
"net/http/httptest"
8-
"strings"
96
"testing"
107
"time"
118

@@ -906,13 +903,60 @@ func TestListWorkspaces(t *testing.T) {
906903
assert.Len(t, l, 0)
907904
}
908905

909-
func TestDial(t *testing.T) {
910-
err := dial("127.0.0.1:32456", "localhost", 50*time.Millisecond)
911-
assert.NotNil(t, err)
912-
assert.Equal(t, err.Err.Error(), "dial tcp 127.0.0.1:32456: connect: connection refused")
906+
func TestWorkspace_WaitForResolve_Failure(t *testing.T) {
907+
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
908+
{
909+
Method: "GET",
910+
Resource: "/api/2.0/accounts/abc/workspaces/1234",
911+
ReuseRequest: true,
912+
Response: Workspace{
913+
AccountID: "abc",
914+
WorkspaceID: 1234,
915+
WorkspaceStatus: "RUNNING",
916+
WorkspaceURL: "https://foo-bar-baz.cloud.databricks.com",
917+
},
918+
},
919+
}, func(ctx context.Context, client *common.DatabricksClient) {
920+
a := NewWorkspacesAPI(ctx, client)
921+
err := a.WaitForRunning(Workspace{
922+
AccountID: "abc",
923+
WorkspaceID: 1234,
924+
}, 1*time.Second)
925+
assert.EqualError(t, err, "workspace https://foo-bar-baz.cloud.databricks.com is not yet reachable:"+
926+
" Get \"https://foo-bar-baz.cloud.databricks.com/api/2.0/token/list\": "+
927+
"dial tcp: lookup foo-bar-baz.cloud.databricks.com: no such host")
928+
})
929+
}
913930

914-
s := httptest.NewServer(http.HandlerFunc(http.NotFound))
915-
defer s.Close()
916-
err = dial(strings.ReplaceAll(s.URL, "http://", ""), s.URL, 500*time.Millisecond)
917-
assert.Nil(t, err)
931+
func TestWorkspace_WaitForResolve(t *testing.T) {
932+
// outer HTTP server is used for inner request for "just created" workspace
933+
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
934+
{
935+
Method: "GET",
936+
Resource: "/api/2.0/token/list",
937+
Response: `{}`, // we just need a JSON for this
938+
},
939+
}, func(ctx context.Context, wsClient *common.DatabricksClient) {
940+
// inner HTTP server is used for outer request for Accounts API
941+
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
942+
{
943+
Method: "GET",
944+
Resource: "/api/2.0/accounts/abc/workspaces/1234",
945+
ReuseRequest: true,
946+
Response: Workspace{
947+
AccountID: "abc",
948+
WorkspaceID: 1234,
949+
WorkspaceStatus: "RUNNING",
950+
WorkspaceURL: wsClient.Host,
951+
},
952+
},
953+
}, func(ctx context.Context, client *common.DatabricksClient) {
954+
a := NewWorkspacesAPI(ctx, client)
955+
err := a.WaitForRunning(Workspace{
956+
AccountID: "abc",
957+
WorkspaceID: 1234,
958+
}, 1*time.Second)
959+
assert.NoError(t, err)
960+
})
961+
})
918962
}

0 commit comments

Comments
 (0)