diff --git a/databricks_cli/jobs/api.py b/databricks_cli/jobs/api.py
index 68228b03..319f6262 100644
--- a/databricks_cli/jobs/api.py
+++ b/databricks_cli/jobs/api.py
@@ -20,14 +20,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from copy import deepcopy
+
+from databricks_cli.clusters.api import ClusterApi
 from databricks_cli.sdk import JobsService


 class JobsApi(object):
     def __init__(self, api_client):
+        self.api_client = api_client
         self.client = JobsService(api_client)

     def create_job(self, json, headers=None):
+        json = self._convert_cluster_name_to_id(json)
         return self.client.client.perform_query('POST', '/jobs/create', data=json, headers=headers)

     def list_jobs(self, headers=None):
@@ -43,6 +49,8 @@ def get_job(self, job_id, headers=None):
         return self.client.get_job(job_id, headers=headers)

     def reset_job(self, json, headers=None):
+        # reset should support cluster_name:
+        json = self._convert_cluster_name_to_id(json)
         return self.client.client.perform_query('POST', '/jobs/reset', data=json, headers=headers)

     def run_now(self, job_id, jar_params, notebook_params, python_params, spark_submit_params,
@@ -54,3 +62,40 @@ def _list_jobs_by_name(self, name, headers=None):
         jobs = self.list_jobs(headers=headers)['jobs']
         result = list(filter(lambda job: job['settings']['name'] == name, jobs))
         return result
+
+    def clone_job(self, job_id, job_name, headers=None):
+        job_info = self.get_job(job_id, headers=headers)
+        if 'settings' not in job_info:
+            # lookup failed; pass the error response through
+            return job_info
+
+        upload_json = deepcopy(job_info['settings'])
+        upload_json['name'] = job_name
+
+        return self.create_job(json=upload_json, headers=headers)
+
+    def _convert_cluster_name_to_id(self, json):
+        """
+        If json contains cluster_name instead of existing_cluster_id, convert it to an
+        existing_cluster_id.
+        :return: json
+        """
+        cluster_data = json
+        if json and 'new_settings' in json:
+            cluster_data = json['new_settings']
+
+        # early out the easy cases
+        if not json or 'existing_cluster_id' in cluster_data:
+            return json
+
+        if 'cluster_name' in cluster_data:
+            cluster_id = self._get_cluster_id(cluster_data['cluster_name'])
+            cluster_data['existing_cluster_id'] = cluster_id
+            del cluster_data['cluster_name']
+        return json
+
+    def _get_cluster_id(self, cluster_name):
+        # the payload referenced a cluster by name;
+        # look up its ID via the Clusters API.
+        clusters_api = ClusterApi(self.api_client)
+        return clusters_api.get_cluster_id_for_name(cluster_name)
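For reference, a minimal standalone sketch (not part of the patch) of the cluster_name-to-existing_cluster_id conversion that create_job and reset_job now apply. The payload, cluster name, and ID are made up, and the lookup is a stubbed dict rather than the real ClusterApi call.

    # Mirrors JobsApi._convert_cluster_name_to_id with a stubbed name->ID lookup.
    def convert_cluster_name_to_id(payload, name_to_id):
        cluster_data = payload.get('new_settings', payload)
        if 'existing_cluster_id' in cluster_data:
            return payload
        if 'cluster_name' in cluster_data:
            cluster_data['existing_cluster_id'] = name_to_id[cluster_data['cluster_name']]
            del cluster_data['cluster_name']
        return payload

    # Hypothetical create-job payload that names a cluster instead of giving its ID.
    payload = {'name': 'nightly-etl', 'cluster_name': 'shared-autoscaling'}
    print(convert_cluster_name_to_id(payload, {'shared-autoscaling': '0923-164208-abcd1234'}))
    # -> {'name': 'nightly-etl', 'existing_cluster_id': '0923-164208-abcd1234'}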
diff --git a/databricks_cli/jobs/cli.py b/databricks_cli/jobs/cli.py
index 08b73c51..21593959 100644
--- a/databricks_cli/jobs/cli.py
+++ b/databricks_cli/jobs/cli.py
@@ -24,13 +24,16 @@
 from json import loads as json_loads

 import click
+from cstriggers.core.trigger import QuartzCron
 from tabulate import tabulate

-from databricks_cli.click_types import OutputClickType, JsonClickType, JobIdClickType
-from databricks_cli.jobs.api import JobsApi
-from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, json_cli_base, \
-    truncate_string
+from databricks_cli.click_types import OutputClickType, JsonClickType, \
+    JobIdClickType, ClusterIdClickType, OptionalOneOfOption
+from databricks_cli.clusters.api import ClusterApi
 from databricks_cli.configure.config import provide_api_client, profile_option, debug_option
+from databricks_cli.jobs.api import JobsApi
+from databricks_cli.utils import eat_exceptions, CLUSTER_OPTIONS, CONTEXT_SETTINGS, pretty_format, \
+    json_cli_base, truncate_string
 from databricks_cli.version import print_version_callback, version
@@ -92,14 +95,58 @@ def _jobs_to_table(jobs_json):
     return sorted(ret, key=lambda t: t[1].lower())
+
+
+def list_all_jobs(api_client, cluster_id, cluster_name):
+    jobs_api = JobsApi(api_client)
+    jobs_json = jobs_api.list_jobs()
+
+    output = jobs_json
+
+    if cluster_id or cluster_name:
+        output = {'jobs': []}
+        clusters_api = ClusterApi(api_client)
+        if cluster_name:
+            cluster_id = clusters_api.get_cluster_id_for_name(cluster_name)
+
+        for job in jobs_json.get('jobs'):
+            settings = job.get('settings')
+            if settings.get('existing_cluster_id') == cluster_id:
+                output['jobs'].append(job)
+
+    return output
+
+
+def get_next_runs(jobs_data):
+    start_iso = '2019-01-01T00:00:00'
+    end_iso = '2025-01-01T00:00:00'
+    jobs = jobs_data['jobs']
+    runs = {}
+    for job in jobs:
+        if 'schedule' in job['settings'] and \
+                'quartz_cron_expression' in job['settings']['schedule']:
+            expr = job['settings']['schedule']['quartz_cron_expression'].replace("*", "0")
+            expr = '{} 2020-2030'.format(expr)
+            cron = QuartzCron(schedule_string=expr, start_date=start_iso, end_date=end_iso)
+            next_run = cron.next_trigger(isoformat=True)
+            runs[job['settings']['name']] = next_run.replace('2020-01-01T', '')
+
+    return runs
+
+
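A quick sketch of the QuartzCron call that get_next_runs relies on, assuming the cron-schedule-triggers package added in setup.py below is installed. The Quartz expression is illustrative and is massaged the same way get_next_runs massages it.

    from cstriggers.core.trigger import QuartzCron

    # Illustrative Databricks-style schedule; like get_next_runs, replace '*' with '0'
    # and append an explicit year range before handing it to QuartzCron.
    expr = '0 15 22 ? * *'.replace('*', '0') + ' 2020-2030'
    cron = QuartzCron(schedule_string=expr,
                      start_date='2019-01-01T00:00:00',
                      end_date='2025-01-01T00:00:00')
    print(cron.next_trigger(isoformat=True))  # ISO timestamp of the next firing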
""" - jobs_api = JobsApi(api_client) - jobs_json = jobs_api.list_jobs() - if OutputClickType.is_json(output): - click.echo(pretty_format(jobs_json)) + + output = list_all_jobs(api_client=api_client, cluster_id=cluster_id, cluster_name=cluster_name) + if OutputClickType.is_json(output_type): + click.echo(pretty_format(output)) else: - click.echo(tabulate(_jobs_to_table(jobs_json), tablefmt='plain', disable_numparse=True)) + click.echo(tabulate(_jobs_to_table(output), tablefmt='plain', disable_numparse=True)) @click.command(context_settings=CONTEXT_SETTINGS, @@ -181,6 +228,51 @@ def run_now_cli(api_client, job_id, jar_params, notebook_params, python_params, click.echo(pretty_format(res)) +@click.command(context_settings=CONTEXT_SETTINGS) +@click.option('--job-id', required=True, type=JobIdClickType(), help=JobIdClickType.help) +@click.option('--job-name', required=True, help=JobIdClickType.help) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def clone_cli(api_client, job_id, job_name): + """ + Clones an existing job + """ + click.echo(pretty_format(JobsApi(api_client).clone_job(job_id, job_name))) + + +@click.command(context_settings=CONTEXT_SETTINGS, + short_help="Lists the next run time for scheduled jobs.") +@click.option('--cluster-id', cls=OptionalOneOfOption, one_of=CLUSTER_OPTIONS, + type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help, + required=False) +@click.option('--cluster-name', cls=OptionalOneOfOption, one_of=CLUSTER_OPTIONS, + type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help, + required=False) +@click.option('--output', '-o', 'output_type', default=None, + help=OutputClickType.help, type=OutputClickType()) +@debug_option +@profile_option +@eat_exceptions +@provide_api_client +def next_runs_cli(api_client, cluster_id, cluster_name, output_type): + """ + Lists the next run time for scheduled jobs. + + Parameter options are specified in json and the format is documented in + https://docs.databricks.com/api/latest/jobs.html#jobsrunnow. 
+ """ + jobs_data = list_all_jobs(api_client=api_client, + cluster_id=cluster_id, cluster_name=cluster_name) + + output = get_next_runs(jobs_data=jobs_data) + if OutputClickType.is_json(output_type): + click.echo(pretty_format(output)) + else: + click.echo(tabulate(_jobs_to_table(output), tablefmt='plain', disable_numparse=True)) + + @click.group(context_settings=CONTEXT_SETTINGS, short_help='Utility to interact with jobs.') @click.option('--version', '-v', is_flag=True, callback=print_version_callback, @@ -199,8 +291,10 @@ def jobs_group(): # pragma: no cover jobs_group.add_command(create_cli, name='create') -jobs_group.add_command(list_cli, name='list') +jobs_group.add_command(clone_cli, name='clone') jobs_group.add_command(delete_cli, name='delete') jobs_group.add_command(get_cli, name='get') +jobs_group.add_command(list_cli, name='list') +jobs_group.add_command(next_runs_cli, name='next-runs') jobs_group.add_command(reset_cli, name='reset') jobs_group.add_command(run_now_cli, name='run-now') diff --git a/setup.py b/setup.py index eaa34d03..c47a58e8 100644 --- a/setup.py +++ b/setup.py @@ -39,6 +39,8 @@ 'tabulate>=0.7.7', 'six>=1.10.0', 'configparser>=0.3.5;python_version < "3.6"', + 'cron-schedule-triggers>=0.0.11', + 'tenacity>=6.2.0' ], entry_points=''' [console_scripts] @@ -57,6 +59,7 @@ 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3.6', 'License :: OSI Approved :: Apache Software License', + 'Development Status :: 5 - Production/Stable' ], keywords='databricks cli', url='https://github.com/databricks/databricks-cli'