Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Library for leveraging the power of Sync"""

__version__ = "1.6.0"
__version__ = "1.7.0-alpha"

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Empty file added sync/databricks/__init__.py
Empty file.
Empty file.
111 changes: 111 additions & 0 deletions sync/databricks/integrations/_run_submit_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
from typing import Optional

from sync import _databricks as databricks
from sync.api import projects
from sync.api import workspace
from sync.models import Platform


logger = logging.getLogger(__name__)

# Maps the workspace config's "compute_provider" value to the Gradient
# Platform enum passed as product_code when a project is created.
# Unknown providers fall back to AWS_DATABRICKS (see create_or_fetch_project).
compute_provider_platform_dict = {
    "aws": Platform.AWS_DATABRICKS,
    "azure": Platform.AZURE_DATABRICKS
}


def apply_sync_gradient_cluster_recommendation(
    run_submit_task: dict,
    gradient_app_id: str,
    auto_apply: bool,
    cluster_log_url: str,
    workspace_id: str
) -> dict:
    """Rewrite a Databricks run-submit task with a Gradient cluster config.

    Looks up (or creates) the Gradient project for ``gradient_app_id``, and
    when ``auto_apply`` is set, replaces the task's ``new_cluster`` with the
    generated recommendation. If no recommendation can be produced, or
    ``auto_apply`` is off, the task's original cluster is kept. The chosen
    cluster is then run through the project-cluster configuration and a
    webhook notification is attached.

    Args:
        run_submit_task: Run-submit task payload containing ``new_cluster``.
        gradient_app_id: Gradient application id identifying the project.
        auto_apply: Whether to apply a generated recommendation.
        cluster_log_url: Cluster log location used when creating the project.
        workspace_id: Databricks workspace whose Gradient config is used.

    Returns:
        The (possibly updated) run-submit task payload.
    """
    workspace_config = workspace.get_workspace_config(workspace_id).result

    project_id = create_or_fetch_project(
        gradient_app_id, cluster_log_url, workspace_config.get("compute_provider")
    )

    # Default to the caller-supplied cluster; only swap it out when auto-apply
    # is on and a recommendation was successfully generated.
    updated_cluster = run_submit_task["new_cluster"]
    if auto_apply:
        logger.info(f"Generating recommendation - app_id: {gradient_app_id}, project_id: {project_id}, "
                    f"auto_apply: {auto_apply}")
        rec = get_gradient_recommendation(project_id)
        if rec:
            logger.info(f"Recommendation generated - app_id: {gradient_app_id}, project_id: {project_id}, "
                        f"recommendation: {rec}")
            updated_cluster = rec
        else:
            logger.warning(f"Unable to generate recommendation. Falling back to original cluster - "
                           f"app_id: {gradient_app_id}, project_id: {project_id}, "
                           f"auto_apply: {auto_apply}, cluster: {run_submit_task['new_cluster']}")

    cluster_response = databricks.get_project_cluster(updated_cluster, project_id)
    if cluster_response.result:
        run_submit_task["new_cluster"] = cluster_response.result
        run_submit_task = apply_webhook_notification(workspace_config, run_submit_task)
    else:
        logger.error("Unable to apply gradient configuration to databricks run submit call. "
                     "Submitting original run submit call.")

    return run_submit_task


def create_or_fetch_project(app_id: str, cluster_log_url: str, compute_provider: str) -> str:
    """Return the Gradient project id for ``app_id``, creating it if absent.

    Args:
        app_id: Gradient application id; also used as the project name.
        cluster_log_url: Cluster log location for a newly created project.
        compute_provider: Workspace compute provider (e.g. "aws", "azure");
            unknown values default the platform to AWS_DATABRICKS.

    Returns:
        The project's id.
    """
    lookup = projects.get_project_by_app_id(app_id)
    if lookup.result is not None:
        logger.info(f"Found project with app_id - app_id:{app_id}")
        return lookup.result['id']

    logger.info(f"Project with app_id does not exist. Creating project - app_id:{app_id}, "
                f"cluster_log_url:{cluster_log_url}, compute_provider:{compute_provider}")
    platform = compute_provider_platform_dict.get(compute_provider) or Platform.AWS_DATABRICKS
    created = projects.create_project(
        name=app_id,
        app_id=app_id,
        cluster_log_url=cluster_log_url,
        product_code=platform
    )
    return created.result['id']


def get_gradient_recommendation(project_id: str) -> Optional[dict]:
    """Generate a recommendation for the project and return its cluster config.

    Kicks off recommendation creation and, if an id is returned, waits for it
    to complete.

    Args:
        project_id: Gradient project to generate the recommendation for.

    Returns:
        The recommended cluster configuration, or None when a recommendation
        could not be created.
    """
    response = projects.create_project_recommendation(project_id)
    recommendation_id = response.result

    # No id means the backend declined to create a recommendation.
    if recommendation_id is None:
        return None

    response = projects.wait_for_recommendation(project_id, recommendation_id)

    return response.result['recommendation']['configuration']


def apply_webhook_notification(workspace_config: dict, task: dict) -> dict:
    """Attach the workspace's Gradient webhook to the run-submit task.

    Hosted log collection fires the webhook at run start; any other
    collection type fires it on run success.

    Args:
        workspace_config: Workspace config containing ``webhook_id`` and
            (optionally) ``collection_type``.
        task: Run-submit task payload to annotate.

    Returns:
        The task with the webhook notification appended.
    """
    hosted = workspace_config.get("collection_type") == "hosted"
    event = "on_start" if hosted else "on_success"
    return append_webhook(task, workspace_config["webhook_id"], event)


def append_webhook(task: dict, webhook_id: str, event: str) -> dict:
    """Append a webhook entry for ``event`` to the task's notifications.

    Mutates ``task`` in place, creating the ``webhook_notifications`` mapping
    and the per-event list when missing (or falsy), and returns the task.

    Args:
        task: Run-submit task payload.
        webhook_id: Id of the webhook to notify.
        event: Notification event name (e.g. "on_start", "on_success").

    Returns:
        The mutated task.
    """
    # "or" (not setdefault) so falsy placeholders like None are replaced too.
    notifications = task.get("webhook_notifications") or {}
    hooks = notifications.get(event) or []
    hooks.append({"id": webhook_id})
    notifications[event] = hooks
    task["webhook_notifications"] = notifications

    return task
39 changes: 39 additions & 0 deletions sync/databricks/integrations/airflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging

from sync.databricks.integrations._run_submit_runner import (
apply_sync_gradient_cluster_recommendation,
)

logger = logging.getLogger(__name__)


def airflow_gradient_pre_execute_hook(context: dict):
    """Airflow pre-execute hook that applies a Gradient cluster recommendation.

    Reads the Gradient settings from the operator's ``params``, rewrites a
    copy of the operator's run-submit JSON via
    ``apply_sync_gradient_cluster_recommendation``, and stores the result back
    on the operator. Any failure is logged and swallowed so the task still
    runs with its original configuration.

    Args:
        context: Airflow task context providing ``task`` (the operator) and
            ``params`` with the gradient_* / databricks_workspace_id keys.
    """
    try:
        logger.info("Running airflow gradient pre-execute hook!")
        logger.debug(f"Airflow operator context - context:{context}")

        task = context["task"]
        params = context["params"]

        updated_task_configuration = apply_sync_gradient_cluster_recommendation(
            # copy the run submit json from the task context
            run_submit_task=task.json.copy(),
            gradient_app_id=build_app_id(task.task_id, params["gradient_app_id"]),
            auto_apply=params["gradient_auto_apply"],
            cluster_log_url=params["cluster_log_url"],
            workspace_id=params["databricks_workspace_id"],
        )

        task.json = updated_task_configuration
    except Exception as e:
        # Best-effort: never fail the Airflow task because of Gradient.
        logger.exception(e)
        logger.error("Unable to apply gradient configuration to Databricks run submit tasks")


def build_app_id(task_id: str, app_id: str) -> str:
    """Build a task-scoped Gradient app id.

    Prefixes ``app_id`` with the Airflow task id so that each task in a DAG
    maps to its own Gradient project.

    Args:
        task_id: Airflow task id.
        app_id: Base Gradient application id from the DAG params.

    Returns:
        The combined id, "<task_id>-<app_id>".
    """
    return f"{task_id}-{app_id}"