Skip to content

Commit 9339119

Browse files
committed
Added initial support for Multi Task Jobs
* provider has to be initialized with `use_multitask_jobs = true` * the `task` block of `databricks_job` is currently a slice, so adding and removing different tasks might cause confusing, but still correct, diffs * we may explore `tf:slice_set` mechanics for `task` blocks, though initial attempts turned out to be harder to test * the `always_running` parameter still has to be tested for API 2.1 compatibility This implements feature #747
1 parent 77ce37c commit 9339119

File tree

4 files changed

+117
-68
lines changed

4 files changed

+117
-68
lines changed

common/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ type DatabricksClient struct {
9494
// Maximum number of requests per second made to Databricks REST API.
9595
RateLimitPerSecond int `name:"rate_limit" env:"DATABRICKS_RATE_LIMIT"`
9696

97+
// Use multitask jobs
98+
UseMutiltaskJobs bool `name:"use_multitask_jobs"`
99+
97100
// OAuth token refreshers for Azure to be used within `authVisitor`
98101
azureAuthorizer autorest.Authorizer
99102

compute/acceptance/job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestAwsAccJobsCreate(t *testing.T) {
5151
},
5252
},
5353
},
54-
EmailNotifications: &JobEmailNotifications{
54+
EmailNotifications: &EmailNotifications{
5555
OnStart: []string{},
5656
OnSuccess: []string{},
5757
OnFailure: []string{},

compute/model.go

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,9 @@ type Cluster struct {
280280
InstancePoolID string `json:"instance_pool_id,omitempty" tf:"group:node_type"`
281281
DriverInstancePoolID string `json:"driver_instance_pool_id,omitempty" tf:"group:node_type,computed"`
282282
PolicyID string `json:"policy_id,omitempty"`
283-
AwsAttributes *AwsAttributes `json:"aws_attributes,omitempty" tf:"conflicts:instance_pool_id"`
284-
AzureAttributes *AzureAttributes `json:"azure_attributes,omitempty" tf:"conflicts:instance_pool_id"`
285-
GcpAttributes *GcpAttributes `json:"gcp_attributes,omitempty" tf:"conflicts:instance_pool_id"`
283+
AwsAttributes *AwsAttributes `json:"aws_attributes,omitempty" tf:"conflicts:instance_pool_id,suppress_diff"`
284+
AzureAttributes *AzureAttributes `json:"azure_attributes,omitempty" tf:"conflicts:instance_pool_id,suppress_diff"`
285+
GcpAttributes *GcpAttributes `json:"gcp_attributes,omitempty" tf:"conflicts:instance_pool_id,suppress_diff"`
286286
AutoterminationMinutes int32 `json:"autotermination_minutes,omitempty"`
287287

288288
SparkConf map[string]string `json:"spark_conf,omitempty"`
@@ -413,8 +413,8 @@ type InstancePool struct {
413413
MinIdleInstances int32 `json:"min_idle_instances,omitempty"`
414414
MaxCapacity int32 `json:"max_capacity,omitempty"`
415415
IdleInstanceAutoTerminationMinutes int32 `json:"idle_instance_autotermination_minutes"`
416-
AwsAttributes *InstancePoolAwsAttributes `json:"aws_attributes,omitempty" tf:"force_new"`
417-
AzureAttributes *InstancePoolAzureAttributes `json:"azure_attributes,omitempty" tf:"force_new"`
416+
AwsAttributes *InstancePoolAwsAttributes `json:"aws_attributes,omitempty" tf:"force_new,suppress_diff"`
417+
AzureAttributes *InstancePoolAzureAttributes `json:"azure_attributes,omitempty" tf:"force_new,suppress_diff"`
418418
NodeTypeID string `json:"node_type_id" tf:"force_new"`
419419
CustomTags map[string]string `json:"custom_tags,omitempty" tf:"force_new"`
420420
EnableElasticDisk bool `json:"enable_elastic_disk,omitempty" tf:"force_new"`
@@ -517,8 +517,8 @@ type SparkSubmitTask struct {
517517
Parameters []string `json:"parameters,omitempty"`
518518
}
519519

520-
// JobEmailNotifications contains the information for email notifications after job completion
521-
type JobEmailNotifications struct {
520+
// EmailNotifications contains the information for email notifications after job completion
521+
type EmailNotifications struct {
522522
OnStart []string `json:"on_start,omitempty"`
523523
OnSuccess []string `json:"on_success,omitempty"`
524524
OnFailure []string `json:"on_failure,omitempty"`
@@ -532,27 +532,55 @@ type CronSchedule struct {
532532
PauseStatus string `json:"pause_status,omitempty" tf:"computed"`
533533
}
534534

535-
// JobSettings contains the information for configuring a job on databricks
536-
type JobSettings struct {
537-
Name string `json:"name,omitempty" tf:"default:Untitled"`
535+
type TaskDependency struct {
536+
TaskKey string `json:"task_key,omitempty"`
537+
}
538538

539-
ExistingClusterID string `json:"existing_cluster_id,omitempty" tf:"group:cluster_type"`
540-
NewCluster *Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"`
539+
type JobTaskSettings struct {
540+
TaskKey string `json:"task_key,omitempty"`
541+
Description string `json:"description,omitempty"`
542+
DependsOn []TaskDependency `json:"depends_on,omitempty"`
541543

542-
NotebookTask *NotebookTask `json:"notebook_task,omitempty" tf:"group:task_type"`
543-
SparkJarTask *SparkJarTask `json:"spark_jar_task,omitempty" tf:"group:task_type"`
544-
SparkPythonTask *SparkPythonTask `json:"spark_python_task,omitempty" tf:"group:task_type"`
545-
SparkSubmitTask *SparkSubmitTask `json:"spark_submit_task,omitempty" tf:"group:task_type"`
544+
// TODO: add PipelineTask, PythonWheelTask
545+
ExistingClusterID string `json:"existing_cluster_id,omitempty" tf:"group:cluster_type"`
546+
NewCluster *Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"`
547+
Libraries []Library `json:"libraries,omitempty" tf:"slice_set,alias:library"`
548+
NotebookTask *NotebookTask `json:"notebook_task,omitempty" tf:"group:task_type"`
549+
SparkJarTask *SparkJarTask `json:"spark_jar_task,omitempty" tf:"group:task_type"`
550+
SparkPythonTask *SparkPythonTask `json:"spark_python_task,omitempty" tf:"group:task_type"`
551+
SparkSubmitTask *SparkSubmitTask `json:"spark_submit_task,omitempty" tf:"group:task_type"`
552+
EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"`
553+
TimeoutSeconds int32 `json:"timeout_seconds,omitempty"`
554+
MaxRetries int32 `json:"max_retries,omitempty"`
555+
MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"`
556+
RetryOnTimeout bool `json:"retry_on_timeout,omitempty"`
557+
}
546558

547-
Libraries []Library `json:"libraries,omitempty" tf:"slice_set,alias:library"`
548-
TimeoutSeconds int32 `json:"timeout_seconds,omitempty"`
549-
MaxRetries int32 `json:"max_retries,omitempty"`
550-
MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"`
551-
RetryOnTimeout bool `json:"retry_on_timeout,omitempty"`
552-
Schedule *CronSchedule `json:"schedule,omitempty"`
553-
MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"`
559+
// JobSettings contains the information for configuring a job on databricks
560+
type JobSettings struct {
561+
Name string `json:"name,omitempty" tf:"default:Untitled"`
554562

555-
EmailNotifications *JobEmailNotifications `json:"email_notifications,omitempty"`
563+
// BEGIN Jobs API 2.0
564+
ExistingClusterID string `json:"existing_cluster_id,omitempty" tf:"group:cluster_type"`
565+
NewCluster *Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"`
566+
NotebookTask *NotebookTask `json:"notebook_task,omitempty" tf:"group:task_type"`
567+
SparkJarTask *SparkJarTask `json:"spark_jar_task,omitempty" tf:"group:task_type"`
568+
SparkPythonTask *SparkPythonTask `json:"spark_python_task,omitempty" tf:"group:task_type"`
569+
SparkSubmitTask *SparkSubmitTask `json:"spark_submit_task,omitempty" tf:"group:task_type"`
570+
Libraries []Library `json:"libraries,omitempty" tf:"slice_set,alias:library"`
571+
TimeoutSeconds int32 `json:"timeout_seconds,omitempty"`
572+
MaxRetries int32 `json:"max_retries,omitempty"`
573+
MinRetryIntervalMillis int32 `json:"min_retry_interval_millis,omitempty"`
574+
RetryOnTimeout bool `json:"retry_on_timeout,omitempty"`
575+
// END Jobs API 2.0
576+
577+
// BEGIN Jobs API 2.1
578+
Tasks []JobTaskSettings `json:"tasks,omitempty" tf:"alias:task"`
579+
// END Jobs API 2.1
580+
581+
Schedule *CronSchedule `json:"schedule,omitempty"`
582+
MaxConcurrentRuns int32 `json:"max_concurrent_runs,omitempty"`
583+
EmailNotifications *EmailNotifications `json:"email_notifications,omitempty" tf:"suppress_diff"`
556584
}
557585

558586
// JobList ...

compute/resource_job.go

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log"
7+
"sort"
78
"strconv"
89
"strings"
910
"time"
@@ -17,7 +18,11 @@ import (
1718

1819
// NewJobsAPI creates JobsAPI instance from provider meta
1920
func NewJobsAPI(ctx context.Context, m interface{}) JobsAPI {
20-
return JobsAPI{m.(*common.DatabricksClient), ctx}
21+
client := m.(*common.DatabricksClient)
22+
if client.UseMutiltaskJobs {
23+
ctx = context.WithValue(ctx, common.Api, common.API_2_1)
24+
}
25+
return JobsAPI{client, ctx}
2126
}
2227

2328
// JobsAPI exposes the Jobs API
@@ -188,39 +193,36 @@ func wrapMissingJobError(err error, id string) error {
188193
return err
189194
}
190195

196+
func jobSettingsSchema(s *map[string]*schema.Schema, prefix string) {
197+
if p, err := common.SchemaPath(*s, "new_cluster", "num_workers"); err == nil {
198+
p.Optional = true
199+
p.Default = 0
200+
p.Type = schema.TypeInt
201+
p.ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(0))
202+
p.Required = false
203+
}
204+
if v, err := common.SchemaPath(*s, "new_cluster", "spark_conf"); err == nil {
205+
reSize := common.MustCompileKeyRE(prefix + "new_cluster.0.spark_conf.%")
206+
reConf := common.MustCompileKeyRE(prefix + "new_cluster.0.spark_conf.spark.databricks.delta.preview.enabled")
207+
v.DiffSuppressFunc = func(k, old, new string, d *schema.ResourceData) bool {
208+
isPossiblyLegacyConfig := reSize.Match([]byte(k)) && old == "1" && new == "0"
209+
isLegacyConfig := reConf.Match([]byte(k))
210+
if isPossiblyLegacyConfig || isLegacyConfig {
211+
log.Printf("[DEBUG] Suppressing diff for k=%#v old=%#v new=%#v", k, old, new)
212+
return true
213+
}
214+
return false
215+
}
216+
}
217+
}
218+
191219
var jobSchema = common.StructToSchema(JobSettings{},
192220
func(s map[string]*schema.Schema) map[string]*schema.Schema {
193-
if p, err := common.SchemaPath(s, "new_cluster", "num_workers"); err == nil {
194-
p.Optional = true
195-
p.Default = 0
196-
p.Type = schema.TypeInt
197-
p.ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(0))
198-
p.Required = false
199-
}
221+
jobSettingsSchema(&s, "")
222+
jobSettingsSchema(&s["task"].Elem.(*schema.Resource).Schema, "task.0.")
200223
if p, err := common.SchemaPath(s, "schedule", "pause_status"); err == nil {
201224
p.ValidateFunc = validation.StringInSlice([]string{"PAUSED", "UNPAUSED"}, false)
202225
}
203-
if v, err := common.SchemaPath(s, "new_cluster", "spark_conf"); err == nil {
204-
v.DiffSuppressFunc = func(k, old, new string, d *schema.ResourceData) bool {
205-
isPossiblyLegacyConfig := k == "new_cluster.0.spark_conf.%" && old == "1" && new == "0"
206-
isLegacyConfig := k == "new_cluster.0.spark_conf.spark.databricks.delta.preview.enabled"
207-
if isPossiblyLegacyConfig || isLegacyConfig {
208-
log.Printf("[DEBUG] Suppressing diff for k=%#v old=%#v new=%#v", k, old, new)
209-
return true
210-
}
211-
return false
212-
}
213-
}
214-
if v, err := common.SchemaPath(s, "new_cluster", "aws_attributes"); err == nil {
215-
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.aws_attributes.#")
216-
}
217-
if v, err := common.SchemaPath(s, "new_cluster", "azure_attributes"); err == nil {
218-
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.azure_attributes.#")
219-
}
220-
if v, err := common.SchemaPath(s, "new_cluster", "gcp_attributes"); err == nil {
221-
v.DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("new_cluster.0.gcp_attributes.#")
222-
}
223-
s["email_notifications"].DiffSuppressFunc = common.MakeEmptyBlockSuppressFunc("email_notifications.#")
224226
s["max_concurrent_runs"].ValidateDiagFunc = validation.ToDiagFunc(validation.IntAtLeast(1))
225227
s["max_concurrent_runs"].Default = 1
226228
s["url"] = &schema.Schema{
@@ -244,12 +246,31 @@ func ResourceJob() *schema.Resource {
244246
Create: schema.DefaultTimeout(DefaultProvisionTimeout),
245247
Update: schema.DefaultTimeout(DefaultProvisionTimeout),
246248
},
247-
CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, c interface{}) error {
249+
CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff, m interface{}) error {
250+
var js JobSettings
251+
err := common.DiffToStructPointer(d, jobSchema, &js)
252+
if err != nil {
253+
return err
254+
}
248255
alwaysRunning := d.Get("always_running").(bool)
249-
maxConcurrentRuns := d.Get("max_concurrent_runs").(int)
250-
if alwaysRunning && maxConcurrentRuns > 1 {
256+
if alwaysRunning && js.MaxConcurrentRuns > 1 {
251257
return fmt.Errorf("`always_running` must be specified only with `max_concurrent_runs = 1`")
252258
}
259+
c := m.(*common.DatabricksClient)
260+
if c.UseMutiltaskJobs {
261+
for _, task := range js.Tasks {
262+
err = validateClusterDefinition(*task.NewCluster)
263+
if err != nil {
264+
return fmt.Errorf("task %s invalid: %w", task.TaskKey, err)
265+
}
266+
}
267+
}
268+
if js.NewCluster != nil {
269+
err = validateClusterDefinition(*js.NewCluster)
270+
if err != nil {
271+
return fmt.Errorf("invalid job cluster: %w", err)
272+
}
273+
}
253274
return nil
254275
},
255276
Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
@@ -258,18 +279,17 @@ func ResourceJob() *schema.Resource {
258279
if err != nil {
259280
return err
260281
}
261-
if js.NewCluster != nil {
262-
if err = validateClusterDefinition(*js.NewCluster); err != nil {
263-
return err
264-
}
265-
}
282+
sort.Slice(js.Tasks, func(i, j int) bool {
283+
return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey
284+
})
266285
jobsAPI := NewJobsAPI(ctx, c)
267286
job, err := jobsAPI.Create(js)
268287
if err != nil {
269288
return err
270289
}
271290
d.SetId(job.ID())
272291
if d.Get("always_running").(bool) {
292+
// TODO: test this with c.UseMutiltaskJobs
273293
return jobsAPI.Start(job.JobID, d.Timeout(schema.TimeoutCreate))
274294
}
275295
return nil
@@ -279,6 +299,9 @@ func ResourceJob() *schema.Resource {
279299
if err != nil {
280300
return err
281301
}
302+
sort.Slice(job.Settings.Tasks, func(i, j int) bool {
303+
return job.Settings.Tasks[i].TaskKey < job.Settings.Tasks[j].TaskKey
304+
})
282305
d.Set("url", c.FormatURL("#job/", d.Id()))
283306
return common.StructToData(*job.Settings, jobSchema, d)
284307
},
@@ -288,18 +311,13 @@ func ResourceJob() *schema.Resource {
288311
if err != nil {
289312
return err
290313
}
291-
if js.NewCluster != nil {
292-
err = validateClusterDefinition(*js.NewCluster)
293-
if err != nil {
294-
return err
295-
}
296-
}
297314
jobsAPI := NewJobsAPI(ctx, c)
298315
err = jobsAPI.Update(d.Id(), js)
299316
if err != nil {
300317
return err
301318
}
302319
if d.Get("always_running").(bool) {
320+
// TODO: test this with c.UseMutiltaskJobs
303321
return jobsAPI.Restart(d.Id(), d.Timeout(schema.TimeoutUpdate))
304322
}
305323
return nil

0 commit comments

Comments
 (0)