@@ -69,7 +69,7 @@
 # COMMAND ----------

-# MAGIC %pip install /Workspace{remote_wheel}
+# MAGIC %pip install {remote_wheel}
 dbutils.library.restartPython()

 # COMMAND ----------
@@ -92,7 +92,7 @@
 """

 TEST_RUNNER_NOTEBOOK = """# Databricks notebook source
-# MAGIC %pip install /Workspace{remote_wheel}
+# MAGIC %pip install {remote_wheel}
 dbutils.library.restartPython()

 # COMMAND ----------
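Both notebook templates drop the hard-coded `/Workspace` prefix because `_upload_wheel` (below) now returns fully prefixed paths. A minimal sketch of how the runner template gets rendered once the caller space-joins the wheel list; the wheel file names here are hypothetical:

    template = "# MAGIC %pip install {remote_wheel}"
    remote_wheels = [
        "/Workspace/Applications/ucx/wheels/databricks_sdk-0.20.0-py3-none-any.whl",  # hypothetical path
        "/Workspace/Applications/ucx/wheels/databricks_labs_ucx-0.14.0-py3-none-any.whl",  # hypothetical path
    ]
    # A single %pip install accepts several wheels, so the list collapses
    # into one space-separated string before formatting.
    print(template.format(remote_wheel=" ".join(remote_wheels)))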
@@ -403,14 +403,14 @@ def __init__(  # pylint: disable=too-many-arguments
         super().__init__(config, installation, ws)

     def create_jobs(self):
-        remote_wheel = self._upload_wheel()
+        remote_wheels = self._upload_wheel()
         desired_workflows = {t.workflow for t in self._tasks if t.cloud_compatible(self._ws.config)}
         wheel_runner = None

         if self._config.override_clusters:
-            wheel_runner = self._upload_wheel_runner(remote_wheel)
+            wheel_runner = self._upload_wheel_runner(remote_wheels)
         for workflow_name in desired_workflows:
-            settings = self._job_settings(workflow_name, remote_wheel)
+            settings = self._job_settings(workflow_name, remote_wheels)
             if self._config.override_clusters:
                 settings = self._apply_cluster_overrides(
                     workflow_name,
@@ -430,7 +430,7 @@ def create_jobs(self):
                 continue

         self._install_state.save()
-        self._create_debug(remote_wheel)
+        self._create_debug(remote_wheels)
         return self._create_readme()

     @property
@@ -529,13 +529,30 @@ def _deploy_workflow(self, step_name: str, settings):
             self._install_state.jobs[step_name] = str(new_job.job_id)
         return None

+    @staticmethod
+    def _library_dep_order(library: str):
+        match library:
+            case library if 'sdk' in library:
+                return 0
+            case library if 'blueprint' in library:
+                return 1
+            case _:
+                return 2
+
     def _upload_wheel(self):
+        wheel_paths = []
         with self._wheels:
-            return self._wheels.upload_to_wsfs()
-
-    def _upload_wheel_runner(self, remote_wheel: str):
+            if self._config.upload_dependencies:
+                wheel_paths = self._wheels.upload_wheel_dependencies(["databricks", "sqlglot"])
+                wheel_paths.sort(key=WorkflowsDeployment._library_dep_order)
+            wheel_paths.append(self._wheels.upload_to_wsfs())
+            wheel_paths = [f"/Workspace{wheel}" for wheel in wheel_paths]
+            return wheel_paths
+
+    def _upload_wheel_runner(self, remote_wheels: list[str]):
         # TODO: we have to be doing this workaround until ES-897453 is solved in the platform
-        code = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheel, config_file=self._config_file).encode("utf8")
+        remote_wheels_str = " ".join(remote_wheels)
+        code = TEST_RUNNER_NOTEBOOK.format(remote_wheel=remote_wheels_str, config_file=self._config_file).encode("utf8")
         return self._installation.upload(f"wheels/wheel-test-runner-{self._product_info.version()}.py", code)

     @staticmethod
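A standalone sketch of the ordering rule `_library_dep_order` encodes: any wheel with "sdk" in its name sorts first, "blueprint" second, everything else last, so a package never installs before the dependencies it imports. The `match` guard syntax requires Python 3.10+; the wheel names are hypothetical:

    def _library_dep_order(library: str):
        match library:
            case library if "sdk" in library:
                return 0
            case library if "blueprint" in library:
                return 1
            case _:
                return 2

    wheels = [
        "databricks_labs_ucx-0.14.0-py3-none-any.whl",
        "databricks_labs_blueprint-0.2.4-py3-none-any.whl",
        "databricks_sdk-0.20.0-py3-none-any.whl",
    ]
    # sdk wheel first, blueprint second, ucx last
    print(sorted(wheels, key=_library_dep_order))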
@@ -559,8 +576,7 @@ def _apply_cluster_overrides(
             job_task.notebook_task = jobs.NotebookTask(notebook_path=wheel_runner, base_parameters=widget_values)
         return settings

-    def _job_settings(self, step_name: str, remote_wheel: str):
-
+    def _job_settings(self, step_name: str, remote_wheels: list[str]) -> dict[str, Any]:
         email_notifications = None
         if not self._config.override_clusters and "@" in self._my_username:
             # set email notifications only if we're running the real
@@ -577,8 +593,8 @@ def _job_settings(self, step_name: str, remote_wheel: str):
             if self._skip_dashboards and task.dashboard:
                 continue
             job_clusters.add(task.job_cluster)
-            job_tasks.append(self._job_task(task, remote_wheel))
-        job_tasks.append(self._job_parse_logs_task(job_tasks, step_name, remote_wheel))
+            job_tasks.append(self._job_task(task, remote_wheels))
+        job_tasks.append(self._job_parse_logs_task(job_tasks, step_name, remote_wheels))
         version = self._product_info.version()
         version = version if not self._ws.config.is_gcp else version.replace("+", "-")
         tags = {"version": f"v{version}"}
@@ -594,7 +610,7 @@ def _job_settings(self, step_name: str, remote_wheel: str):
             "tasks": job_tasks,
         }

-    def _job_task(self, task: Task, remote_wheel: str) -> jobs.Task:
+    def _job_task(self, task: Task, remote_wheels: list[str]) -> jobs.Task:
         jobs_task = jobs.Task(
             task_key=task.name,
             job_cluster_key=task.job_cluster,
@@ -607,7 +623,7 @@ def _job_task(self, task: Task, remote_wheel: str) -> jobs.Task:
             return retried_job_dashboard_task(jobs_task, task)
         if task.notebook:
             return self._job_notebook_task(jobs_task, task)
-        return self._job_wheel_task(jobs_task, task.workflow, remote_wheel)
+        return self._job_wheel_task(jobs_task, task.workflow, remote_wheels)

     def _job_dashboard_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
         assert task.dashboard is not None
@@ -639,8 +655,10 @@ def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
             ),
         )

-    def _job_wheel_task(self, jobs_task: jobs.Task, workflow: str, remote_wheel: str) -> jobs.Task:
-        libraries = [compute.Library(whl=f"/Workspace{remote_wheel}")]
+    def _job_wheel_task(self, jobs_task: jobs.Task, workflow: str, remote_wheels: list[str]) -> jobs.Task:
+        libraries = []
+        for wheel in remote_wheels:
+            libraries.append(compute.Library(whl=wheel))
         named_parameters = {
             "config": f"/Workspace{self._config_file}",
             "workflow": workflow,
@@ -701,24 +719,24 @@ def _job_clusters(self, names: set[str]):
         )
         return clusters

-    def _job_parse_logs_task(self, job_tasks: list[jobs.Task], workflow: str, remote_wheel: str) -> jobs.Task:
+    def _job_parse_logs_task(self, job_tasks: list[jobs.Task], workflow: str, remote_wheels: list[str]) -> jobs.Task:
         jobs_task = jobs.Task(
             task_key="parse_logs",
             job_cluster_key=Task.job_cluster,
             # The task depends on all previous tasks.
             depends_on=[jobs.TaskDependency(task_key=task.task_key) for task in job_tasks],
             run_if=jobs.RunIf.ALL_DONE,
         )
-        return self._job_wheel_task(jobs_task, workflow, remote_wheel)
+        return self._job_wheel_task(jobs_task, workflow, remote_wheels)

-    def _create_debug(self, remote_wheel: str):
+    def _create_debug(self, remote_wheels: list[str]):
         readme_link = self._installation.workspace_link('README')
         job_links = ", ".join(
             f"[{self._name(step_name)}]({self._ws.config.host}#job/{job_id})"
             for step_name, job_id in self._install_state.jobs.items()
         )
         content = DEBUG_NOTEBOOK.format(
-            remote_wheel=remote_wheel, readme_link=readme_link, job_links=job_links, config_file=self._config_file
+            remote_wheel=remote_wheels, readme_link=readme_link, job_links=job_links, config_file=self._config_file
         ).encode("utf8")
         self._installation.upload('DEBUG.py', content)

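For reference, the fan-in pattern `parse_logs` uses above, reduced to a self-contained sketch: the task depends on every prior task key, and `RunIf.ALL_DONE` makes it run even when an upstream task fails, so logs still get parsed after a broken run (the task keys here are hypothetical):

    from databricks.sdk.service import jobs

    job_tasks = [jobs.Task(task_key="assess"), jobs.Task(task_key="migrate")]  # hypothetical tasks
    parse_logs = jobs.Task(
        task_key="parse_logs",
        depends_on=[jobs.TaskDependency(task_key=t.task_key) for t in job_tasks],
        run_if=jobs.RunIf.ALL_DONE,  # run even if an upstream task failed
    )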