Skip to content

Commit 1b75df6

Browse files
Added support for running specified job tasks instead of all job tasks (#3388)
## Changes Added support for running specified job tasks instead of all job tasks. This change introduces a new flag `--only` for the `bundle run` command when executed on a job. It allows specifying which specific tasks out of all in the jobs to run. Usage ``` databricks bundle run my_job --only task_1,task_2 ``` ## Why The feature was relatively recently added to Jobs API https://docs.databricks.com/api/workspace/jobs/runnow#only ## Tests Added an acceptance tests. Skipped running on Cloud because the test is really slow but verified manually it works as expected too. <!-- If your PR needs to be included in the release notes for next release, add a separate entry in NEXT_CHANGELOG.md as part of your PR. --> --------- Co-authored-by: Pieter Noordhuis <[email protected]>
1 parent 5bebe60 commit 1b75df6

File tree

10 files changed

+170
-0
lines changed

10 files changed

+170
-0
lines changed

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
* Changed logic for resolving `${resources...}` references. Previously this would be done by terraform at deploy time. Now if it references a field that is present in the config, it will be done by DABs during bundle loading ([#3370](https://github.com/databricks/cli/pull/3370))
1616
* Add support for tagging pipelines ([#3086](https://github.com/databricks/cli/pull/3086))
1717
* Add warning for when an invalid value is specified for an enum field ([#3050](https://github.com/databricks/cli/pull/3050))
18+
* Add support for running specified job tasks instead of all job tasks ([#3388](https://github.com/databricks/cli/pull/3388))
1819

1920
### API Changes

acceptance/bundle/help/bundle-run/output.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Usage:
3838
databricks bundle run [flags] [KEY]
3939

4040
Job Flags:
41+
--only strings comma separated list of task keys to run
4142
--params stringToString comma separated k=v pairs for job parameters (default [])
4243

4344
Job Task Flags:
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
bundle:
2+
name: partial_run
3+
4+
resources:
5+
jobs:
6+
my_job:
7+
name: my_job
8+
tasks:
9+
- task_key: task_1
10+
notebook_task:
11+
notebook_path: "./notebook1.py"
12+
- task_key: task_2
13+
notebook_task:
14+
notebook_path: "./notebook2.py"
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Databricks notebook source
2+
3+
print("Hello from notebook1!")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Databricks notebook source
2+
3+
print("Hello from notebook2!")
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Local = true
2+
Cloud = false
3+
4+
[EnvMatrix]
5+
DATABRICKS_CLI_DEPLOYMENT = ["terraform", "direct-exp"]
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
2+
>>> [CLI] bundle deploy
3+
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/partial_run/default/files...
4+
Deploying resources...
5+
Updating deployment state...
6+
Deployment complete!
7+
8+
>>> [CLI] bundle run my_job --only task_1
9+
Run URL: [DATABRICKS_URL]/job/run/[NUMID]
10+
11+
[TIMESTAMP] "run-name" TERMINATED
12+
13+
>>> print_requests
14+
{
15+
"body": {
16+
"job_id": [NUMID],
17+
"only": [
18+
"task_1"
19+
]
20+
},
21+
"method": "POST",
22+
"path": "/api/2.2/jobs/run-now"
23+
}
24+
25+
>>> [CLI] bundle run my_job --only task_1,task_2
26+
Run URL: [DATABRICKS_URL]/job/run/[NUMID]
27+
28+
[TIMESTAMP] "run-name" TERMINATED
29+
30+
>>> print_requests
31+
{
32+
"body": {
33+
"job_id": [NUMID],
34+
"only": [
35+
"task_1",
36+
"task_2"
37+
]
38+
},
39+
"method": "POST",
40+
"path": "/api/2.2/jobs/run-now"
41+
}
42+
43+
>>> [CLI] bundle run my_job
44+
Run URL: [DATABRICKS_URL]/job/run/[NUMID]
45+
46+
[TIMESTAMP] "run-name" TERMINATED
47+
48+
>>> print_requests
49+
{
50+
"body": {
51+
"job_id": [NUMID]
52+
},
53+
"method": "POST",
54+
"path": "/api/2.2/jobs/run-now"
55+
}
56+
57+
>>> musterr [CLI] bundle run my_job --only non_existent_task
58+
Error: task "non_existent_task" not found in job "my_job"
59+
60+
Exit code (musterr): 1
61+
62+
>>> musterr [CLI] bundle run my_job --only non_existent_task,task_1
63+
Error: task "non_existent_task" not found in job "my_job"
64+
65+
Exit code (musterr): 1
66+
67+
>>> [CLI] bundle run my_job --only task1.table1,task2>,task3>table3
68+
Run URL: [DATABRICKS_URL]/job/run/[NUMID]
69+
70+
[TIMESTAMP] "run-name" TERMINATED
71+
72+
>>> print_requests
73+
{
74+
"body": {
75+
"job_id": [NUMID],
76+
"only": [
77+
"task1.table1",
78+
"task2>",
79+
"task3>table3"
80+
]
81+
},
82+
"method": "POST",
83+
"path": "/api/2.2/jobs/run-now"
84+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
print_requests() {
2+
jq --sort-keys 'select(.path | contains("/jobs/run-now"))' < out.requests.txt
3+
rm out.requests.txt
4+
}
5+
6+
trace $CLI bundle deploy
7+
8+
trace $CLI bundle run my_job --only task_1
9+
trace print_requests
10+
11+
trace $CLI bundle run my_job --only task_1,task_2
12+
trace print_requests
13+
14+
trace $CLI bundle run my_job
15+
trace print_requests
16+
17+
# We expect an error because the task does not exist
18+
trace musterr $CLI bundle run my_job --only non_existent_task
19+
20+
# We expect an error because one of the tasks does not exist
21+
trace musterr $CLI bundle run my_job --only non_existent_task,task_1
22+
23+
# We expect these values to be send through
24+
trace $CLI bundle run my_job --only "task1.table1,task2>,task3>table3"
25+
trace print_requests
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Local = true
2+
Cloud = false
3+
4+
RecordRequests = true

bundle/run/job_options.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package run
22

33
import (
44
"errors"
5+
"fmt"
6+
"regexp"
57
"strconv"
68

79
"github.com/databricks/cli/bundle/config/resources"
@@ -25,10 +27,14 @@ type JobOptions struct {
2527
// If a job uses job parameters, it cannot use task parameters.
2628
// Also see https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks.
2729
jobParams map[string]string
30+
31+
// only is a list of task keys to run. If not specified, the full job is run.
32+
only []string
2833
}
2934

3035
func (o *JobOptions) DefineJobOptions(fs *flag.FlagSet) {
3136
fs.StringToStringVar(&o.jobParams, "params", nil, "comma separated k=v pairs for job parameters")
37+
fs.StringSliceVar(&o.only, "only", nil, "comma separated list of task keys to run")
3238
}
3339

3440
func (o *JobOptions) DefineTaskOptions(fs *flag.FlagSet) {
@@ -57,6 +63,9 @@ func (o *JobOptions) hasJobParametersConfigured() bool {
5763
return len(o.jobParams) > 0
5864
}
5965

66+
// regex to match valid task keys as defined in https://docs.databricks.com/api/workspace/jobs/create#tasks-task_key
67+
var taskKeyRegex = regexp.MustCompile(`^[\w\-\_]+$`)
68+
6069
// Validate returns if the combination of options is valid.
6170
func (o *JobOptions) Validate(job *resources.Job) error {
6271
if job == nil {
@@ -72,6 +81,26 @@ func (o *JobOptions) Validate(job *resources.Job) error {
7281
return errors.New("the job to run does not define job parameters; specifying job parameters is not allowed")
7382
}
7483

84+
if len(o.only) > 0 {
85+
for _, task := range o.only {
86+
// Skip if did not match the regex. It can mean that the more complex syntax like "task1.table1" is used.
87+
if !taskKeyRegex.MatchString(task) {
88+
continue
89+
}
90+
91+
found := false
92+
for _, t := range job.Tasks {
93+
if t.TaskKey == task {
94+
found = true
95+
break
96+
}
97+
}
98+
if !found {
99+
return fmt.Errorf("task %#v not found in job %#v", task, job.Name)
100+
}
101+
}
102+
}
103+
75104
return nil
76105
}
77106

@@ -121,6 +150,7 @@ func (o *JobOptions) toPayload(job *resources.Job, jobID int64) (*jobs.RunNow, e
121150
SqlParams: o.sqlParams,
122151

123152
JobParameters: o.jobParams,
153+
Only: o.only,
124154
}
125155

126156
return payload, nil

0 commit comments

Comments
 (0)