Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Library for leveraging the power of Sync"""

__version__ = "2.0.0"
__version__ = "2.0.1-alpha"

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Empty file added sync/databricks/__init__.py
Empty file.
Empty file.
108 changes: 108 additions & 0 deletions sync/databricks/integrations/_run_submit_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import logging

from sync import _databricks as databricks
from sync.api import projects
from sync.models import Platform
from sync.api import workspace


logger = logging.getLogger(__name__)

# Maps a workspace config's "compute_provider" value to the Platform enum
# passed as product_code when creating a project; create_or_fetch_project
# falls back to AWS_DATABRICKS for providers not listed here.
compute_provider_platform_dict = {
    "aws": Platform.AWS_DATABRICKS,
    "azure": Platform.AZURE_DATABRICKS
}

def apply_sync_gradient_cluster_recommendation(
    run_submit_task: dict,
    gradient_app_id: str,
    auto_apply: bool,
    cluster_log_url: str,
    workspace_id: str
) -> dict:
    """Rewrite a run-submit task's ``new_cluster`` with the Gradient project's
    recommended configuration.

    Looks up (or creates) the project for ``gradient_app_id``. When
    ``auto_apply`` is true a recommendation is generated; if one is available
    it replaces the task's cluster, otherwise the original cluster is kept.
    The chosen cluster is then passed through ``databricks.get_project_cluster``
    and, on success, the workspace webhook notification is attached.

    Returns the (mutated) ``run_submit_task`` dict in every case.
    """
    workspace_config = workspace.get_workspace_config(workspace_id).result

    project_id = create_or_fetch_project(
        gradient_app_id, cluster_log_url, workspace_config.get("compute_provider")
    )

    # Sentinel: only read the task's original cluster when we actually need it.
    updated_cluster = None
    if auto_apply:
        logger.info(f"Generating recommendation - app_id: {gradient_app_id}, project_id: {project_id}, "
                    f"auto_apply: {auto_apply}")
        recommendation = get_gradient_recommendation(project_id)
        if recommendation:
            logger.info(f"Recommendation generated - app_id: {gradient_app_id}, project_id: {project_id}, "
                        f"recommendation: {recommendation}")
            updated_cluster = recommendation
        else:
            logger.warning(f"Unable to generate recommendation. Falling back to original cluster - "
                           f"app_id: {gradient_app_id}, project_id: {project_id}, "
                           f"auto_apply: {auto_apply}, cluster: {run_submit_task['new_cluster']}")
    if updated_cluster is None:
        updated_cluster = run_submit_task["new_cluster"]

    cluster_response = databricks.get_project_cluster(updated_cluster, project_id)
    if cluster_response.result:
        run_submit_task["new_cluster"] = cluster_response.result
        run_submit_task = apply_webhook_notification(workspace_config, run_submit_task)
    else:
        logger.error("Unable to apply gradient configuration to databricks run submit call. "
                     "Submitting original run submit call.")

    return run_submit_task

def create_or_fetch_project(app_id: str, cluster_log_url: str, compute_provider: str):
    """Return the id of the Gradient project registered under ``app_id``,
    creating the project first when none exists.

    The project's platform is derived from ``compute_provider`` via
    ``compute_provider_platform_dict``; unknown providers default to
    ``Platform.AWS_DATABRICKS``.
    """
    lookup = projects.get_project_by_app_id(app_id)
    if lookup.result is not None:
        logger.info(f"Found project with app_id - app_id:{app_id}")
        return lookup.result['id']

    logger.info(f"Project with app_id does not exist. Creating project - app_id:{app_id}, "
                f"cluster_log_url:{cluster_log_url}, compute_provider:{compute_provider}")
    platform = compute_provider_platform_dict.get(compute_provider) or Platform.AWS_DATABRICKS
    created = projects.create_project(
        name=app_id,
        app_id=app_id,
        cluster_log_url=cluster_log_url,
        product_code=platform
    )
    return created.result['id']


def get_gradient_recommendation(project_id):
    """Generate a recommendation for the project and return its cluster
    configuration.

    Args:
        project_id: id of the Gradient project to generate the
            recommendation for.

    Returns:
        The recommended cluster configuration dict, or ``None`` when a
        recommendation could not be created.
    """
    response = projects.create_project_recommendation(project_id)
    recommendation_id = response.result

    if recommendation_id is None:
        return None

    response = projects.wait_for_recommendation(project_id, recommendation_id)
    # Was a stray debug print(); keep the visibility but route it through
    # the module logger at debug level.
    logger.debug(f"Recommendation response - response:{response}")
    return response.result['recommendation']['configuration']


def apply_webhook_notification(workspace_config, task):
    """Attach the workspace's webhook to the task's notifications.

    For "hosted" collection the webhook fires on job start; for any other
    collection type it fires on job success.
    """
    event = "on_start" if workspace_config.get("collection_type") == "hosted" else "on_success"
    return append_webhook(task, workspace_config["webhook_id"], event)


def append_webhook(task: dict, webhook_id: str, event: str):
    """Append a ``{"id": webhook_id}`` entry to the task's webhook
    notifications under ``event``.

    Missing or falsy containers (e.g. a ``None`` placeholder in the task
    JSON) are replaced with fresh ones. Mutates and returns ``task``.
    """
    notifications = task.get("webhook_notifications") or {}
    handlers = notifications.get(event) or []
    handlers.append({"id": webhook_id})
    notifications[event] = handlers
    task["webhook_notifications"] = notifications
    return task
39 changes: 39 additions & 0 deletions sync/databricks/integrations/airflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging

from sync.databricks.integrations._run_submit_runner import (
apply_sync_gradient_cluster_recommendation,
)

logger = logging.getLogger(__name__)


def airflow_gradient_pre_execute_hook(context):
    """Airflow pre-execute hook that rewrites the operator's run-submit JSON
    with the Gradient-recommended cluster configuration.

    Best-effort: any failure is logged and the task's original configuration
    is left untouched.
    """
    try:
        logger.info("Running airflow gradient pre-execute hook!")
        logger.debug(f"Airflow operator context - context:{context}")

        operator = context["task"]
        params = context["params"]
        operator_task_id = operator.task_id
        gradient_app_id = params["gradient_app_id"]
        auto_apply = params["gradient_auto_apply"]
        cluster_log_url = params["cluster_log_url"]
        workspace_id = params["databricks_workspace_id"]
        # copy the run submit json from the task context so the original is
        # untouched until the rewrite succeeds
        submit_json = operator.json.copy()

        operator.json = apply_sync_gradient_cluster_recommendation(
            run_submit_task=submit_json,
            gradient_app_id=build_app_id(operator_task_id, gradient_app_id),
            auto_apply=auto_apply,
            cluster_log_url=cluster_log_url,
            workspace_id=workspace_id,
        )
    except Exception as e:
        logger.exception(e)
        logger.error("Unable to apply gradient configuration to Databricks run submit tasks")


def build_app_id(task_id: str, app_id: str):
    """Compose the Gradient app id for a task as ``<task_id>-<app_id>``."""
    return "{}-{}".format(task_id, app_id)