|
3 | 3 | from dataclasses import dataclass |
4 | 4 | from functools import partial |
5 | 5 |
|
| 6 | +from databricks.sdk import WorkspaceClient |
| 7 | + |
6 | 8 | from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend |
7 | 9 | from databricks.labs.ucx.framework.parallel import ThreadedExecution |
8 | 10 | from databricks.labs.ucx.mixins.sql import Row |
@@ -35,39 +37,38 @@ def key(self) -> str: |
def kind(self) -> str:
    """Classify this object as ``"VIEW"`` or ``"TABLE"``.

    An object whose ``view_text`` is set (anything other than ``None``)
    is reported as a view; every other object is reported as a table.
    """
    if self.view_text is None:
        return "TABLE"
    return "VIEW"
37 | 39 |
|
38 | | - def _sql_alter(self, catalog): |
39 | | - return ( |
40 | | - f"ALTER {self.kind} {self.key} SET" |
41 | | - f" TBLPROPERTIES ('upgraded_to' = '{catalog}.{self.database}.{self.name}');" |
42 | | - ) |
43 | | - |
def _sql_external(self, catalog):
    """Build the ``SYNC TABLE`` statement for this table.

    The target is ``<catalog>.<database>.<name>`` and the source is this
    table's ``key``.
    """
    target = f"{catalog}.{self.database}.{self.name}"
    return f"SYNC TABLE {target} FROM {self.key};"
50 | 42 |
|
def _sql_managed(self, catalog):
    """Build the ``DEEP CLONE`` statement that copies this table into *catalog*.

    Raises:
        ValueError: if ``is_delta`` is falsy; the message names the table
            key and its ``table_format``.
    """
    if not self.is_delta:
        raise ValueError(f"{self.key} is not DELTA: {self.table_format}")
    target = f"{catalog}.{self.database}.{self.name}"
    return f"CREATE TABLE IF NOT EXISTS {target} DEEP CLONE {self.key};"
59 | 48 |
|
def _sql_view(self, catalog):
    """Build the ``CREATE VIEW IF NOT EXISTS`` statement for this view.

    The view is created as ``<catalog>.<database>.<name>`` and its
    definition is this object's ``view_text``, used verbatim.
    """
    target = f"{catalog}.{self.database}.{self.name}"
    return f"CREATE VIEW IF NOT EXISTS {target} AS {self.view_text};"
62 | 51 |
|
def uc_create_sql(self, catalog):
    """Return the SQL statement that creates this object in *catalog*.

    Dispatch order: views first, then objects whose ``object_type`` is
    ``"EXTERNAL"``, and everything else through the managed-table path.
    """
    if self.kind == "VIEW":
        return self._sql_view(catalog)
    if self.object_type == "EXTERNAL":
        return self._sql_external(catalog)
    return self._sql_managed(catalog)
70 | 59 |
|
def sql_alter_to(self, catalog):
    """Build the ``ALTER`` statement that tags this object with its migration target.

    The statement sets the ``upgraded_to`` table property on this object
    (addressed by ``key``) to ``<catalog>.<database>.<name>``.
    """
    target = f"{catalog}.{self.database}.{self.name}"
    return f"ALTER {self.kind} {self.key} SET TBLPROPERTIES ('upgraded_to' = '{target}');"
| 65 | + |
def sql_alter_from(self, catalog):
    """Build the ``ALTER`` statement that tags the migrated object with its origin.

    The statement sets the ``upgraded_from`` table property on
    ``<catalog>.<database>.<name>`` to this object's ``key``.
    """
    target = f"{catalog}.{self.database}.{self.name}"
    return f"ALTER {self.kind} {target} SET TBLPROPERTIES ('upgraded_from' = '{self.key}');"
| 71 | + |
71 | 72 |
|
72 | 73 | class TablesCrawler(CrawlerBase): |
73 | 74 | def __init__(self, backend: SqlBackend, schema): |
@@ -143,3 +144,52 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None: |
143 | 144 | except Exception as e: |
144 | 145 | logger.error(f"Couldn't fetch information for table {full_name} : {e}") |
145 | 146 | return None |
| 147 | + |
| 148 | + |
class TablesMigrate:
    """Migrate the tables reported by a ``TablesCrawler`` into a target catalog.

    For every table in the crawler snapshot a target catalog is chosen
    (per-database mapping first, default catalog otherwise) and a migration
    task is scheduled. Only tables whose ``object_type`` is ``"MANAGED"``
    are migrated; every other object is logged and left alone.
    """

    def __init__(
        self,
        tc: TablesCrawler,
        ws: WorkspaceClient,
        backend: SqlBackend,
        inventory_database: str,
        default_catalog=None,
        database_to_catalog_mapping: dict[str, str] | None = None,
    ):
        """Store collaborators and resolve the default target catalog.

        Args:
            tc: crawler whose ``snapshot()`` yields the tables to migrate.
            ws: workspace client (stored; not used by the methods of this class).
            backend: SQL backend used to execute the migration statements.
            inventory_database: inventory database name (stored; not used by
                the methods of this class).
            default_catalog: catalog for databases without an explicit
                mapping; a falsy value selects ``"ucx_default"``.
            database_to_catalog_mapping: optional source-database to
                target-catalog overrides.
        """
        self._tc = tc
        self._backend = backend
        self._ws = ws
        self._inventory_database = inventory_database
        self._database_to_catalog_mapping = database_to_catalog_mapping
        self._seen_tables = {}
        self._default_catalog = self._init_default_catalog(default_catalog)

    @staticmethod
    def _init_default_catalog(default_catalog):
        """Return *default_catalog*, or ``"ucx_default"`` when it is falsy."""
        # TODO : Fetch current workspace name and append it to the default catalog.
        return default_catalog or "ucx_default"

    def migrate_tables(self):
        """Schedule one migration task per crawled table and run them all.

        A database that has no entry in the database-to-catalog mapping is
        sent to the default catalog, so a partial mapping no longer aborts
        the whole run with ``KeyError``.
        """
        mapping = self._database_to_catalog_mapping or {}
        tasks = [
            partial(
                self._migrate_table,
                mapping.get(table.database, self._default_catalog),
                table,
            )
            for table in self._tc.snapshot()
        ]
        ThreadedExecution.gather("migrate tables", tasks)

    def _migrate_table(self, target_catalog, table):
        """Migrate a single table into *target_catalog*.

        Runs the create statement, then tags the source with ``upgraded_to``
        and the target with ``upgraded_from``. Failures are logged rather
        than raised so that one bad table does not stop the remaining tasks.
        """
        try:
            # Check support first: building SQL for an object that will not
            # be migrated only produces misleading "Migrating ..." or error logs.
            if table.object_type != "MANAGED":
                logger.info(f"Table {table.key} is a {table.object_type} and is not supported for migration yet ")
                return

            sql = table.uc_create_sql(target_catalog)
            logger.debug(f"Migrating table {table.key} to using SQL query: {sql}")

            self._backend.execute(sql)
            self._backend.execute(table.sql_alter_to(target_catalog))
            self._backend.execute(table.sql_alter_from(target_catalog))
        except Exception as e:
            # Task boundary: report the failure and let the other tasks proceed.
            logger.error(f"Could not create table {table.name} because: {e}")
0 commit comments