Skip to content

Commit 9fc1778

Browse files
authored
Fixed installer script to reuse an existing UCX Cluster policy if present (#964)
## Changes This change fixes two bugs related to cluster policy setup during ucx installation - If a user is rerunning installation (due to some issue previously but some steps completed like policy creation), if a UCX cluster policy is found, it reuses that instead of creating one - If the user is upgrading UCX where the initial installation steps are skipped, but the policy ID is not found in the config.yaml (due to manually deleting or upgrading from an older version), then raise an InvalidParameterValue with a custom msg saying policy id not found and request the user to uninstall and reinstall ucx completely. ### #963 ### Functionality - [ ] added relevant user documentation - [ ] added new CLI command - [ ] modified existing command: `databricks labs ucx ...` - [ ] added a new workflow - [ ] modified existing workflow: `...` - [ ] added a new table - [ ] modified existing table: `...` ### Tests - [X] manually tested - [X] added unit tests - [X] added integration tests - [ ] verified on staging environment (screenshot attached)
1 parent c38ff72 commit 9fc1778

File tree

3 files changed

+109
-28
lines changed

3 files changed

+109
-28
lines changed

src/databricks/labs/ucx/install.py

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,7 @@ def warehouse_type(_):
243243
cluster_policy = json.loads(self._prompts.choice_from_dict("Choose a cluster policy", cluster_policies))
244244
instance_profile, spark_conf_dict = self._get_ext_hms_conf_from_policy(cluster_policy)
245245

246-
logger.info("Creating UCX cluster policy.")
247-
policy_id = self._ws.cluster_policies.create(
248-
name=f"Unity Catalog Migration ({inventory_database})",
249-
definition=self._cluster_policy_definition(conf=spark_conf_dict, instance_profile=instance_profile),
250-
description="Custom cluster policy for Unity Catalog Migration (UCX)",
251-
).policy_id
252-
246+
policy_id = self._create_cluster_policy(inventory_database, spark_conf_dict, instance_profile)
253247
config = WorkspaceConfig(
254248
inventory_database=inventory_database,
255249
workspace_group_regex=configure_groups.workspace_group_regex,
@@ -275,6 +269,26 @@ def warehouse_type(_):
275269
def _policy_config(value: str):
276270
return {"type": "fixed", "value": value}
277271

272+
def _create_cluster_policy(
273+
self, inventory_database: str, spark_conf: dict, instance_profile: str | None
274+
) -> str | None:
275+
policy_name = f"Unity Catalog Migration ({inventory_database}) ({self._ws.current_user.me().user_name})"
276+
policies = self._ws.cluster_policies.list()
277+
policy_id = None
278+
for policy in policies:
279+
if policy.name == policy_name:
280+
policy_id = policy.policy_id
281+
logger.info(f"Cluster policy {policy_name} already present, reusing the same.")
282+
break
283+
if not policy_id:
284+
logger.info("Creating UCX cluster policy.")
285+
policy_id = self._ws.cluster_policies.create(
286+
name=policy_name,
287+
definition=self._cluster_policy_definition(conf=spark_conf, instance_profile=instance_profile),
288+
description="Custom cluster policy for Unity Catalog Migration (UCX)",
289+
).policy_id
290+
return policy_id
291+
278292
def _cluster_policy_definition(self, conf: dict, instance_profile: str | None) -> str:
279293
policy_definition = {
280294
"spark_version": self._policy_config(self._ws.clusters.select_spark_version(latest=True)),
@@ -543,22 +557,28 @@ def _upload_wheel(self):
543557
self._installation.save(self._config)
544558
return self._wheels.upload_to_wsfs()
545559

546-
def create_jobs(self):
547-
logger.debug(f"Creating jobs from tasks in {main.__name__}")
548-
remote_wheel = self._upload_wheel()
560+
def _upload_cluster_policy(self, remote_wheel: str):
549561
try:
550-
policy_definition = self._ws.cluster_policies.get(policy_id=self.config.policy_id).definition
562+
if self.config.policy_id is None:
563+
msg = "Cluster policy not present, please uninstall and reinstall ucx completely."
564+
raise InvalidParameterValue(msg)
565+
policy = self._ws.cluster_policies.get(policy_id=self.config.policy_id)
551566
except NotFound as err:
552567
msg = f"UCX Policy {self.config.policy_id} not found, please reinstall UCX"
553568
logger.error(msg)
554569
raise NotFound(msg) from err
570+
if policy.name is not None:
571+
self._ws.cluster_policies.edit(
572+
policy_id=self.config.policy_id,
573+
name=policy.name,
574+
definition=policy.definition,
575+
libraries=[compute.Library(whl=f"dbfs:{remote_wheel}")],
576+
)
555577

556-
self._ws.cluster_policies.edit(
557-
policy_id=self.config.policy_id,
558-
name=f"Unity Catalog Migration ({self.config.inventory_database})",
559-
definition=policy_definition,
560-
libraries=[compute.Library(whl=f"dbfs:{remote_wheel}")],
561-
)
578+
def create_jobs(self):
579+
logger.debug(f"Creating jobs from tasks in {main.__name__}")
580+
remote_wheel = self._upload_wheel()
581+
self._upload_cluster_policy(remote_wheel)
562582
desired_steps = {t.workflow for t in _TASKS.values() if t.cloud_compatible(self._ws.config)}
563583
wheel_runner = None
564584

tests/integration/test_installation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,11 @@ def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend,
116116
@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=18))
117117
def test_job_cluster_policy(ws, new_installation):
118118
install = new_installation(lambda wc: replace(wc, override_clusters=None))
119+
user_name = ws.current_user.me().user_name
119120
cluster_policy = ws.cluster_policies.get(policy_id=install.config.policy_id)
120121
policy_definition = json.loads(cluster_policy.definition)
121122

122-
assert cluster_policy.name == f"Unity Catalog Migration ({install.config.inventory_database})"
123+
assert cluster_policy.name == f"Unity Catalog Migration ({install.config.inventory_database}) ({user_name})"
123124

124125
assert policy_definition["spark_version"]["value"] == ws.clusters.select_spark_version(latest=True)
125126
assert policy_definition["node_type_id"]["value"] == ws.clusters.select_node_type(local_disk=True)

tests/unit/test_install.py

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def test_install_cluster_override_jobs(ws, mock_installation, any_prompt):
158158
sql_backend = MockBackend()
159159
wheels = create_autospec(WheelsV2)
160160
workspace_installation = WorkspaceInstallation(
161-
WorkspaceConfig(inventory_database='ucx', override_clusters={"main": 'one', "tacl": 'two'}),
161+
WorkspaceConfig(inventory_database='ucx', override_clusters={"main": 'one', "tacl": 'two'}, policy_id='123'),
162162
mock_installation,
163163
sql_backend,
164164
wheels,
@@ -190,7 +190,7 @@ def test_write_protected_dbfs(ws, tmp_path, mock_installation):
190190
)
191191

192192
workspace_installation = WorkspaceInstallation(
193-
WorkspaceConfig(inventory_database='ucx'),
193+
WorkspaceConfig(inventory_database='ucx', policy_id='123'),
194194
mock_installation,
195195
sql_backend,
196196
wheels,
@@ -214,6 +214,7 @@ def test_write_protected_dbfs(ws, tmp_path, mock_installation):
214214
'log_level': 'INFO',
215215
'num_threads': 10,
216216
'override_clusters': {'main': '2222-999999-nosecuri', 'tacl': '3333-999999-legacytc'},
217+
'policy_id': '123',
217218
'renamed_group_prefix': 'ucx-renamed-',
218219
'workspace_start_path': '/',
219220
},
@@ -225,7 +226,7 @@ def test_writeable_dbfs(ws, tmp_path, mock_installation, any_prompt):
225226
sql_backend = MockBackend()
226227
wheels = create_autospec(WheelsV2)
227228
workspace_installation = WorkspaceInstallation(
228-
WorkspaceConfig(inventory_database='ucx'),
229+
WorkspaceConfig(inventory_database='ucx', policy_id='123'),
229230
mock_installation,
230231
sql_backend,
231232
wheels,
@@ -452,6 +453,47 @@ def test_save_config_strip_group_names(ws, mock_installation):
452453
)
453454

454455

456+
def test_cluster_policy_definition_present_reuse(ws, mock_installation):
457+
ws.config.is_aws = False
458+
ws.config.is_azure = True
459+
ws.config.is_gcp = False
460+
ws.cluster_policies.list.return_value = [
461+
Policy(
462+
policy_id="foo1",
463+
name="Unity Catalog Migration (ucx) ([email protected])",
464+
definition=json.dumps({}),
465+
description="Custom cluster policy for Unity Catalog Migration (UCX)",
466+
)
467+
]
468+
prompts = MockPrompts(
469+
{
470+
r".*PRO or SERVERLESS SQL warehouse.*": "1",
471+
r"Choose how to map the workspace groups.*": "2", # specify names
472+
r".*workspace group names.*": "g1, g2, g99",
473+
r".*We have identified one or more cluster.*": "No",
474+
r".*Choose a cluster policy.*": "0",
475+
r".*": "",
476+
}
477+
)
478+
install = WorkspaceInstaller(prompts, mock_installation, ws)
479+
install.configure()
480+
mock_installation.assert_file_written(
481+
'config.yml',
482+
{
483+
'version': 2,
484+
'default_catalog': 'ucx_default',
485+
'include_group_names': ['g1', 'g2', 'g99'],
486+
'inventory_database': 'ucx',
487+
'log_level': 'INFO',
488+
'num_threads': 8,
489+
'policy_id': 'foo1',
490+
'renamed_group_prefix': 'db-temp-',
491+
'warehouse_id': 'abc',
492+
'workspace_start_path': '/',
493+
},
494+
)
495+
496+
455497
def test_cluster_policy_definition_azure_hms(ws, mock_installation):
456498
ws.config.is_aws = False
457499
ws.config.is_azure = True
@@ -498,7 +540,7 @@ def test_cluster_policy_definition_azure_hms(ws, mock_installation):
498540
"azure_attributes.availability": {"type": "fixed", "value": "ON_DEMAND_AZURE"},
499541
}
500542
ws.cluster_policies.create.assert_called_with(
501-
name="Unity Catalog Migration (ucx)",
543+
name="Unity Catalog Migration (ucx) ([email protected])",
502544
definition=json.dumps(policy_definition_actual),
503545
description="Custom cluster policy for Unity Catalog Migration (UCX)",
504546
)
@@ -541,7 +583,7 @@ def test_cluster_policy_definition_aws_glue(ws, mock_installation):
541583
"aws_attributes.instance_profile_arn": {"type": "fixed", "value": "role_arn_1"},
542584
}
543585
ws.cluster_policies.create.assert_called_with(
544-
name="Unity Catalog Migration (ucx)",
586+
name="Unity Catalog Migration (ucx) ([email protected])",
545587
definition=json.dumps(policy_definition_actual),
546588
description="Custom cluster policy for Unity Catalog Migration (UCX)",
547589
)
@@ -592,7 +634,7 @@ def test_cluster_policy_definition_gcp(ws, mock_installation):
592634
"gcp_attributes.availability": {"type": "fixed", "value": "ON_DEMAND_GCP"},
593635
}
594636
ws.cluster_policies.create.assert_called_with(
595-
name="Unity Catalog Migration (ucx)",
637+
name="Unity Catalog Migration (ucx) ([email protected])",
596638
definition=json.dumps(policy_definition_actual),
597639
description="Custom cluster policy for Unity Catalog Migration (UCX)",
598640
)
@@ -611,17 +653,19 @@ def test_install_edit_policy_with_library(ws, mock_installation, any_prompt):
611653
timedelta(seconds=1),
612654
)
613655
wheels.upload_to_wsfs.return_value = "path1"
614-
ws.cluster_policies.get.return_value = Policy(policy_id="foo")
656+
ws.cluster_policies.get.return_value = Policy(
657+
policy_id="foo", name="Unity Catalog Migration (ucx) ([email protected])"
658+
)
615659
workspace_installation.create_jobs()
616660
ws.cluster_policies.edit.assert_called_with(
617-
name="Unity Catalog Migration (ucx)",
661+
name="Unity Catalog Migration (ucx) ([email protected])",
618662
policy_id="foo",
619663
definition=None,
620664
libraries=[compute.Library(whl="dbfs:path1")],
621665
)
622666

623667

624-
def test_install_edit_policy_not_present(ws, mock_installation, any_prompt):
668+
def test_install_edit_policy_not_found(ws, mock_installation, any_prompt):
625669
sql_backend = MockBackend()
626670
wheels = create_autospec(WheelsV2)
627671
workspace_installation = WorkspaceInstallation(
@@ -638,6 +682,22 @@ def test_install_edit_policy_not_present(ws, mock_installation, any_prompt):
638682
workspace_installation.create_jobs()
639683

640684

685+
def test_install_edit_policy_not_present(ws, mock_installation, any_prompt):
686+
sql_backend = MockBackend()
687+
wheels = create_autospec(WheelsV2)
688+
workspace_installation = WorkspaceInstallation(
689+
WorkspaceConfig(inventory_database='ucx', override_clusters={"main": 'one', "tacl": 'two'}),
690+
mock_installation,
691+
sql_backend,
692+
wheels,
693+
ws,
694+
any_prompt,
695+
timedelta(seconds=1),
696+
)
697+
with pytest.raises(InvalidParameterValue):
698+
workspace_installation.create_jobs()
699+
700+
641701
def test_save_config_with_custom_policy(ws, mock_installation):
642702
policy_def = b"""{
643703
"aws_attributes.instance_profile_arn": {
@@ -750,7 +810,7 @@ def test_main_with_existing_conf_does_not_recreate_config(ws, mocker, mock_insta
750810
}
751811
)
752812
workspace_installation = WorkspaceInstallation(
753-
WorkspaceConfig(inventory_database="..."),
813+
WorkspaceConfig(inventory_database="...", policy_id='123'),
754814
mock_installation,
755815
sql_backend,
756816
create_autospec(WheelsV2),

0 commit comments

Comments
 (0)