Commit 3f2d3c9

Dynamically determine APIv2.0 vs APIv2.1
1 parent 9339119 commit 3f2d3c9

9 files changed: +202 additions, -24 deletions

common/client.go

Lines changed: 0 additions & 3 deletions

@@ -94,9 +94,6 @@ type DatabricksClient struct {
     // Maximum number of requests per second made to Databricks REST API.
     RateLimitPerSecond int `name:"rate_limit" env:"DATABRICKS_RATE_LIMIT"`
 
-    // Use multitask jobs
-    UseMutiltaskJobs bool `name:"use_multitask_jobs"`
-
     // OAuth token refreshers for Azure to be used within `authVisitor`
     azureAuthorizer autorest.Authorizer

common/client_test.go

Lines changed: 1 addition & 1 deletion

@@ -151,7 +151,7 @@ func TestDatabricksClient_FormatURL(t *testing.T) {
 
 func TestClientAttributes(t *testing.T) {
     ca := ClientAttributes()
-    assert.Len(t, ca, 26)
+    assert.Len(t, ca, 25)
 }
 
 func TestDatabricksClient_Authenticate(t *testing.T) {

common/http.go

Lines changed: 2 additions & 2 deletions

@@ -451,7 +451,7 @@ func (c *DatabricksClient) genericQuery(ctx context.Context, method, requestURL
             headers += "\n"
         }
     }
-    log.Printf("[DEBUG] %s %s %s%v", method, requestURL, headers, c.redactedDump(requestBody)) // lgtm[go/clear-text-logging]
+    log.Printf("[DEBUG] %s %s %s%v", method, request.URL.Path, headers, c.redactedDump(requestBody)) // lgtm[go/clear-text-logging]
 
     r, err := retryablehttp.FromRequest(request)
     if err != nil {
@@ -475,7 +475,7 @@ func (c *DatabricksClient) genericQuery(ctx context.Context, method, requestURL
     if err != nil {
         return nil, err
     }
-    log.Printf("[DEBUG] %s %v <- %s %s", resp.Status, c.redactedDump(body), method, requestURL)
+    log.Printf("[DEBUG] %s %v <- %s %s", resp.Status, c.redactedDump(body), method, request.URL.Path)
     return body, nil
 }

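Both log statements above switch from the raw requestURL argument to request.URL.Path, so DEBUG lines now print the parsed request path — which is exactly where the 2.0-vs-2.1 choice made elsewhere in this commit becomes visible. A small self-contained illustration (the workspace host is made up):

    package main

    import (
        "fmt"
        "net/http"
    )

    func main() {
        // Hypothetical workspace URL, for illustration only.
        request, _ := http.NewRequest("GET",
            "https://adb-123.azuredatabricks.net/api/2.1/jobs/get?job_id=789", nil)
        fmt.Println(request.URL.String()) // full URL, host and query included
        fmt.Println(request.URL.Path)     // /api/2.1/jobs/get
    }
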
common/reflect_resource.go

Lines changed: 1 addition & 1 deletion

@@ -185,7 +185,7 @@ func typeToSchema(v reflect.Value, t reflect.Type, path []string) map[string]*sc
         if strings.Contains(tfTag, "suppress_diff") {
             // TODO: we may also suppress count diffs on all json:"..,omitempty" (101 occurences)
             // find . -type f -name '*.go' -not -path "vendor/*" | xargs grep ',omitempty' | grep '*'
-            blockCount := strings.Join(append(path, "#"), ".")
+            blockCount := strings.Join(append(path, fieldName, "#"), ".")
             scm[fieldName].DiffSuppressFunc = makeEmptyBlockSuppressFunc(blockCount)
         }
         scm[fieldName].Elem = &schema.Resource{

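The one-line fix above qualifies the suppress-diff key with the field's own name instead of only the parent path, so the generated key actually identifies the block being suppressed. A minimal sketch of the before/after keys (email_notifications is an illustrative field carrying tf:"suppress_diff"; at the top level, path is empty):

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        path := []string{}                 // nesting path; empty at the top level
        fieldName := "email_notifications" // illustrative suppress_diff field

        before := strings.Join(append(path, "#"), ".")
        after := strings.Join(append(path, fieldName, "#"), ".")
        fmt.Println(before) // "#" -- the old key missed the field entirely
        fmt.Println(after)  // "email_notifications.#"
    }
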
common/resource.go

Lines changed: 1 addition & 2 deletions

@@ -105,12 +105,11 @@ func MustCompileKeyRE(name string) *regexp.Regexp {
 }
 
 func makeEmptyBlockSuppressFunc(name string) func(k, old, new string, d *schema.ResourceData) bool {
-    // TODO: move to TF field tag
     re := MustCompileKeyRE(name)
     return func(k, old, new string, d *schema.ResourceData) bool {
         log.Printf("[DEBUG] name=%s k='%v', old='%v', new='%v'", name, k, old, new)
         if re.Match([]byte(name)) && old == "1" && new == "0" {
-            log.Printf("[DEBUG] Disable removal of empty block")
+            log.Printf("[DEBUG] Suppressing diff for name=%s k=%#v old=%#v new=%#v", name, k, old, new)
             return true
         }
         return false

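For context, the function above suppresses the diff Terraform would otherwise report when an optional block's element count drops from "1" to "0", i.e. when the remote API returns an empty block the configuration never declared. A trimmed, self-contained sketch of that rule (the key-regexp guard built by MustCompileKeyRE is elided, since its pattern is not part of this diff):

    package main

    import "fmt"

    // Trimmed sketch of makeEmptyBlockSuppressFunc: a change where the block
    // count goes from "1" to "0" (an empty block disappearing) is suppressed.
    // The real function additionally guards with a regexp from MustCompileKeyRE.
    func makeEmptyBlockSuppressFunc(name string) func(k, old, new string) bool {
        return func(k, old, new string) bool {
            suppress := old == "1" && new == "0"
            if suppress {
                fmt.Printf("[DEBUG] Suppressing diff for name=%s k=%#v old=%#v new=%#v\n", name, k, old, new)
            }
            return suppress
        }
    }

    func main() {
        suppress := makeEmptyBlockSuppressFunc("email_notifications.#")
        fmt.Println(suppress("email_notifications.#", "1", "0")) // true: suppressed
        fmt.Println(suppress("email_notifications.#", "0", "1")) // false: real change
    }
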
compute/acceptance/job_test.go

Lines changed: 76 additions & 0 deletions

@@ -88,6 +88,82 @@ func TestAwsAccJobsCreate(t *testing.T) {
     assert.True(t, job.Settings.NewCluster.SparkVersion == newSparkVersion, "Something is wrong with spark version")
 }
 
+func TestAccJobTasks(t *testing.T) {
+    acceptance.Test(t, []acceptance.Step{
+        {
+            Template: `
+            data "databricks_current_user" "me" {}
+            data "databricks_spark_version" "latest" {}
+            data "databricks_node_type" "smallest" {
+                local_disk = true
+            }
+
+            resource "databricks_notebook" "this" {
+                path     = "${data.databricks_current_user.me.home}/Terraform{var.RANDOM}"
+                language = "PYTHON"
+                content_base64 = base64encode(<<-EOT
+                    # created from ${abspath(path.module)}
+                    display(spark.range(10))
+                EOT
+                )
+            }
+
+            resource "databricks_job" "this" {
+                name = "{var.RANDOM}"
+                task {
+                    task_key = "a"
+
+                    new_cluster {
+                        num_workers   = 1
+                        spark_version = data.databricks_spark_version.latest.id
+                        node_type_id  = data.databricks_node_type.smallest.id
+                    }
+
+                    notebook_task {
+                        notebook_path = databricks_notebook.this.path
+                    }
+                }
+
+                task {
+                    task_key = "b"
+
+                    depends_on {
+                        task_key = "a"
+                    }
+
+                    new_cluster {
+                        num_workers   = 8
+                        spark_version = data.databricks_spark_version.latest.id
+                        node_type_id  = data.databricks_node_type.smallest.id
+                    }
+
+                    notebook_task {
+                        notebook_path = databricks_notebook.this.path
+                    }
+                }
+
+                task {
+                    task_key = "c"
+
+                    depends_on {
+                        task_key = "b"
+                    }
+
+                    new_cluster {
+                        num_workers   = 20
+                        spark_version = data.databricks_spark_version.latest.id
+                        node_type_id  = data.databricks_node_type.smallest.id
+                    }
+
+                    notebook_task {
+                        notebook_path = databricks_notebook.this.path
+                    }
+                }
+            }`,
+        },
+    })
+}
+
 func TestAccJobResource(t *testing.T) {
     if _, ok := os.LookupEnv("CLOUD_ENV"); !ok {
         t.Skip("Acceptance tests skipped unless env 'CLOUD_ENV' is set")

compute/model.go

Lines changed: 8 additions & 3 deletions

@@ -273,7 +273,7 @@ type Cluster struct {
     NumWorkers int32 `json:"num_workers" tf:"group:size"`
     Autoscale *AutoScale `json:"autoscale,omitempty" tf:"group:size"`
     EnableElasticDisk bool `json:"enable_elastic_disk,omitempty" tf:"computed"`
-    EnableLocalDiskEncryption bool `json:"enable_local_disk_encryption,omitempty"`
+    EnableLocalDiskEncryption bool `json:"enable_local_disk_encryption,omitempty" tf:"computed"`
 
     NodeTypeID string `json:"node_type_id,omitempty" tf:"group:node_type,computed"`
     DriverNodeTypeID string `json:"driver_node_type_id,omitempty" tf:"group:node_type,computed"`
@@ -553,7 +553,7 @@ type JobTaskSettings struct {
     TimeoutSeconds int32 `json:"timeout_seconds,omitempty"`
     MaxRetries int32 `json:"max_retries,omitempty"`
     MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"`
-    RetryOnTimeout bool `json:"retry_on_timeout,omitempty"`
+    RetryOnTimeout bool `json:"retry_on_timeout,omitempty" tf:"computed"`
 }
 
 // JobSettings contains the information for configuring a job on databricks
@@ -575,14 +575,19 @@ type JobSettings struct {
     // END Jobs API 2.0
 
     // BEGIN Jobs API 2.1
-    Tasks []JobTaskSettings `json:"tasks,omitempty" tf:"alias:task"`
+    Tasks  []JobTaskSettings `json:"tasks,omitempty" tf:"alias:task"`
+    Format string            `json:"format,omitempty" tf:"computed"`
     // END Jobs API 2.1
 
     Schedule *CronSchedule `json:"schedule,omitempty"`
     MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"`
     EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"`
 }
 
+func (js *JobSettings) isMultiTask() bool {
+    return js.Format == "MULTI_TASK" || len(js.Tasks) > 0
+}
+
 // JobList ...
 type JobList struct {
     Jobs []Job `json:"jobs"`

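The new isMultiTask helper encodes the commit's dispatch rule: a job is treated as multi-task, and therefore routed to Jobs API 2.1, exactly when it carries format = "MULTI_TASK" or declares at least one task. A self-contained sketch with the struct trimmed to the two fields the rule reads (Tasks is simplified to a string slice):

    package main

    import "fmt"

    // JobSettings trimmed to the fields isMultiTask consults.
    type JobSettings struct {
        Tasks  []string // stand-in for []JobTaskSettings
        Format string
    }

    func (js *JobSettings) isMultiTask() bool {
        return js.Format == "MULTI_TASK" || len(js.Tasks) > 0
    }

    func main() {
        legacy := JobSettings{}                     // plain 2.0-style job
        tagged := JobSettings{Format: "MULTI_TASK"} // explicitly tagged
        tasked := JobSettings{Tasks: []string{"a"}} // has task blocks

        fmt.Println(legacy.isMultiTask(), tagged.isMultiTask(), tasked.isMultiTask()) // false true true
    }
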
compute/resource_job.go

Lines changed: 24 additions & 12 deletions

@@ -19,9 +19,6 @@ import (
 // NewJobsAPI creates JobsAPI instance from provider meta
 func NewJobsAPI(ctx context.Context, m interface{}) JobsAPI {
     client := m.(*common.DatabricksClient)
-    if client.UseMutiltaskJobs {
-        ctx = context.WithValue(ctx, common.Api, common.API_2_1)
-    }
     return JobsAPI{client, ctx}
 }
 
@@ -239,6 +236,18 @@ var jobSchema = common.StructToSchema(JobSettings{},
 
 // ResourceJob ...
 func ResourceJob() *schema.Resource {
+    getReadCtx := func(ctx context.Context, d *schema.ResourceData) context.Context {
+        var js JobSettings
+        err := common.DataToStructPointer(d, jobSchema, &js)
+        if err != nil {
+            log.Printf("[INFO] no job resource data available. Returning default context")
+            return ctx
+        }
+        if js.isMultiTask() {
+            return context.WithValue(ctx, common.Api, common.API_2_1)
+        }
+        return ctx
+    }
     return common.Resource{
         Schema:        jobSchema,
         SchemaVersion: 2,
@@ -256,13 +265,10 @@ func ResourceJob() *schema.Resource {
             if alwaysRunning && js.MaxConcurrentRuns > 1 {
                 return fmt.Errorf("`always_running` must be specified only with `max_concurrent_runs = 1`")
             }
-            c := m.(*common.DatabricksClient)
-            if c.UseMutiltaskJobs {
-                for _, task := range js.Tasks {
-                    err = validateClusterDefinition(*task.NewCluster)
-                    if err != nil {
-                        return fmt.Errorf("task %s invalid: %w", task.TaskKey, err)
-                    }
+            for _, task := range js.Tasks {
+                err = validateClusterDefinition(*task.NewCluster)
+                if err != nil {
+                    return fmt.Errorf("task %s invalid: %w", task.TaskKey, err)
                 }
             }
             if js.NewCluster != nil {
@@ -282,19 +288,22 @@ func ResourceJob() *schema.Resource {
             sort.Slice(js.Tasks, func(i, j int) bool {
                 return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey
             })
+            if js.isMultiTask() {
+                ctx = context.WithValue(ctx, common.Api, common.API_2_1)
+            }
             jobsAPI := NewJobsAPI(ctx, c)
             job, err := jobsAPI.Create(js)
             if err != nil {
                 return err
             }
             d.SetId(job.ID())
             if d.Get("always_running").(bool) {
-                // TODO: test this with c.UseMutiltaskJobs
                 return jobsAPI.Start(job.JobID, d.Timeout(schema.TimeoutCreate))
             }
             return nil
         },
         Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
+            ctx = getReadCtx(ctx, d)
             job, err := NewJobsAPI(ctx, c).Read(d.Id())
             if err != nil {
                 return err
@@ -311,18 +320,21 @@ func ResourceJob() *schema.Resource {
             if err != nil {
                 return err
             }
+            if js.isMultiTask() {
+                ctx = context.WithValue(ctx, common.Api, common.API_2_1)
+            }
             jobsAPI := NewJobsAPI(ctx, c)
             err = jobsAPI.Update(d.Id(), js)
             if err != nil {
                 return err
             }
             if d.Get("always_running").(bool) {
-                // TODO: test this with c.UseMutiltaskJobs
                 return jobsAPI.Restart(d.Id(), d.Timeout(schema.TimeoutUpdate))
             }
             return nil
         },
         Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
+            ctx = getReadCtx(ctx, d)
             return NewJobsAPI(ctx, c).Delete(d.Id())
         },
     }.ToResource()

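With the provider-wide UseMutiltaskJobs flag gone, API selection now happens per request: Create and Update tag the context with common.API_2_1 when the settings are multi-task, Read and Delete recover the same decision from state through getReadCtx, and the HTTP layer is expected to derive /api/2.1/... paths from that context value. A minimal, self-contained sketch of the idea (contextKey, apiKey, and jobsPath are illustrative names, not the provider's actual API):

    package main

    import (
        "context"
        "fmt"
    )

    // Illustrative stand-ins for common.Api / common.API_2_1 from the diff.
    type contextKey int

    const apiKey contextKey = 0

    const (
        apiV20 = "2.0"
        apiV21 = "2.1"
    )

    // jobsPath picks the Jobs API version from the context, defaulting to 2.0.
    func jobsPath(ctx context.Context, action string) string {
        version := apiV20
        if v, ok := ctx.Value(apiKey).(string); ok && v == apiV21 {
            version = v
        }
        return fmt.Sprintf("/api/%s/jobs/%s", version, action)
    }

    func main() {
        ctx := context.Background()
        fmt.Println(jobsPath(ctx, "create")) // /api/2.0/jobs/create

        // What ResourceJob does once it sees multi-task settings:
        ctx = context.WithValue(ctx, apiKey, apiV21)
        fmt.Println(jobsPath(ctx, "create")) // /api/2.1/jobs/create
    }
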
compute/resource_job_test.go

Lines changed: 89 additions & 0 deletions

@@ -106,6 +106,95 @@ func TestResourceJobCreate(t *testing.T) {
     assert.Equal(t, "789", d.Id())
 }
 
+func TestResourceJobCreate_MultiTask(t *testing.T) {
+    d, err := qa.ResourceFixture{
+        Fixtures: []qa.HTTPFixture{
+            {
+                Method:   "POST",
+                Resource: "/api/2.1/jobs/create",
+                ExpectedRequest: JobSettings{
+                    Name: "Featurizer",
+                    Tasks: []JobTaskSettings{
+                        {
+                            TaskKey:           "a",
+                            ExistingClusterID: "abc",
+                            Libraries: []Library{
+                                {
+                                    Jar: "dbfs://aa/bb/cc.jar",
+                                },
+                            },
+                            SparkJarTask: &SparkJarTask{
+                                MainClassName: "com.labs.BarMain",
+                            },
+                        },
+                        {
+                            TaskKey: "b",
+                            NewCluster: &Cluster{
+                                SparkVersion: "a",
+                                NodeTypeID:   "b",
+                                AzureAttributes: &AzureAttributes{
+                                    SpotBidMaxPrice: 0.99,
+                                },
+                            },
+                            NotebookTask: &NotebookTask{
+                                NotebookPath: "/Stuff",
+                            },
+                        },
+                    },
+                    MaxConcurrentRuns: 1,
+                },
+                Response: Job{
+                    JobID: 789,
+                },
+            },
+            {
+                Method:   "GET",
+                Resource: "/api/2.1/jobs/get?job_id=789",
+                Response: Job{
+                    // good enough for mock
+                    Settings: &JobSettings{},
+                },
+            },
+        },
+        Create:   true,
+        Resource: ResourceJob(),
+        HCL: `
+        name = "Featurizer"
+
+        task {
+            task_key = "a"
+
+            existing_cluster_id = "abc"
+
+            spark_jar_task {
+                main_class_name = "com.labs.BarMain"
+            }
+
+            library {
+                jar = "dbfs://aa/bb/cc.jar"
+            }
+        }
+
+        task {
+            task_key = "b"
+
+            new_cluster {
+                spark_version = "a"
+                node_type_id  = "b"
+                azure_attributes {
+                    spot_bid_max_price = 0.99
+                }
+            }
+
+            notebook_task {
+                notebook_path = "/Stuff"
+            }
+        }`,
+    }.Apply(t)
+    assert.NoError(t, err, err)
+    assert.Equal(t, "789", d.Id())
+}
+
 func TestResourceJobCreate_AlwaysRunning(t *testing.T) {
     d, err := qa.ResourceFixture{
         Fixtures: []qa.HTTPFixture{
