Skip to content

Commit 2f1d8c4

Browse files
Added verification of group permissions (#841)
## Changes Added a `verify` method to all `AclSupport` subclasses. The existing `_inflight_check` and similarly named methods were renamed to `_verify` for consistency. Exposed a method to validate group permissions and added a CLI command for it. ### Functionality - [x] added new CLI command (`validate-groups-permissions`) to run verification of group permissions ### Tests - [x] manually tested - [x] added unit tests - [x] added integration tests --------- Co-authored-by: mwojtyczka <[email protected]> Co-authored-by: Marcin Wojtyczka <[email protected]>
1 parent f4ba945 commit 2f1d8c4

File tree

22 files changed

+1200
-69
lines changed

22 files changed

+1200
-69
lines changed

labs.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ commands:
103103
- name: subscription-id
104104
description: Subscription to scan storage account in
105105

106-
107106
- name: validate-groups-membership
108107
description: Validate the groups to see if the groups at account level and workspace level have different membership
109108
table_template: |-

src/databricks/labs/ucx/runtime.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,20 @@ def apply_permissions_to_account_groups(cfg: WorkspaceConfig, ws: WorkspaceClien
310310
permission_manager.apply_group_permissions(migration_state)
311311

312312

313+
@task("validate-groups-permissions")
def validate_groups_permissions(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
    """Validate that all the crawled permissions are applied correctly to the destination groups."""
    # NOTE(review): the docstring above is kept verbatim; the `task` decorator may consume it — confirm.
    logger.info("Running validation of permissions applied to destination groups.")
    # Tuning options for the permission manager are taken straight from the workspace configuration.
    factory_options = {
        "num_threads": cfg.num_threads,
        "workspace_start_path": cfg.workspace_start_path,
    }
    verifier = PermissionManager.factory(ws, sql_backend, cfg.inventory_database, **factory_options)
    # The boolean result of the verification is intentionally not used by this task.
    verifier.verify_group_permissions()
325+
326+
313327
@task("remove-workspace-local-backup-groups", depends_on=[apply_permissions_to_account_groups])
314328
def delete_backup_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
315329
"""Last step of the group migration process. Removes all workspace-level backup groups, along with their

src/databricks/labs/ucx/workspace_access/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ def get_apply_task(self, item: Permissions, migration_state: MigrationState) ->
2929
"""This method returns a Callable, that applies permissions to a destination group, based on
3030
the group migration state. The callable is required not to have any shared mutable state."""
3131

32+
def get_verify_task(self, item: Permissions) -> Callable[[], bool] | None:
    """Return a Callable that checks whether the crawled permissions described by `item` are
    correctly applied to the destination group.

    This base implementation provides no verification and returns None."""
    return None
35+
3236
@abstractmethod
3337
def object_types(self) -> set[str]:
3438
"""This method returns a set of strings, that represent object types that are applicable by this instance."""

src/databricks/labs/ucx/workspace_access/generic.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,21 +119,32 @@ def _response_to_request(
119119
)
120120
return results
121121

122-
def _inflight_check(self, object_type: str, object_id: str, acl: list[iam.AccessControlRequest]):
122+
@rate_limited(max_requests=100)
123+
def _verify(self, object_type: str, object_id: str, acl: list[iam.AccessControlRequest]):
123124
# in-flight check for the applied permissions
124125
# the api might be inconsistent, therefore we need to check that the permissions were applied
125126
remote_permission = self._safe_get_permissions(object_type, object_id)
126127
if remote_permission:
127128
remote_permission_as_request = self._response_to_request(remote_permission.access_control_list)
128129
if all(elem in remote_permission_as_request for elem in acl):
129130
return True
130-
msg = f"""Couldn't apply appropriate permission for object type {object_type} with id {object_id}
131-
acl to be applied={acl}
132-
acl found in the object={remote_permission_as_request}
133-
"""
131+
msg = (
132+
f"Couldn't find permission for object type {object_type} with id {object_id}\n"
133+
f"acl to be applied={acl}\n"
134+
f"acl found in the object={remote_permission_as_request}\n"
135+
)
134136
raise ValueError(msg)
135137
return False
136138

139+
def get_verify_task(self, item: Permissions) -> Callable[[], bool]:
    """Build a no-argument callable that verifies the permissions recorded in `item`.

    `item.raw` is parsed as the JSON form of `iam.ObjectPermissions`. Its access control list is
    converted with `_response_to_request` and bound, together with the object type and object id,
    to `_verify`.

    Raises:
        ValueError: if the parsed permissions carry no access control list.
    """
    recorded = iam.ObjectPermissions.from_dict(json.loads(item.raw))
    recorded_acl = recorded.access_control_list
    if not recorded_acl:
        msg = f"Access control list not present for object type {item.object_type} and object id {item.object_id}"
        raise ValueError(msg)
    expected_requests = self._response_to_request(recorded_acl)
    return partial(self._verify, item.object_type, item.object_id, expected_requests)
147+
137148
@rate_limited(max_requests=30)
138149
def _applier_task(self, object_type: str, object_id: str, acl: list[iam.AccessControlRequest]):
139150
retryable_exceptions = [InternalError, NotFound, ResourceConflict, TemporarilyUnavailable, DeadlineExceeded]
@@ -145,7 +156,7 @@ def _applier_task(self, object_type: str, object_id: str, acl: list[iam.AccessCo
145156
update_retried_check(object_type, object_id, acl)
146157

147158
retry_on_value_error = retried(on=[*retryable_exceptions, ValueError], timeout=self._verify_timeout)
148-
retried_check = retry_on_value_error(self._inflight_check)
159+
retried_check = retry_on_value_error(self._verify)
149160
return retried_check(object_type, object_id, acl)
150161

151162
@rate_limited(max_requests=100)

src/databricks/labs/ucx/workspace_access/manager.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def inventorize_permissions(self):
9191
self._save(items)
9292
logger.info(f"Saved {len(items)} to {self._full_name}")
9393

94-
def apply_group_permissions(self, migration_state: MigrationState):
94+
def apply_group_permissions(self, migration_state: MigrationState) -> bool:
9595
# list shall be sorted prior to using group by
9696
if len(migration_state) == 0:
9797
logger.info("No valid groups selected, nothing to do.")
@@ -140,6 +140,34 @@ def apply_group_permissions(self, migration_state: MigrationState):
140140
logger.info("Permissions were applied")
141141
return True
142142

143+
def verify_group_permissions(self) -> bool:
    """Verify every inventoried permission using the matching `AclSupport`.

    All saved permissions are loaded and grouped by object type. For each item the responsible
    support is asked for a verification task, and the collected tasks are handed to
    `Threads.strict`.

    Returns:
        True once the verification tasks have been run.

    Raises:
        ValueError: if an inventoried object type has no registered support.
    """

    def _object_type(permission):
        return permission.object_type

    # groupby only merges adjacent entries, so the inventory is ordered by the grouping key first
    inventory = sorted(self.load_all(), key=_object_type)
    logger.info(f"Total permissions found: {len(inventory)}")
    supports = self._appliers()
    verifier_tasks: list[Callable[..., bool]] = []

    for object_type, permissions in groupby(inventory, key=_object_type):
        if object_type not in supports:
            msg = f"Could not find support for {object_type}. Please check the inventory table."
            raise ValueError(msg)

        support = supports[object_type]
        # a support may return a falsy value (e.g. None) for an item; such items are left out
        candidates = (support.get_verify_task(permission) for permission in permissions)
        tasks_for_support = [candidate for candidate in candidates if candidate]

        logger.info(f"Total tasks for {object_type}: {len(tasks_for_support)}")
        verifier_tasks.extend(tasks_for_support)

    logger.info(f"Starting to verify permissions. Total tasks: {len(verifier_tasks)}")
    # NOTE(review): presumably Threads.strict raises when any task fails — confirm against blueprint
    Threads.strict("verify group permissions", verifier_tasks)
    logger.info("All permissions validated successfully. No issues found.")

    return True
170+
143171
def _appliers(self) -> dict[str, AclSupport]:
144172
appliers: dict[str, AclSupport] = {}
145173
for support in self._acl_support:

src/databricks/labs/ucx/workspace_access/redash.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# pylint: disable=duplicate-code
12
import dataclasses
23
import json
34
import logging
@@ -141,22 +142,31 @@ def _crawler_task(self, object_id: str, object_type: sql.ObjectTypePlural) -> Pe
141142
)
142143
return None
143144

144-
def _inflight_check(self, object_type: sql.ObjectTypePlural, object_id: str, acl: list[sql.AccessControl]):
145+
@rate_limited(max_requests=100)
146+
def _verify(self, object_type: sql.ObjectTypePlural, object_id: str, acl: list[sql.AccessControl]):
145147
# in-flight check for the applied permissions
146148
# the api might be inconsistent, therefore we need to check that the permissions were applied
147149
remote_permission = self._safe_get_dbsql_permissions(object_type, object_id)
148150
if remote_permission:
149151
assert remote_permission.access_control_list is not None
150152
if all(elem in remote_permission.access_control_list for elem in acl):
151153
return True
152-
msg = f"""
153-
Couldn't apply appropriate permission for object type {object_type} with id {object_id}
154-
acl to be applied={acl}
155-
acl found in the object={remote_permission}
156-
"""
154+
msg = (
155+
f"Couldn't find permission for object type {object_type} with id {object_id}\n"
156+
f"acl to be applied={acl}\n"
157+
f"acl found in the object={remote_permission}\n"
158+
)
157159
raise ValueError(msg)
158160
return False
159161

162+
def get_verify_task(self, item: Permissions) -> Callable[[], bool]:
    """Build a no-argument callable that verifies the DBSQL permissions recorded in `item`.

    `item.raw` is parsed as the JSON form of `sql.GetResponse`. Its access control list is bound,
    together with the object type (converted to `sql.ObjectTypePlural`) and the object id, to
    `_verify`.

    Raises:
        ValueError: if the parsed response carries no access control list.
    """
    recorded = sql.GetResponse.from_dict(json.loads(item.raw))
    recorded_acl = recorded.access_control_list
    if not recorded_acl:
        msg = f"Access control list not present for object type {item.object_type} and object id {item.object_id}"
        raise ValueError(msg)
    plural_type = sql.ObjectTypePlural(item.object_type)
    return partial(self._verify, plural_type, item.object_id, recorded_acl)
169+
160170
@rate_limited(max_requests=30)
161171
def _applier_task(self, object_type: sql.ObjectTypePlural, object_id: str, acl: list[sql.AccessControl]):
162172
"""
@@ -169,7 +179,7 @@ def _applier_task(self, object_type: sql.ObjectTypePlural, object_id: str, acl:
169179
set_retried_check(object_type, object_id, acl)
170180

171181
retry_on_value_error = retried(on=[InternalError, ValueError], timeout=self._verify_timeout)
172-
retried_check = retry_on_value_error(self._inflight_check)
182+
retried_check = retry_on_value_error(self._verify)
173183
return retried_check(object_type, object_id, acl)
174184

175185
def _prepare_new_acl(

src/databricks/labs/ucx/workspace_access/scim.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
from collections.abc import Callable
34
from datetime import timedelta
45
from functools import partial
56

@@ -94,7 +95,8 @@ def _crawler_task(group: iam.Group, property_name: str):
9495
raw=json.dumps([e.as_dict() for e in getattr(group, property_name)]),
9596
)
9697

97-
def _inflight_check(self, group_id: str, value: list[iam.ComplexValue], property_name: str):
98+
@rate_limited(max_requests=255, burst_period_seconds=60)
99+
def _verify(self, group_id: str, value: list[iam.ComplexValue], property_name: str):
98100
# in-flight check for the applied permissions
99101
# the api might be inconsistent, therefore we need to check that the permissions were applied
100102
group = self._safe_get_group(group_id)
@@ -105,13 +107,18 @@ def _inflight_check(self, group_id: str, value: list[iam.ComplexValue], property
105107
if property_name == "entitlements" and group.entitlements:
106108
if all(elem in group.entitlements for elem in value):
107109
return True
108-
msg = f"""Couldn't apply appropriate role for group {group_id}
109-
acl to be applied={[e.as_dict() for e in value]}
110-
acl found in the object={group.as_dict()}
111-
"""
110+
msg = (
111+
f"Couldn't find role for group {group_id}\n"
112+
f"acl to be applied={[e.as_dict() for e in value]}\n"
113+
f"acl found in the object={group.as_dict()}\n"
114+
)
112115
raise ValueError(msg)
113116
return False
114117

118+
def get_verify_task(self, item: Permissions) -> Callable[[], bool]:
    """Build a no-argument callable that verifies the group property recorded in `item`.

    `item.raw` is parsed as a JSON list whose entries are turned into `iam.ComplexValue` objects.
    They are bound to `_verify` with `item.object_id` as the group id and `item.object_type` as
    the property name.
    """
    raw_entries = json.loads(item.raw)
    expected_values = [iam.ComplexValue.from_dict(entry) for entry in raw_entries]
    group_id, property_name = item.object_id, item.object_type
    return partial(self._verify, group_id, expected_values, property_name)
121+
115122
@rate_limited(max_requests=10, burst_period_seconds=60)
116123
def _applier_task(self, group_id: str, value: list[iam.ComplexValue], property_name: str):
117124
retryable_errors = [ResourceConflict, Aborted, DeadlineExceeded, InternalError]
@@ -126,7 +133,7 @@ def _applier_task(self, group_id: str, value: list[iam.ComplexValue], property_n
126133
patch_retried_check(group_id, operations, schemas)
127134

128135
retry_on_value_error = retried(on=[*retryable_errors, ValueError], timeout=self._verify_timeout)
129-
retried_check = retry_on_value_error(self._inflight_check)
136+
retried_check = retry_on_value_error(self._verify)
130137
return retried_check(group_id, value, property_name)
131138

132139
def _safe_patch_group(

src/databricks/labs/ucx/workspace_access/secrets.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import json
22
import logging
3+
from collections.abc import Callable, Iterable
34
from datetime import timedelta
45
from functools import partial
56

67
from databricks.labs.blueprint.limiter import rate_limited
78
from databricks.sdk import WorkspaceClient
89
from databricks.sdk.retries import retried
910
from databricks.sdk.service import workspace
11+
from databricks.sdk.service.workspace import AclItem
1012

1113
from databricks.labs.ucx.workspace_access.base import AclSupport, Permissions
1214
from databricks.labs.ucx.workspace_access.groups import MigrationState
@@ -58,7 +60,7 @@ def get_apply_task(self, item: Permissions, migration_state: MigrationState):
5860

5961
def apply_acls():
6062
for acl in new_acls:
61-
self._rate_limited_put_acl(item.object_id, acl.principal, acl.permission)
63+
self._applier_task(item.object_id, acl.principal, acl.permission)
6264
return True
6365

6466
return partial(apply_acls)
@@ -78,21 +80,42 @@ def secret_scope_permission(self, scope_name: str, group_name: str) -> workspace
7880
def _reapply_on_failure(self, scope_name: str, group_name: str, expected_permission: workspace.AclPermission):
7981
# in-flight check for the applied permissions
8082
# the api might be inconsistent, therefore we need to check that the permissions were applied
81-
applied_permission = self.secret_scope_permission(scope_name, group_name)
82-
if applied_permission != expected_permission:
83-
# try to apply again if the permissions are not equal: sometimes the list_acls api is inconsistent
83+
try:
84+
self._verify(scope_name, group_name, expected_permission)
85+
except ValueError:
8486
logger.info(f"Applying permissions again {expected_permission} to {group_name} for {scope_name}")
8587
self._ws.secrets.put_acl(scope_name, group_name, expected_permission)
88+
raise
89+
return True
90+
91+
@rate_limited(max_requests=1100, burst_period_seconds=60)
92+
def _verify(self, scope_name: str, group_name: str, expected_permission: workspace.AclPermission):
93+
# in-flight check for the applied permissions
94+
# the api might be inconsistent, therefore we need to check that the permissions were applied
95+
applied_permission = self.secret_scope_permission(scope_name, group_name)
96+
if applied_permission != expected_permission:
8697
msg = (
87-
f"Applied permission {applied_permission} is not equal to expected "
88-
f"permission {expected_permission} for {scope_name} and {group_name}!"
98+
f"Couldn't find permission for scope {scope_name} and group {group_name}\n"
99+
f"acl to be applied={expected_permission}\n"
100+
f"acl found in the object={applied_permission}\n"
89101
)
90102
raise ValueError(msg)
91-
logger.info(f"Permissions matched for {scope_name}, {group_name} and {expected_permission}!")
92103
return True
93104

105+
def get_verify_task(self, item: Permissions) -> Callable[[], bool]:
    """Build a no-argument callable that verifies all secret scope ACLs recorded in `item`.

    `item.raw` is parsed as a JSON list of `workspace.AclItem` dictionaries. The returned callable
    runs `_verify` for each entry against the scope named by `item.object_id` and returns True
    after the last entry.
    """
    recorded_acls = [workspace.AclItem.from_dict(entry) for entry in json.loads(item.raw)]

    def _verify_acls(scope_name: str, acls: Iterable[AclItem]):
        for entry in acls:
            # both fields are optional on AclItem; _verify needs concrete values
            assert entry.permission is not None
            assert entry.principal is not None
            self._verify(scope_name, entry.principal, entry.permission)
        return True

    scope = item.object_id
    return partial(_verify_acls, scope, recorded_acls)
116+
94117
@rate_limited(max_requests=1100, burst_period_seconds=60)
95-
def _rate_limited_put_acl(self, object_id: str, principal: str, permission: workspace.AclPermission):
118+
def _applier_task(self, object_id: str, principal: str, permission: workspace.AclPermission):
96119
self._ws.secrets.put_acl(object_id, principal, permission)
97120
retry_on_value_error = retried(on=[ValueError], timeout=self._verify_timeout)
98121
retried_check = retry_on_value_error(self._reapply_on_failure)

src/databricks/labs/ucx/workspace_access/tacl.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
import functools
44
import json
55
from collections.abc import Callable, Iterator
6+
from datetime import timedelta
67
from functools import partial
78

9+
from databricks.sdk.retries import retried
10+
811
from databricks.labs.ucx.framework.crawlers import SqlBackend
912
from databricks.labs.ucx.hive_metastore import GrantsCrawler
1013
from databricks.labs.ucx.hive_metastore.grants import Grant
@@ -13,9 +16,15 @@
1316

1417

1518
class TableAclSupport(AclSupport):
16-
def __init__(self, grants_crawler: GrantsCrawler, sql_backend: SqlBackend):
19+
def __init__(
    self,
    grants_crawler: GrantsCrawler,
    sql_backend: SqlBackend,
    verify_timeout: timedelta | None = timedelta(minutes=1),
):
    """Store the collaborators used for table ACL support.

    `verify_timeout` is later passed as the `timeout` of the retry wrapper that `_apply_grant`
    puts around `_verify`; it defaults to one minute.
    """
    self._verify_timeout = verify_timeout
    self._sql_backend = sql_backend
    self._grants_crawler = grants_crawler
1928

2029
def get_crawler_tasks(self) -> Iterator[Callable[..., Permissions | None]]:
2130
# Table ACL permissions (grant/revoke and ownership) are not atomic. When granting the permissions,
@@ -97,4 +106,33 @@ def _apply_grant(self, grant: Grant):
97106
"""
98107
for sql in grant.hive_grant_sql():
99108
self._sql_backend.execute(sql)
100-
return True
109+
110+
object_type, object_id = grant.this_type_and_key()
111+
retry_on_value_error = retried(on=[ValueError], timeout=self._verify_timeout)
112+
retried_check = retry_on_value_error(self._verify)
113+
return retried_check(object_type, object_id, grant)
114+
115+
def _verify(self, object_type: str, object_id: str, acl: Grant) -> bool:
    """Check that the action types of `acl` are granted to its principal.

    The grants crawler is queried with every field of `acl` except `action_type` and `principal`.

    Returns:
        True when every action type listed in `acl.action_type` (split on ", ") is among the
        action types found for `acl.principal`; False when the crawler reports no grants at all.

    Raises:
        ValueError: if grants were found but at least one expected action type is missing.
    """
    lookup_kwargs = dataclasses.asdict(acl)
    # the lookup must not be narrowed to a single action or principal
    for excluded in ("action_type", "principal"):
        del lookup_kwargs[excluded]
    grants_on_object = self._grants_crawler._grants(**lookup_kwargs)

    if not grants_on_object:
        return False

    action_types_for_current_principal = [
        found.action_type for found in grants_on_object if found.principal == acl.principal
    ]
    acl_action_types = acl.action_type.split(", ")
    missing = [wanted for wanted in acl_action_types if wanted not in action_types_for_current_principal]
    if not missing:
        return True

    msg = (
        f"Couldn't find permission for object type {object_type}, id {object_id} and principal {acl.principal}\n"
        f"acl to be applied={acl_action_types}\n"
        f"acl found in the object={action_types_for_current_principal}\n"
    )
    raise ValueError(msg)
135+
136+
def get_verify_task(self, item: Permissions) -> Callable[[], bool]:
    """Build a no-argument callable that verifies the grant recorded in `item`.

    `item.raw` is parsed as JSON and expanded into the keyword arguments of `Grant`. The grant is
    bound, together with the object type and object id, to `_verify`.
    """
    grant_fields = json.loads(item.raw)
    expected_grant = Grant(**grant_fields)
    return partial(self._verify, item.object_type, item.object_id, expected_grant)

0 commit comments

Comments
 (0)