Skip to content

Commit 121c2a6

Browse files
authored
Add --wait option to databricks runs submit CLI command (#487)
1 parent bccbdea commit 121c2a6

File tree

3 files changed

+110
-6
lines changed

3 files changed

+110
-6
lines changed

databricks_cli/runs/cli.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
# See the License for the specific language governing permissions and
2222
# limitations under the License.
2323

24+
import sys
25+
import time
26+
from json import loads as json_loads
27+
2428
import click
2529
from tabulate import tabulate
2630

2731
from databricks_cli.click_types import OutputClickType, JsonClickType, RunIdClickType
2832
from databricks_cli.jobs.cli import check_version
29-
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, json_cli_base, \
30-
truncate_string
33+
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, truncate_string, \
34+
error_and_quit, backoff_with_jitter
3135
from databricks_cli.configure.config import provide_api_client, profile_option, debug_option, \
3236
api_version_option
3337
from databricks_cli.runs.api import RunsApi
@@ -39,21 +43,48 @@
3943
help='File containing JSON request to POST to /api/2.*/jobs/runs/submit.')
4044
@click.option('--json', default=None, type=JsonClickType(),
4145
help=JsonClickType.help('/api/2.*/jobs/runs/submit'))
46+
@click.option('--wait', is_flag=True, default=False,
47+
help='Waits for the submitted run to complete.')
4248
@api_version_option
4349
@debug_option
4450
@profile_option
4551
@eat_exceptions
4652
@provide_api_client
47-
def submit_cli(api_client, json_file, json, version):
53+
def submit_cli(api_client, json_file, json, wait, version):
4854
"""
49-
Submits a one-time run.
55+
Submits a one-time run and optionally waits for its completion.
5056
5157
The specification for the request json can be found
5258
https://docs.databricks.com/api/latest/jobs.html#runs-submit
5359
"""
5460
check_version(api_client, version)
55-
json_cli_base(json_file, json, lambda json: RunsApi(
56-
api_client).submit_run(json, version=version))
61+
if json_file:
62+
with open(json_file, 'r') as f:
63+
json = f.read()
64+
submit_res = RunsApi(api_client).submit_run(json_loads(json), version=version)
65+
click.echo(pretty_format(submit_res))
66+
if wait:
67+
run_id = submit_res['run_id']
68+
completed_states = set(['TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'])
69+
prev_life_cycle_state = ""
70+
attempt = 0
71+
# Wait for run to complete
72+
while True:
73+
run = RunsApi(api_client).get_run(run_id, version=version)
74+
run_state = run['state']
75+
life_cycle_state = run_state['life_cycle_state']
76+
if life_cycle_state in completed_states:
77+
if run_state['result_state'] == 'SUCCESS':
78+
sys.exit(0)
79+
else:
80+
error_and_quit('Run failed with state ' + run_state['result_state'] +
81+
' and state message ' + run_state['state_message'])
82+
if prev_life_cycle_state != life_cycle_state:
83+
click.echo('Waiting on run to complete. Current state: ' + life_cycle_state +
84+
'. URL: ' + run['run_page_url'], err=True)
85+
prev_life_cycle_state = life_cycle_state
86+
time.sleep(backoff_with_jitter(attempt))
87+
attempt += 1
5788

5889

5990
def _runs_to_table(runs_json):

databricks_cli/utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
# See the License for the specific language governing permissions and
2222
# limitations under the License.
2323

24+
import math
25+
import random
2426
import sys
2527
import traceback
2628
from json import dumps as json_dumps, loads as json_loads
@@ -98,6 +100,20 @@ def error_and_quit(message):
98100
sys.exit(1)
99101

100102

103+
INTERVAL_MAX = 30
104+
INTERVAL_BASE = 5
105+
MAX_EXPONENT = 10
106+
107+
108+
def backoff_with_jitter(attempt):
109+
"""
110+
Creates a growing but randomized wait time based on the number of attempts already made.
111+
"""
112+
exponent = min(attempt, MAX_EXPONENT)
113+
sleep_time = min(INTERVAL_MAX, INTERVAL_BASE * 2 ** exponent)
114+
return random.randrange(math.floor(sleep_time * 0.5), sleep_time)
115+
116+
101117
def pretty_format(json, encode_utf8=False):
102118
if encode_utf8:
103119
return json_dumps(json, indent=2, ensure_ascii=False)

tests/runs/test_cli.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,25 @@
3535

3636
SUBMIT_RETURN = {'run_id': 5}
3737
SUBMIT_JSON = '{"name": "test_run"}'
38+
RUNS_GET_RETURN_SUCCESS = {
39+
"state": {
40+
"life_cycle_state": "TERMINATED",
41+
"result_state": "SUCCESS",
42+
},
43+
}
44+
RUNS_GET_RETURN_PENDING = {
45+
"state": {
46+
"life_cycle_state": "PENDING",
47+
"state_message": "Waiting for cluster",
48+
},
49+
"run_page_url": "https://www.google.com",
50+
}
51+
RUNS_GET_RETURN_RUNNING = {
52+
"state": {
53+
"life_cycle_state": "RUNNING",
54+
},
55+
"run_page_url": "https://www.google.com",
56+
}
3857

3958

4059
@pytest.fixture()
@@ -56,6 +75,44 @@ def test_submit_cli_json(runs_api_mock):
5675
assert echo_mock.call_args[0][0] == pretty_format(SUBMIT_RETURN)
5776

5877

78+
@provide_conf
79+
def test_submit_wait_success(runs_api_mock):
80+
with mock.patch('databricks_cli.runs.cli.click.echo') as echo_mock, \
81+
mock.patch('time.sleep') as sleep_mock:
82+
runs_api_mock.submit_run.return_value = SUBMIT_RETURN
83+
runs_api_mock.get_run.side_effect = [RUNS_GET_RETURN_PENDING, RUNS_GET_RETURN_PENDING, \
84+
RUNS_GET_RETURN_RUNNING, RUNS_GET_RETURN_SUCCESS]
85+
runner = CliRunner()
86+
result = runner.invoke(cli.submit_cli, ['--json', SUBMIT_JSON, '--wait'])
87+
assert runs_api_mock.get_run.call_count == 4
88+
assert sleep_mock.call_count == 3
89+
assert echo_mock.call_args[0][0] == 'Waiting on run to complete. Current state: ' + \
90+
'RUNNING. URL: https://www.google.com'
91+
assert result.exit_code == 0
92+
93+
94+
@pytest.mark.parametrize('bad_life_cycle_state', ['TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'])
95+
@provide_conf
96+
def test_submit_wait_failure(runs_api_mock, bad_life_cycle_state):
97+
with mock.patch('click.echo') as echo_mock, mock.patch('time.sleep') as sleep_mock:
98+
runs_api_mock.submit_run.return_value = SUBMIT_RETURN
99+
runs_get_failed = {
100+
"state": {
101+
"life_cycle_state": bad_life_cycle_state,
102+
"result_state": "FAILED",
103+
"state_message": "OH NO!",
104+
},
105+
}
106+
runs_api_mock.get_run.side_effect = [RUNS_GET_RETURN_PENDING, RUNS_GET_RETURN_PENDING, \
107+
RUNS_GET_RETURN_RUNNING, runs_get_failed]
108+
runner = CliRunner()
109+
result = runner.invoke(cli.submit_cli, ['--json', SUBMIT_JSON, '--wait'])
110+
assert runs_api_mock.get_run.call_count == 4
111+
assert sleep_mock.call_count == 3
112+
assert 'Run failed with state FAILED and state message OH NO!' in echo_mock.call_args[0][0]
113+
assert result.exit_code == 1
114+
115+
59116
RUN_PAGE_URL = 'https://databricks.com/#job/1/run/1'
60117
LIST_RETURN = {
61118
'runs': [{

0 commit comments

Comments
 (0)