|
6 | 6 | import sys |
7 | 7 | import time |
8 | 8 | import webbrowser |
| 9 | +from collections.abc import Callable |
9 | 10 | from dataclasses import replace |
10 | 11 | from datetime import datetime, timedelta |
11 | 12 | from pathlib import Path |
|
16 | 17 | from databricks.labs.blueprint.installer import InstallState |
17 | 18 | from databricks.labs.blueprint.parallel import ManyError, Threads |
18 | 19 | from databricks.labs.blueprint.tui import Prompts |
| 20 | +from databricks.labs.blueprint.upgrades import Upgrades |
19 | 21 | from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2, find_project_root |
20 | 22 | from databricks.sdk import WorkspaceClient |
21 | 23 | from databricks.sdk.errors import ( # pylint: disable=redefined-builtin |
@@ -176,54 +178,65 @@ def __init__(self, prompts: Prompts, installation: Installation, ws: WorkspaceCl |
176 | 178 | self._installation = installation |
177 | 179 | self._prompts = prompts |
178 | 180 |
|
179 | | - def run(self): |
| 181 | + def run( |
| 182 | + self, |
| 183 | + verify_timeout=timedelta(minutes=2), |
| 184 | + sql_backend_factory: Callable[[WorkspaceConfig], SqlBackend] | None = None, |
| 185 | + wheel_builder_factory: Callable[[], WheelsV2] | None = None, |
| 186 | + ): |
180 | 187 | logger.info(f"Installing UCX v{PRODUCT_INFO.version()}") |
181 | 188 | config = self.configure() |
182 | | - sql_backend = StatementExecutionBackend(self._ws, config.warehouse_id) |
183 | | - wheels = WheelsV2(self._installation, PRODUCT_INFO) |
| 189 | + if not sql_backend_factory: |
| 190 | + sql_backend_factory = self._new_sql_backend |
| 191 | + if not wheel_builder_factory: |
| 192 | + wheel_builder_factory = self._new_wheel_builder |
184 | 193 | workspace_installation = WorkspaceInstallation( |
185 | 194 | config, |
186 | 195 | self._installation, |
187 | | - sql_backend, |
188 | | - wheels, |
| 196 | + sql_backend_factory(config), |
| 197 | + wheel_builder_factory(), |
189 | 198 | self._ws, |
190 | 199 | self._prompts, |
191 | | - verify_timeout=timedelta(minutes=2), |
| 200 | + verify_timeout=verify_timeout, |
192 | 201 | ) |
193 | | - workspace_installation.run() |
| 202 | + try: |
| 203 | + workspace_installation.run() |
| 204 | + except ManyError as err: |
| 205 | + if len(err.errs) == 1: |
| 206 | + raise err.errs[0] from None |
| 207 | + raise err |
| 208 | + |
| 209 | + def _new_wheel_builder(self): |
| 210 | + return WheelsV2(self._installation, PRODUCT_INFO) |
| 211 | + |
| 212 | + def _new_sql_backend(self, config: WorkspaceConfig) -> SqlBackend: |
| 213 | + return StatementExecutionBackend(self._ws, config.warehouse_id) |
194 | 214 |
|
195 | 215 | def configure(self) -> WorkspaceConfig: |
196 | 216 | try: |
197 | | - return self._installation.load(WorkspaceConfig) |
| 217 | + config = self._installation.load(WorkspaceConfig) |
| 218 | + self._apply_upgrades() |
| 219 | + return config |
198 | 220 | except NotFound as err: |
199 | 221 | logger.debug(f"Cannot find previous installation: {err}") |
| 222 | + return self._configure_new_installation() |
| 223 | + |
| 224 | + def _apply_upgrades(self): |
| 225 | + try: |
| 226 | + upgrades = Upgrades(PRODUCT_INFO, self._installation) |
| 227 | + upgrades.apply(self._ws) |
| 228 | + except NotFound as err: |
| 229 | + logger.warning(f"Installed version is too old: {err}") |
| 230 | + return |
| 231 | + |
| 232 | + def _configure_new_installation(self) -> WorkspaceConfig: |
200 | 233 | logger.info("Please answer a couple of questions to configure Unity Catalog migration") |
201 | 234 | HiveMetastoreLineageEnabler(self._ws).apply(self._prompts) |
202 | 235 | inventory_database = self._prompts.question( |
203 | 236 | "Inventory Database stored in hive_metastore", default="ucx", valid_regex=r"^\w+$" |
204 | 237 | ) |
205 | 238 |
|
206 | | - def warehouse_type(_): |
207 | | - return _.warehouse_type.value if not _.enable_serverless_compute else "SERVERLESS" |
208 | | - |
209 | | - pro_warehouses = {"[Create new PRO SQL warehouse]": "create_new"} | { |
210 | | - f"{_.name} ({_.id}, {warehouse_type(_)}, {_.state.value})": _.id |
211 | | - for _ in self._ws.warehouses.list() |
212 | | - if _.warehouse_type == EndpointInfoWarehouseType.PRO |
213 | | - } |
214 | | - warehouse_id = self._prompts.choice_from_dict( |
215 | | - "Select PRO or SERVERLESS SQL warehouse to run assessment dashboards on", pro_warehouses |
216 | | - ) |
217 | | - if warehouse_id == "create_new": |
218 | | - new_warehouse = self._ws.warehouses.create( |
219 | | - name=f"{WAREHOUSE_PREFIX} {time.time_ns()}", |
220 | | - spot_instance_policy=SpotInstancePolicy.COST_OPTIMIZED, |
221 | | - warehouse_type=CreateWarehouseRequestWarehouseType.PRO, |
222 | | - cluster_size="Small", |
223 | | - max_num_clusters=1, |
224 | | - ) |
225 | | - warehouse_id = new_warehouse.id |
226 | | - |
| 239 | + warehouse_id = self._configure_warehouse() |
227 | 240 | configure_groups = ConfigureGroups(self._prompts) |
228 | 241 | configure_groups.run() |
229 | 242 | log_level = self._prompts.question("Log level", default="INFO").upper() |
@@ -269,6 +282,29 @@ def warehouse_type(_): |
269 | 282 | webbrowser.open(ws_file_url) |
270 | 283 | return config |
271 | 284 |
|
| 285 | + def _configure_warehouse(self): |
| 286 | + def warehouse_type(_): |
| 287 | + return _.warehouse_type.value if not _.enable_serverless_compute else "SERVERLESS" |
| 288 | + |
| 289 | + pro_warehouses = {"[Create new PRO SQL warehouse]": "create_new"} | { |
| 290 | + f"{_.name} ({_.id}, {warehouse_type(_)}, {_.state.value})": _.id |
| 291 | + for _ in self._ws.warehouses.list() |
| 292 | + if _.warehouse_type == EndpointInfoWarehouseType.PRO |
| 293 | + } |
| 294 | + warehouse_id = self._prompts.choice_from_dict( |
| 295 | + "Select PRO or SERVERLESS SQL warehouse to run assessment dashboards on", pro_warehouses |
| 296 | + ) |
| 297 | + if warehouse_id == "create_new": |
| 298 | + new_warehouse = self._ws.warehouses.create( |
| 299 | + name=f"{WAREHOUSE_PREFIX} {time.time_ns()}", |
| 300 | + spot_instance_policy=SpotInstancePolicy.COST_OPTIMIZED, |
| 301 | + warehouse_type=CreateWarehouseRequestWarehouseType.PRO, |
| 302 | + cluster_size="Small", |
| 303 | + max_num_clusters=1, |
| 304 | + ) |
| 305 | + warehouse_id = new_warehouse.id |
| 306 | + return warehouse_id |
| 307 | + |
272 | 308 | @staticmethod |
273 | 309 | def _policy_config(value: str): |
274 | 310 | return {"type": "fixed", "value": value} |
@@ -370,7 +406,7 @@ def __init__( |
370 | 406 |
|
371 | 407 | @classmethod |
372 | 408 | def current(cls, ws: WorkspaceClient): |
373 | | - installation = Installation.current(ws, PRODUCT_INFO.product_name()) |
| 409 | + installation = PRODUCT_INFO.current_installation(ws) |
374 | 410 | config = installation.load(WorkspaceConfig) |
375 | 411 | sql_backend = StatementExecutionBackend(ws, config.warehouse_id) |
376 | 412 | wheels = WheelsV2(installation, PRODUCT_INFO) |
|
0 commit comments