diff --git a/sync/__init__.py b/sync/__init__.py index 042e4a5..b485ec2 100644 --- a/sync/__init__.py +++ b/sync/__init__.py @@ -1,5 +1,5 @@ """Library for leveraging the power of Sync""" -__version__ = "1.6.0" +__version__ = "1.7.2-alpha" TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" diff --git a/sync/databricks/__init__.py b/sync/databricks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sync/databricks/integrations/__init__.py b/sync/databricks/integrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sync/databricks/integrations/_run_submit_runner.py b/sync/databricks/integrations/_run_submit_runner.py new file mode 100644 index 0000000..6d108cf --- /dev/null +++ b/sync/databricks/integrations/_run_submit_runner.py @@ -0,0 +1,140 @@ +import logging + +from sync import _databricks as databricks +from sync.api import projects +from sync.api import workspace +from sync.models import Platform +from sync.utils.json import deep_update + +logger = logging.getLogger(__name__) + +compute_provider_platform_dict = { + "aws": Platform.AWS_DATABRICKS, + "azure": Platform.AZURE_DATABRICKS, +} + + +def apply_sync_gradient_cluster_recommendation( + run_submit_task: dict, + gradient_app_id: str, + auto_apply: bool, + cluster_log_url: str, + workspace_id: str, +) -> dict: + tasks = run_submit_task.get("tasks") + if tasks: + if len(tasks) > 1: + logger.error( + "Unable to apply gradient configuration for run submit " "with multiple tasks." 
+ ) + original_cluster = tasks[0]["new_cluster"] + else: + original_cluster = run_submit_task["new_cluster"] + + response = workspace.get_workspace_config(workspace_id) + workspace_config = response.result + + project_id = create_or_fetch_project( + gradient_app_id, cluster_log_url, workspace_config.get("compute_provider") + ) + + # if recommendation exists apply the configuration or apply default configuration with project settings + if auto_apply: + logger.info( + f"Generating recommendation - app_id: {gradient_app_id}, project_id: {project_id}, " + f"auto_apply: {auto_apply}" + ) + recommended_cluster = get_gradient_recommendation(project_id) + if recommended_cluster: + logger.info( + f"Recommendation generated - app_id: {gradient_app_id}, project_id: {project_id}, " + f"recommendation: {recommended_cluster}" + ) + if "num_workers" in original_cluster: + del original_cluster["num_workers"] + + if "autoscale" in original_cluster: + del original_cluster["autoscale"] + + updated_cluster = deep_update(original_cluster, recommended_cluster) + else: + logger.warning( + f"Unable to generate recommendation. Falling back to original cluster - " + f"app_id: {gradient_app_id}, project_id: {project_id}, " + f"auto_apply: {auto_apply}, cluster: {original_cluster}" + ) + updated_cluster = original_cluster + else: + updated_cluster = original_cluster + + resp = databricks.get_project_cluster(updated_cluster, project_id) + if resp.result: + configured_cluster = resp.result + if run_submit_task.get("tasks"): + run_submit_task["tasks"][0]["new_cluster"] = configured_cluster + else: + run_submit_task["new_cluster"] = configured_cluster + run_submit_task = apply_webhook_notification(workspace_config, run_submit_task) + else: + logger.error( + "Unable to apply gradient configuration to databricks run submit call. " + "Submitting original run submit call." 
+ ) + + return run_submit_task + + +def create_or_fetch_project(app_id: str, cluster_log_url: str, compute_provider: str) -> str: + resp = projects.get_project_by_app_id(app_id) + if resp.result is None: + logger.info( + f"Project with app_id does not exist. Creating project - app_id:{app_id}, " + f"cluster_log_url:{cluster_log_url}, compute_provider:{compute_provider}" + ) + resp = projects.create_project( + name=app_id, + app_id=app_id, + cluster_log_url=cluster_log_url, + product_code=compute_provider_platform_dict.get(compute_provider) or Platform.AWS_DATABRICKS, + ) + else: + logger.info(f"Found project with app_id - app_id:{app_id}") + + return resp.result["id"] + + +def get_gradient_recommendation(project_id: str) -> dict: + """ + Generates/retrieves the recommendation and returns the cluster configuration + + Args: project_id: ID of the Sync project to generate a recommendation for + + Returns: cluster config + """ + response = projects.create_project_recommendation(project_id) + recommendation_id = response.result + + if recommendation_id is None: + return None + + response = projects.wait_for_recommendation(project_id, recommendation_id) + + return response.result["recommendation"]["configuration"] + + +def apply_webhook_notification(workspace_config: dict, task: dict) -> dict: + webhook_id = workspace_config["webhook_id"] + if workspace_config.get("collection_type") == "hosted": + task = append_webhook(task, webhook_id, "on_start") + else: + task = append_webhook(task, webhook_id, "on_success") + return task + + +def append_webhook(task: dict, webhook_id: str, event: str) -> dict: + webhook_request = {"id": webhook_id} + task["webhook_notifications"] = task.get("webhook_notifications") or {} + task["webhook_notifications"][event] = task["webhook_notifications"].get(event) or [] + task["webhook_notifications"][event].append(webhook_request) + + return task diff --git a/sync/databricks/integrations/airflow.py b/sync/databricks/integrations/airflow.py new file mode 100644 index 0000000..3b2f580 --- /dev/null +++ 
b/sync/databricks/integrations/airflow.py @@ -0,0 +1,39 @@ +import logging + +from sync.databricks.integrations._run_submit_runner import ( + apply_sync_gradient_cluster_recommendation, +) + +logger = logging.getLogger(__name__) + + +def airflow_gradient_pre_execute_hook(context: dict): + try: + logger.info("Running airflow gradient pre-execute hook!") + logger.debug(f"Airflow operator context - context:{context}") + + task_id = context["task"].task_id + gradient_app_id = context["params"]["gradient_app_id"] + auto_apply = context["params"]["gradient_auto_apply"] + cluster_log_url = context["params"]["cluster_log_url"] + workspace_id = context["params"]["databricks_workspace_id"] + run_submit_task = context[ + "task" + ].json.copy() # copy the run submit json from the task context + + updated_task_configuration = apply_sync_gradient_cluster_recommendation( + run_submit_task=run_submit_task, + gradient_app_id=build_app_id(task_id, gradient_app_id), + auto_apply=auto_apply, + cluster_log_url=cluster_log_url, + workspace_id=workspace_id, + ) + + context["task"].json = updated_task_configuration + except Exception as e: + logger.exception(e) + logger.error("Unable to apply gradient configuration to Databricks run submit tasks") + + +def build_app_id(task_id: str, app_id: str): + return f"{task_id}-{app_id}"