+import logging
+import os
 import shutil
 import subprocess
 import sys
+from io import BytesIO
 from pathlib import Path
 
 import pytest
+from databricks.sdk.service import compute, jobs
 from databricks.sdk.service.workspace import ImportFormat
 
 from databricks.labs.ucx.providers.mixins.compute import CommandExecutor
 
+logger = logging.getLogger(__name__)
+
 
 @pytest.fixture
 def fresh_wheel_file(tmp_path) -> Path:
@@ -85,3 +91,143 @@ def test_sql_backend_works(ws, wsfs_wheel): |
     )
 
     assert len(database_names) > 0
+
+
+def test_toolkit_notebook(
+    ws,
+    sql_exec,
+    wsfs_wheel,
+    make_cluster,
+    make_cluster_policy,
+    make_directory,
+    make_ucx_group,
+    make_instance_pool,
+    make_job,
+    make_notebook,
+    make_pipeline,
+    make_random,
+    make_repo,
+    make_secret_scope,
+    make_schema,
+    make_table,
+    make_user,
+):
+    logger.info("setting up fixtures")
+
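+    # Workspace users for the test; adding them to the groups is still a TODO (see below).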
+    user_a = make_user()
+    user_b = make_user()
+    user_c = make_user()
+
+    logger.info(f"user_a={user_a}, user_b={user_b}, user_c={user_c}, ")
+
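+    # Each make_ucx_group() call returns a workspace-level group and its account-level counterpart.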
+    # TODO add users to groups
+    ws_group_a, acc_group_a = make_ucx_group()
+    ws_group_b, acc_group_b = make_ucx_group()
+    ws_group_c, acc_group_c = make_ucx_group()
+
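+    # Comma-separated group names, passed to the notebook below as the selected_groups job parameter.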
+    selected_groups = ",".join([ws_group_a.display_name, ws_group_b.display_name, ws_group_c.display_name])
+
+    logger.info(f"group_a={ws_group_a}, group_b={ws_group_b}, group_c={ws_group_c}, ")
+
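+    # One fixture of each workspace asset type; permissions on them are not granted yet (see TODO below).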
+    cluster = make_cluster(instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], single_node=True)
+    cluster_policy = make_cluster_policy()
+    directory = make_directory()
+    instance_pool = make_instance_pool()
+    job = make_job()
+    notebook = make_notebook()
+    pipeline = make_pipeline()
+    repo = make_repo()
+    secret_scope = make_secret_scope()
+
+    logger.info(
+        f"cluster={cluster}, "
+        f"cluster_policy={cluster_policy}, "
+        f"directory={directory}, "
+        f"instance_pool={instance_pool}, "
+        f"job={job}, "
+        f"notebook={notebook}, "
+        f"pipeline={pipeline}, "
+        f"repo={repo}, "
+        f"secret_scope={secret_scope}, "
+    )
+
+    # TODO create fixtures for DBSQL assets
+    # TODO set permissions
+
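+    # Hive metastore schemas and tables that receive the table ACL grants below.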
+    schema_a = make_schema()
+    schema_b = make_schema()
+    schema_c = make_schema()
+    table_a = make_table(schema=schema_a)
+    table_b = make_table(schema=schema_b)
+
+    logger.info(
+        f"schema_a={schema_a}, "
+        f"schema_b={schema_b}, "
+        f"schema_c={schema_c}, "
+        f"table_a={table_a}, "
+        f"table_b={table_b}, "
+    )
+
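+    # Database names without the catalog prefix, passed to the notebook as the databases job parameter.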
+    databases = ",".join([schema_a.split(".")[1], schema_b.split(".")[1], schema_c.split(".")[1]])
+
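+    # Legacy table ACL grants for the workspace groups; validating their migration is still a TODO in the run step.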
+    sql_exec(f"GRANT USAGE ON SCHEMA default TO `{ws_group_a.display_name}`")
+    sql_exec(f"GRANT USAGE ON SCHEMA default TO `{ws_group_b.display_name}`")
+    sql_exec(f"GRANT SELECT ON TABLE {table_a} TO `{ws_group_a.display_name}`")
+    sql_exec(f"GRANT SELECT ON TABLE {table_b} TO `{ws_group_b.display_name}`")
+    sql_exec(f"GRANT MODIFY ON SCHEMA {schema_b} TO `{ws_group_b.display_name}`")
+
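+    # Fresh schema in hive_metastore; only its name (no catalog prefix) is passed to the notebook as inventory_schema.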
+    _, inventory_schema = make_schema(catalog="hive_metastore").split(".")
+
+    logger.info(f"inventory_schema={inventory_schema}")
+
+    logger.info("uploading notebook")
+
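+    # Upload notebooks/toolkit.py into a random folder under the current user's workspace home.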
+    ucx_notebook_path = Path(__file__).parent.parent.parent / "notebooks" / "toolkit.py"
+    my_user = ws.current_user.me().user_name
+    remote_ucx_notebook_location = f"/Users/{my_user}/notebooks/{make_random(10)}"
+    ws.workspace.mkdirs(remote_ucx_notebook_location)
+    ws_notebook = f"{remote_ucx_notebook_location}/test_notebook.py"
+
+    with open(ucx_notebook_path, "rb") as fh:
+        buf_notebook = BytesIO(fh.read())
+    ws.workspace.upload(ws_notebook, buf_notebook, format=ImportFormat.AUTO)
+
+    logger.info("creating job")
+
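+    # Single notebook task that runs the uploaded notebook with the wheel from WSFS attached,
+    # on a fresh single-worker, SQL-only cluster from the test instance pool.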
+    created_job = ws.jobs.create(
+        tasks=[
+            jobs.Task(
+                task_key="uc-migrate",
+                notebook_task=jobs.NotebookTask(
+                    notebook_path=f"{remote_ucx_notebook_location}/test_notebook",
+                    base_parameters={
+                        "inventory_schema": inventory_schema,
+                        "selected_groups": selected_groups,
+                        "databases": databases,
+                    },
+                ),
+                libraries=[compute.Library(whl=f"/Workspace{wsfs_wheel}")],
+                new_cluster=compute.ClusterSpec(
+                    instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"],
+                    spark_version=ws.clusters.select_spark_version(latest=True),
+                    num_workers=1,
+                    spark_conf={"spark.databricks.acl.sqlOnly": "true"},
+                ),
+            )
+        ],
+        name="[UCX] Run Migration",
+    )
+
+    logger.info("running job")
+
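+    # Run the job to completion; the uploaded notebook folder and the job itself are removed even if the run fails.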
+    try:
+        ws.jobs.run_now(created_job.job_id).result()
+        # TODO Validate migration, tacl
+    finally:
+        logger.info("deleting notebook")
+
+        ws.workspace.delete(remote_ucx_notebook_location, recursive=True)
+
+        logger.info("deleting job")
+
+        ws.jobs.delete(created_job.job_id)