Skip to content

Commit 52c2f92

Browse files
Add support for recording requests in integration acceptance tests (#2720)
## Changes
This PR adds support for recording API calls made during an integration acceptance test by setting `RecordRequests = true` in the test.toml file. This PR adds a new struct `ProxyServer` which is a server that sits in between a client and a real Databricks workspace. Having a server in between allows us to record requests as they are sent to a real Databricks workspace in our integration tests.

## Why
Other than being a useful feature in itself, this also unlocks migration away from Terraform in DABs since we can now record the API requests made by Terraform and ensure that our custom library makes the same API requests in all cases.

## Tests
A bunch of new acceptance tests for the new feature.
1 parent 5688dd1 commit 52c2f92

File tree

23 files changed

+837
-55
lines changed

23 files changed

+837
-55
lines changed

.wsignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ experimental/python/docs/images/databricks-logo.svg
1111
**/*.zip
1212
**/*.whl
1313

14+
# Newlines are recorded differently on windows and unix.
15+
# In unix: "raw_body": "hello, world\n"
16+
# In windows: "raw_body": "hello, world\r\n"
17+
# In order to prevent that difference, hello.txt does not have a trailing newline.
18+
acceptance/selftest/record_cloud/volume-io/hello.txt
19+
1420
# "bundle run" has trailing whitespace:
1521
acceptance/bundle/integration_whl/*/output.txt
1622

acceptance/acceptance_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"slices"
1919
"sort"
2020
"strings"
21-
"sync"
2221
"testing"
2322
"time"
2423
"unicode/utf8"
@@ -242,15 +241,15 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int {
242241
if len(expanded[0]) > 0 {
243242
t.Logf("Running test with env %v", expanded[0])
244243
}
245-
runTest(t, dir, coverDir, repls.Clone(), config, configPath, expanded[0])
244+
runTest(t, dir, coverDir, repls.Clone(), config, configPath, expanded[0], inprocessMode)
246245
} else {
247246
for _, envset := range expanded {
248247
envname := strings.Join(envset, "/")
249248
t.Run(envname, func(t *testing.T) {
250249
if !inprocessMode {
251250
t.Parallel()
252251
}
253-
runTest(t, dir, coverDir, repls.Clone(), config, configPath, envset)
252+
runTest(t, dir, coverDir, repls.Clone(), config, configPath, envset, inprocessMode)
254253
})
255254
}
256255
}
@@ -342,7 +341,14 @@ func getSkipReason(config *internal.TestConfig, configPath string) string {
342341
return ""
343342
}
344343

345-
func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsContext, config internal.TestConfig, configPath string, customEnv []string) {
344+
func runTest(t *testing.T,
345+
dir, coverDir string,
346+
repls testdiff.ReplacementsContext,
347+
config internal.TestConfig,
348+
configPath string,
349+
customEnv []string,
350+
inprocessMode bool,
351+
) {
346352
if LogConfig {
347353
configBytes, err := json.MarshalIndent(config, "", " ")
348354
require.NoError(t, err)
@@ -395,16 +401,12 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
395401
} else if isRunningOnCloud {
396402
timeout = max(timeout, config.TimeoutCloud)
397403
}
398-
399404
ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
400405
defer cancelFunc()
401406
args := []string{"bash", "-euo", "pipefail", EntryPointScript}
402407
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
403408

404-
// This mutex is used to synchronize recording requests
405-
var serverMutex sync.Mutex
406-
407-
cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir, &serverMutex)
409+
cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir)
408410
testdiff.PrepareReplacementsUser(t, &repls, user)
409411
testdiff.PrepareReplacementsWorkspaceConfig(t, &repls, cfg)
410412

acceptance/internal/prepare_server.go

Lines changed: 97 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ import (
1414
"time"
1515
"unicode/utf8"
1616

17-
sdkconfig "github.com/databricks/databricks-sdk-go/config"
18-
"github.com/databricks/databricks-sdk-go/service/iam"
19-
2017
"github.com/databricks/cli/libs/env"
18+
"github.com/databricks/cli/libs/testproxy"
2119
"github.com/databricks/cli/libs/testserver"
2220
"github.com/databricks/databricks-sdk-go"
21+
sdkconfig "github.com/databricks/databricks-sdk-go/config"
22+
"github.com/databricks/databricks-sdk-go/service/iam"
2323
"github.com/google/uuid"
2424
"github.com/stretchr/testify/assert"
2525
"github.com/stretchr/testify/require"
@@ -40,81 +40,115 @@ func isTruePtr(value *bool) bool {
4040
return value != nil && *value
4141
}
4242

43-
func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string, mu *sync.Mutex) (*sdkconfig.Config, iam.User) {
43+
func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string) (*sdkconfig.Config, iam.User) {
4444
cloudEnv := os.Getenv("CLOUD_ENV")
45+
recordRequests := isTruePtr(config.RecordRequests)
4546

46-
// If we are running on a cloud environment, use the host configured in the
47-
// environment.
4847
if cloudEnv != "" {
4948
w, err := databricks.NewWorkspaceClient()
5049
require.NoError(t, err)
5150

5251
user, err := w.CurrentUser.Me(context.Background())
5352
require.NoError(t, err, "Failed to get current user")
5453

55-
return w.Config, *user
56-
}
54+
cfg := w.Config
5755

58-
recordRequests := isTruePtr(config.RecordRequests)
56+
// If we are running in a cloud environment AND we are recording requests,
57+
// start a dedicated server to act as a reverse proxy to a real Databricks workspace.
58+
if recordRequests {
59+
host, token := startProxyServer(t, logRequests, config.IncludeRequestHeaders, outputDir)
60+
cfg = &sdkconfig.Config{
61+
Host: host,
62+
Token: token,
63+
}
64+
}
5965

60-
tokenSuffix := strings.ReplaceAll(uuid.NewString(), "-", "")
61-
token := "dbapi" + tokenSuffix
66+
return cfg, *user
67+
}
6268

63-
// If we are not recording requests, and no custom server server stubs are configured,
69+
// If we are not recording requests, and no custom server stubs are configured,
6470
// use the default shared server.
6571
if len(config.Server) == 0 && !recordRequests {
66-
return &sdkconfig.Config{
72+
// Use a unique token for each test. This allows us to maintain
73+
// separate state for each test in fake workspaces.
74+
tokenSuffix := strings.ReplaceAll(uuid.NewString(), "-", "")
75+
token := "dbapi" + tokenSuffix
76+
77+
cfg := &sdkconfig.Config{
6778
Host: os.Getenv("DATABRICKS_DEFAULT_HOST"),
6879
Token: token,
69-
}, TestUser
70-
}
80+
}
7181

72-
host := startDedicatedServer(t, config.Server, recordRequests, logRequests, config.IncludeRequestHeaders, outputDir, mu)
82+
return cfg, TestUser
83+
}
7384

74-
return &sdkconfig.Config{
85+
// Default case. Start a dedicated local server for the test with the server stubs configured
86+
// as overrides.
87+
host, token := startLocalServer(t, config.Server, recordRequests, logRequests, config.IncludeRequestHeaders, outputDir)
88+
cfg := &sdkconfig.Config{
7589
Host: host,
7690
Token: token,
77-
}, TestUser
91+
}
92+
93+
// For the purposes of replacements, use TestUser for local runs.
94+
// Note, users might have overridden /api/2.0/preview/scim/v2/Me but that should not affect the replacement:
95+
return cfg, TestUser
7896
}
7997

80-
func startDedicatedServer(t *testing.T,
98+
func recordRequestsCallback(t *testing.T, includeHeaders []string, outputDir string) func(request *testserver.Request) {
99+
mu := sync.Mutex{}
100+
101+
return func(request *testserver.Request) {
102+
mu.Lock()
103+
defer mu.Unlock()
104+
105+
req := getLoggedRequest(request, includeHeaders)
106+
reqJson, err := json.MarshalIndent(req, "", " ")
107+
assert.NoErrorf(t, err, "Failed to json-encode: %#v", req)
108+
109+
requestsPath := filepath.Join(outputDir, "out.requests.txt")
110+
f, err := os.OpenFile(requestsPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
111+
assert.NoError(t, err)
112+
defer f.Close()
113+
114+
_, err = f.WriteString(string(reqJson) + "\n")
115+
assert.NoError(t, err)
116+
}
117+
}
118+
119+
func logResponseCallback(t *testing.T) func(request *testserver.Request, response *testserver.EncodedResponse) {
120+
mu := sync.Mutex{}
121+
122+
return func(request *testserver.Request, response *testserver.EncodedResponse) {
123+
mu.Lock()
124+
defer mu.Unlock()
125+
126+
t.Logf("%d %s %s\n%s\n%s",
127+
response.StatusCode, request.Method, request.URL,
128+
formatHeadersAndBody("> ", request.Headers, request.Body),
129+
formatHeadersAndBody("# ", response.Headers, response.Body),
130+
)
131+
}
132+
}
133+
134+
func startLocalServer(t *testing.T,
81135
stubs []ServerStub,
82136
recordRequests bool,
83137
logRequests bool,
84138
includeHeaders []string,
85139
outputDir string,
86-
mu *sync.Mutex,
87-
) string {
140+
) (string, string) {
88141
s := testserver.New(t)
89142

143+
// Record API requests in out.requests.txt if RecordRequests is true
144+
// in test.toml
90145
if recordRequests {
91-
requestsPath := filepath.Join(outputDir, "out.requests.txt")
92-
s.RequestCallback = func(request *testserver.Request) {
93-
req := getLoggedRequest(request, includeHeaders)
94-
reqJson, err := json.MarshalIndent(req, "", " ")
95-
96-
mu.Lock()
97-
defer mu.Unlock()
98-
99-
assert.NoErrorf(t, err, "Failed to json-encode: %#v", req)
100-
101-
f, err := os.OpenFile(requestsPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
102-
assert.NoError(t, err)
103-
defer f.Close()
104-
105-
_, err = f.WriteString(string(reqJson) + "\n")
106-
assert.NoError(t, err)
107-
}
146+
s.RequestCallback = recordRequestsCallback(t, includeHeaders, outputDir)
108147
}
109148

149+
// Log API responses if the -logrequests flag is set.
110150
if logRequests {
111-
s.ResponseCallback = func(request *testserver.Request, response *testserver.EncodedResponse) {
112-
t.Logf("%d %s %s\n%s\n%s",
113-
response.StatusCode, request.Method, request.URL,
114-
formatHeadersAndBody("> ", request.Headers, request.Body),
115-
formatHeadersAndBody("# ", response.Headers, response.Body),
116-
)
117-
}
151+
s.ResponseCallback = logResponseCallback(t)
118152
}
119153

120154
for ind := range stubs {
@@ -132,8 +166,25 @@ func startDedicatedServer(t *testing.T,
132166

133167
// The earliest handlers take precedence, add default handlers last
134168
addDefaultHandlers(s)
169+
return s.URL, "dbapi123"
170+
}
171+
172+
func startProxyServer(t *testing.T,
173+
logRequests bool,
174+
includeHeaders []string,
175+
outputDir string,
176+
) (string, string) {
177+
s := testproxy.New(t)
178+
179+
// Always record requests for a proxy server.
180+
s.RequestCallback = recordRequestsCallback(t, includeHeaders, outputDir)
181+
182+
// Log API responses if the -logrequests flag is set.
183+
if logRequests {
184+
s.ResponseCallback = logResponseCallback(t)
185+
}
135186

136-
return s.URL
187+
return s.URL, "dbapi1234"
137188
}
138189

139190
type LoggedRequest struct {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"method": "GET",
3+
"path": "/api/2.0/preview/scim/v2/Me"
4+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
>>> [CLI] current-user me
3+
"[USERNAME]"
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Proxy server successfully records a request and returns a response.
2+
trace $CLI current-user me | jq .name.givenName
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"method": "GET",
3+
"path": "/api/2.2/jobs/get"
4+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
2+
>>> [CLI] jobs get 1234
3+
Error: Job 1234 does not exist.
4+
5+
Exit code: 1
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Proxy server should successfully return non-200 responses.
2+
errcode trace $CLI jobs get 1234
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
2+
=== Create a pipeline
3+
4+
>>> print_requests
5+
{
6+
"method": "POST",
7+
"path": "/api/2.0/pipelines",
8+
"body": {
9+
"allow_duplicate_names": true,
10+
"libraries": [
11+
{
12+
"file": {
13+
"path": "/whatever.py"
14+
}
15+
}
16+
],
17+
"name": "test-pipeline-1"
18+
}
19+
}
20+
21+
=== Get the pipeline
22+
>>> [CLI] pipelines get [UUID]
23+
"test-pipeline-1"
24+
25+
>>> print_requests
26+
{
27+
"method": "GET",
28+
"path": "/api/2.0/pipelines/[UUID]"
29+
}
30+
31+
=== Update the pipeline
32+
>>> [CLI] pipelines update [UUID] --json @pipeline2.json
33+
34+
>>> print_requests
35+
{
36+
"method": "PUT",
37+
"path": "/api/2.0/pipelines/[UUID]",
38+
"body": {
39+
"allow_duplicate_names": true,
40+
"libraries": [
41+
{
42+
"file": {
43+
"path": "/whatever.py"
44+
}
45+
}
46+
],
47+
"name": "test-pipeline-2"
48+
}
49+
}
50+
51+
=== Verify the update
52+
>>> [CLI] pipelines get [UUID]
53+
"test-pipeline-2"
54+
55+
>>> print_requests
56+
{
57+
"method": "GET",
58+
"path": "/api/2.0/pipelines/[UUID]"
59+
}
60+
61+
=== Delete the pipeline
62+
>>> [CLI] pipelines delete [UUID]
63+
64+
>>> print_requests
65+
{
66+
"method": "DELETE",
67+
"path": "/api/2.0/pipelines/[UUID]"
68+
}
69+
70+
=== Verify the deletion
71+
>>> [CLI] pipelines get [UUID]
72+
Error: The specified pipeline [UUID] was not found.
73+
74+
Exit code: 1
75+
76+
>>> print_requests
77+
{
78+
"method": "GET",
79+
"path": "/api/2.0/pipelines/[UUID]"
80+
}

0 commit comments

Comments
 (0)