1+ import dataclasses
2+ from typing import Literal
3+
14import pytest
2- from databricks .sdk . errors import NotFound
5+ from databricks .labs . lsql . core import Row
36
47from databricks .labs .ucx .framework .utils import escape_sql_identifier
58from databricks .labs .ucx .hive_metastore .tables import Table
69
710
811@pytest .mark .parametrize (
9- "prepare_tables_for_migration, workflow" ,
12+ "scenario, workflow" ,
1013 [
1114 ("regular" , "migrate-tables" ),
1215 ("hiveserde" , "migrate-external-hiveserde-tables-in-place-experimental" ),
1316 ("hiveserde" , "migrate-external-tables-ctas" ),
1417 ],
15- indirect = ("prepare_tables_for_migration" ,),
1618)
1719def test_table_migration_job_refreshes_migration_status (
18- ws ,
1920 installation_ctx ,
20- prepare_tables_for_migration ,
21- workflow ,
22- ):
21+ scenario : Literal ["regular" , "hiveserde" ],
22+ workflow : str ,
23+ make_table_migration_context ,
24+ ) -> None :
2325 """The migration status should be refreshed after the migration job."""
24- tables , _ = prepare_tables_for_migration
26+ tables , _ = make_table_migration_context ( scenario , installation_ctx )
2527 ctx = installation_ctx .replace (
28+ config_transform = lambda wc : dataclasses .replace (
29+ wc ,
30+ skip_tacl_migration = True ,
31+ ),
2632 extend_prompts = {
2733 r".*Do you want to update the existing installation?.*" : 'yes' ,
2834 },
2935 )
3036
3137 ctx .workspace_installation .run ()
32- ctx .deployed_workflows .run_workflow (workflow )
38+ ctx .deployed_workflows .run_workflow (workflow , skip_job_wait = True )
39+
40+ assert ctx .deployed_workflows .validate_step (workflow )
3341
3442 # Avoiding MigrationStatusRefresh as it will refresh the status before fetching
3543 migration_status_query = f"SELECT * FROM { ctx .config .inventory_database } .migration_status"
@@ -62,85 +70,76 @@ def test_table_migration_job_refreshes_migration_status(
6270 assert len (asserts ) == 0 , assert_message
6371
6472
# NOTE(review): "manged" in the name is a typo for "managed"; renaming would
# change the pytest test ID, so it is deliberately left as-is.
def test_table_migration_convert_manged_to_external(installation_ctx, make_table_migration_context) -> None:
    """Exercise the CONVERT_TO_EXTERNAL path of the migrate-tables workflow.

    Runs the installer with TACL migration skipped, answers the managed-table
    prompt with "0" (convert to external), runs the workflow, and then checks
    that every source table landed in the destination schema and that the
    source managed table was converted to an EXTERNAL table.
    """
    tables, dst_schema = make_table_migration_context("managed", installation_ctx)
    ctx = installation_ctx.replace(
        # Skip TACL migration: this test only cares about table conversion.
        config_transform=lambda wc: dataclasses.replace(
            wc,
            skip_tacl_migration=True,
        ),
        extend_prompts={
            # "0" selects the CONVERT_TO_EXTERNAL strategy for managed tables.
            r"If hive_metastore contains managed table with external.*": "0",
            r".*Do you want to update the existing installation?.*": 'yes',
        },
    )

    ctx.workspace_installation.run()
    ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)

    assert ctx.deployed_workflows.validate_step("migrate-tables")

    # Collect all tables that did not arrive so a single failure message
    # reports every missing table instead of stopping at the first one.
    missing_tables = set[str]()
    for table in tables.values():
        migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
        if not ctx.workspace_client.tables.exists(migrated_table_name):
            missing_tables.add(migrated_table_name)
    assert not missing_tables, f"Missing migrated tables: {missing_tables}"

    managed_table = tables["src_managed_table"]
    for key, value, _ in ctx.sql_backend.fetch(
        f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(managed_table.full_name)}"
    ):
        if key == "Type":
            assert value == "EXTERNAL"
            break
    else:
        # BUGFIX: previously, if DESCRIBE output had no "Type" row, the loop
        # completed without asserting anything and the test passed vacuously.
        pytest.fail(f"No 'Type' row in DESCRIBE TABLE EXTENDED for {managed_table.full_name}")
96105
97106
@pytest.mark.parametrize(
    "workflow", ["migrate-external-hiveserde-tables-in-place-experimental", "migrate-external-tables-ctas"]
)
def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_migration_context, workflow) -> None:
    """Verify hiveserde tables are migrated by both the in-place and CTAS workflows.

    Parametrized over the two hiveserde migration workflows; each run installs
    with TACL migration skipped, runs the workflow, and asserts every source
    table exists in the destination schema afterwards.
    """
    tables, dst_schema = make_table_migration_context("hiveserde", installation_ctx)
    ctx = installation_ctx.replace(
        # Skip TACL migration: this test only checks table migration itself.
        config_transform=lambda wc: dataclasses.replace(
            wc,
            skip_tacl_migration=True,
        ),
        extend_prompts={
            r".*Do you want to update the existing installation?.*": 'yes',
        },
    )
    ctx.workspace_installation.run()

    ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True)

    # BUGFIX(consistency): validate the step on the replaced `ctx` (as every
    # other action in this test does), not on the original `installation_ctx`.
    assert ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"

    # Collect all missing tables so one failure message reports them all.
    missing_tables = set[str]()
    for table in tables.values():
        migrated_table_name = f"{dst_schema.catalog_name}.{dst_schema.name}.{table.name}"
        if not ctx.workspace_client.tables.exists(migrated_table_name):
            missing_tables.add(migrated_table_name)
    assert not missing_tables, f"Missing migrated tables: {missing_tables}"
116132
117133
118- @pytest .mark .parametrize ('prepare_tables_for_migration' , [('hiveserde' )], indirect = True )
119- def test_hiveserde_table_ctas_migration_job (ws , installation_ctx , prepare_tables_for_migration ):
120- tables , dst_schema = prepare_tables_for_migration
134+ def test_table_migration_job_publishes_remaining_tables (installation_ctx , make_table_migration_context ) -> None :
135+ tables , dst_schema = make_table_migration_context ("regular" , installation_ctx )
121136 ctx = installation_ctx .replace (
122- extend_prompts = {
123- r".*Do you want to update the existing installation?.*" : 'yes' ,
124- },
137+ config_transform = lambda wc : dataclasses .replace (
138+ wc ,
139+ skip_tacl_migration = True ,
140+ ),
125141 )
126142 ctx .workspace_installation .run ()
127- ctx .deployed_workflows .run_workflow ("migrate-external-tables-ctas" )
128- # assert the workflow is successful
129- assert ctx .deployed_workflows .validate_step ("migrate-external-tables-ctas" )
130- # assert the tables are migrated
131- for table in tables .values ():
132- try :
133- assert ws .tables .get (f"{ dst_schema .catalog_name } .{ dst_schema .name } .{ table .name } " ).name
134- except NotFound :
135- assert False , f"{ table .name } not found in { dst_schema .catalog_name } .{ dst_schema .name } "
136-
137-
138- @pytest .mark .parametrize ('prepare_tables_for_migration' , ['regular' ], indirect = True )
139- def test_table_migration_job_publishes_remaining_tables (
140- ws , installation_ctx , sql_backend , prepare_tables_for_migration , caplog
141- ):
142- tables , dst_schema = prepare_tables_for_migration
143- installation_ctx .workspace_installation .run ()
144143 second_table = list (tables .values ())[1 ]
145144 table = Table (
146145 "hive_metastore" ,
@@ -149,19 +148,20 @@ def test_table_migration_job_publishes_remaining_tables(
149148 object_type = "UNKNOWN" ,
150149 table_format = "UNKNOWN" ,
151150 )
152- installation_ctx .table_mapping .skip_table_or_view (dst_schema .name , second_table .name , load_table = lambda * _ : table )
153- installation_ctx . deployed_workflows . run_workflow ( "migrate-tables" )
154- assert installation_ctx .deployed_workflows .validate_step ("migrate-tables" )
151+ ctx .table_mapping .skip_table_or_view (dst_schema .name , second_table .name , load_table = lambda * _ : table )
152+
153+ ctx .deployed_workflows .run_workflow ("migrate-tables" , skip_job_wait = True )
155154
155+ assert ctx .deployed_workflows .validate_step ("migrate-tables" )
156156 remaining_tables = list (
157- sql_backend .fetch (
157+ ctx . sql_backend .fetch (
158158 f"""
159159 SELECT
160160 SUBSTRING(message, LENGTH('remained-hive-metastore-table: ') + 1)
161161 AS message
162- FROM { installation_ctx .inventory_database } .logs
162+ FROM { ctx .inventory_database } .logs
163163 WHERE message LIKE 'remained-hive-metastore-table: %'
164164 """
165165 )
166166 )
167- assert remaining_tables [ 0 ]. message == f' hive_metastore.{ dst_schema .name } .{ second_table .name } '
167+ assert remaining_tables == [ Row ( message = f" hive_metastore.{ dst_schema .name } .{ second_table .name } " )]
0 commit comments