@@ -62,11 +62,11 @@ class DLTMETATechSummitDemo(DLTMETARunner):
6262 - workspace_client: Databricks workspace client.
6363 - base_dir: Base directory.
6464 """
def __init__(self, args, ws, base_dir):
    """Store the inputs the demo runner works with.

    Args:
        args: Parsed command-line arguments; other methods read them
            through ``args.__dict__``.
        ws: Databricks workspace client, used for the workspace, volumes
            and jobs calls made by this class.
        base_dir: Base directory.
    """
    self.args = args
    # Kept under the short name ``ws``; the other methods of the class
    # reach the client through ``self.ws``.
    self.ws = ws
    self.base_dir = base_dir
69-
69+
7070 def init_runner_conf (self ) -> TechsummitRunnerConf :
7171 """
7272 Initializes the TechsummitRunnerConf object with the provided configuration parameters.
@@ -78,19 +78,20 @@ def init_runner_conf(self) -> TechsummitRunnerConf:
7878 print (f"run_id={ run_id } " )
7979 runner_conf = TechsummitRunnerConf (
8080 run_id = run_id ,
81+ username = self ._my_username (self .ws ),
8182 dbfs_tmp_path = f"{ self .args .__dict__ ['dbfs_path' ]} /{ run_id } " ,
8283 dlt_meta_schema = f"dlt_meta_dataflowspecs_demo_{ run_id } " ,
8384 bronze_schema = f"dlt_meta_bronze_demo_{ run_id } " ,
8485 silver_schema = f"dlt_meta_silver_demo_{ run_id } " ,
8586 runners_full_local_path = './demo/dbc/tech_summit_dlt_meta_runners.dbc' ,
86- runners_nb_path = f"/Users/{ self .args . __dict__ [ 'username' ] } /dlt_meta_techsummit_demo/{ run_id } " ,
87+ runners_nb_path = f"/Users/{ self ._my_username ( self . ws ) } /dlt_meta_techsummit_demo/{ run_id } " ,
8788 node_type_id = cloud_node_type_id_dict [self .args .__dict__ ['cloud_provider_name' ]],
8889 dbr_version = self .args .__dict__ ['dbr_version' ],
8990 env = "prod" ,
9091 table_count = self .args .__dict__ ['table_count' ] if self .args .__dict__ ['table_count' ] else "100" ,
9192 table_column_count = (self .args .__dict__ ['table_column_count' ] if self .args .__dict__ ['table_column_count' ]
9293 else "5" ),
93- table_data_rows_count = (self .args .__dict__ ['table_data_rows_count' ]
94+ table_data_rows_count = (self .args .__dict__ ['table_data_rows_count' ]
9495 if self .args .__dict__ ['table_data_rows_count' ] else "10" ),
9596 worker_nodes = self .args .__dict__ ['worker_nodes' ] if self .args .__dict__ ['worker_nodes' ] else "4" ,
9697 source = self .args .__dict__ ['source' ],
@@ -110,24 +111,24 @@ def init_dltmeta_runner_conf(self, runner_conf: DLTMetaRunnerConf):
110111 - runner_conf: The DLTMetaRunnerConf object containing the runner configuration parameters.
111112 """
112113 fp = open (runner_conf .runners_full_local_path , "rb" )
113- self .workspace_client .workspace .mkdirs (runner_conf .runners_nb_path )
114- self .workspace_client .workspace .upload (path = f"{ runner_conf .runners_nb_path } /runners" ,
115- format = ImportFormat .DBC , content = fp .read ())
114+ self .ws .workspace .mkdirs (runner_conf .runners_nb_path )
115+ self .ws .workspace .upload (path = f"{ runner_conf .runners_nb_path } /runners" ,
116+ format = ImportFormat .DBC , content = fp .read ())
116117 if runner_conf .uc_catalog_name :
117- SchemasAPI (self .workspace_client .api_client ).create (catalog_name = runner_conf .uc_catalog_name ,
118- name = runner_conf .dlt_meta_schema ,
119- comment = "dlt_meta framework schema" )
120- volume_info = self .workspace_client .volumes .create (catalog_name = runner_conf .uc_catalog_name ,
121- schema_name = runner_conf .dlt_meta_schema ,
122- name = runner_conf .uc_volume_name ,
123- volume_type = VolumeType .MANAGED )
118+ SchemasAPI (self .ws .api_client ).create (catalog_name = runner_conf .uc_catalog_name ,
119+ name = runner_conf .dlt_meta_schema ,
120+ comment = "dlt_meta framework schema" )
121+ volume_info = self .ws .volumes .create (catalog_name = runner_conf .uc_catalog_name ,
122+ schema_name = runner_conf .dlt_meta_schema ,
123+ name = runner_conf .uc_volume_name ,
124+ volume_type = VolumeType .MANAGED )
124125 runner_conf .volume_info = volume_info
125- SchemasAPI (self .workspace_client .api_client ).create (catalog_name = runner_conf .uc_catalog_name ,
126- name = runner_conf .bronze_schema ,
127- comment = "bronze_schema" )
128- SchemasAPI (self .workspace_client .api_client ).create (catalog_name = runner_conf .uc_catalog_name ,
129- name = runner_conf .silver_schema ,
130- comment = "silver_schema" )
126+ SchemasAPI (self .ws .api_client ).create (catalog_name = runner_conf .uc_catalog_name ,
127+ name = runner_conf .bronze_schema ,
128+ comment = "bronze_schema" )
129+ SchemasAPI (self .ws .api_client ).create (catalog_name = runner_conf .uc_catalog_name ,
130+ name = runner_conf .silver_schema ,
131+ comment = "silver_schema" )
131132
132133 self .build_and_upload_package (runner_conf ) # comment this line before merging to master
133134
@@ -160,9 +161,9 @@ def launch_workflow(self, runner_conf: DLTMetaRunnerConf):
160161 runner_conf .job_id = created_job .job_id
161162 print (f"Job created successfully. job_id={ created_job .job_id } , started run..." )
162163 print (f"Waiting for job to complete. run_id={ created_job .job_id } " )
163- run_by_id = self .workspace_client .jobs .run_now (job_id = created_job .job_id ).result ()
164+ run_by_id = self .ws .jobs .run_now (job_id = created_job .job_id ).result ()
164165 print (f"Job run finished. run_id={ run_by_id } " )
165-
166+
166167 def create_techsummit_demo_workflow (self , runner_conf : TechsummitRunnerConf ):
167168 """
168169 Creates the workflow for the Techsummit Demo by defining the tasks and their dependencies.
@@ -174,7 +175,7 @@ def create_techsummit_demo_workflow(self, runner_conf: TechsummitRunnerConf):
174175 - created_job: The created job object.
175176 """
176177 database , dlt_lib = self .init_db_dltlib (runner_conf )
177- return self .workspace_client .jobs .create (
178+ return self .ws .jobs .create (
178179 name = f"dlt-meta-dais-demo-{ runner_conf .run_id } " ,
179180 tasks = [
180181 jobs .Task (
@@ -240,7 +241,6 @@ def create_techsummit_demo_workflow(self, runner_conf: TechsummitRunnerConf):
240241
241242
242243techsummit_args_map = {"--profile" : "provide databricks cli profile name, if not provide databricks_host and token" ,
243- "--username" : "provide databricks username, this is required to upload runners notebook" ,
244244 "--source" : "provide --source=cloudfiles" ,
245245 "--uc_catalog_name" : "provide databricks uc_catalog name, \
246246 this is required to create volume, schema, table" ,
@@ -253,7 +253,7 @@ def create_techsummit_demo_workflow(self, runner_conf: TechsummitRunnerConf):
253253 "--table_data_rows_count" : "table_data_rows_count"
254254 }
255255
# Names of the arguments the Techsummit demo treats as mandatory
# (presumably enforced by the argument handling in main() -- confirm there).
# "username" is not listed: the runner configuration obtains it from the
# workspace client via _my_username(self.ws) rather than from the CLI.
techsummit_mandatory_args = ["source", "cloud_provider_name", "dbr_version", "dbfs_path"]
257257
258258
259259def main ():
0 commit comments