Commit 9479258

Removed thread pool for any IAM Group removals and additions (#394)
Resolves #384. Closes #383.

1 parent: e701e7b

File tree

18 files changed: +303 / -190 lines


src/databricks/labs/ucx/framework/parallel.py

Lines changed: 6 additions & 8 deletions
@@ -48,24 +48,22 @@ def _run(self) -> (list[Result], list[Exception]):
             if result is None:
                 continue
             collected.append(result)
-        self._on_finish(given_cnt, len(errors))
+        self._on_finish(given_cnt, len(collected), len(errors))

         return collected, errors

-    def _on_finish(self, given_cnt, failed_cnt):
+    def _on_finish(self, given_cnt: int, collected_cnt: int, failed_cnt: int):
         since = dt.datetime.now() - self._started
-        failed_pct = 0
-        if failed_cnt > 0:
-            failed_pct = failed_cnt / given_cnt * 100
-        stats = f"{failed_pct:.0f}% ({failed_cnt}/{given_cnt}). Took {since}"
+        success_pct = collected_cnt / given_cnt * 100
+        stats = f"{success_pct:.0f}% results available ({collected_cnt}/{given_cnt}). Took {since}"
         if failed_cnt == given_cnt:
             logger.critical(f"All '{self._name}' tasks failed!!!")
-        elif failed_pct >= self._task_fail_error_pct:
+        elif failed_cnt > 0 and success_pct <= self._task_fail_error_pct:
             logger.error(f"More than half '{self._name}' tasks failed: {stats}")
         elif failed_cnt > 0:
             logger.warning(f"Some '{self._name}' tasks failed: {stats}")
         else:
-            logger.info(f"Finished '{self._name}' tasks: non-empty {stats}")
+            logger.info(f"Finished '{self._name}' tasks: {stats}")

     def _execute(self):
         with ThreadPoolExecutor(self._num_threads) as pool:

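The `_on_finish` signature now takes the number of collected results and reports a success percentage rather than a failure percentage. A minimal standalone sketch of the new decision logic, assuming a threshold of 50 for the error cutoff (the real `_task_fail_error_pct` default is configured elsewhere and not shown in this diff):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ucx.parallel")


def report(name: str, given_cnt: int, collected_cnt: int, failed_cnt: int, task_fail_error_pct: int = 50):
    # Success rate is derived from how many results were actually collected.
    success_pct = collected_cnt / given_cnt * 100
    stats = f"{success_pct:.0f}% results available ({collected_cnt}/{given_cnt})"
    if failed_cnt == given_cnt:
        logger.critical(f"All '{name}' tasks failed!!!")
    elif failed_cnt > 0 and success_pct <= task_fail_error_pct:
        logger.error(f"More than half '{name}' tasks failed: {stats}")
    elif failed_cnt > 0:
        logger.warning(f"Some '{name}' tasks failed: {stats}")
    else:
        logger.info(f"Finished '{name}' tasks: {stats}")


report("listing workspace", given_cnt=10, collected_cnt=4, failed_cnt=6)   # error: only 40% of results available
report("listing workspace", given_cnt=10, collected_cnt=10, failed_cnt=0)  # info: all results available
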
src/databricks/labs/ucx/framework/tasks.py

Lines changed: 3 additions & 0 deletions
@@ -4,6 +4,7 @@
 from functools import wraps
 from pathlib import Path

+from databricks.labs.ucx.__about__ import __version__
 from databricks.labs.ucx.config import WorkspaceConfig
 from databricks.labs.ucx.framework.logger import _install

@@ -95,6 +96,8 @@ def trigger(*argv):
         msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
         raise KeyError(msg)

+    print(f"UCX v{__version__}")
+
     current_task = _TASKS[task_name]
     print(current_task.doc)

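Printing the UCX version at the start of every task run makes it easy to tell from job logs which release produced them. A rough sketch of the pattern, using a stand-in version constant and a simplified `_TASKS` registry rather than the real ones:

from dataclasses import dataclass

__version__ = "0.0.1"  # stand-in for databricks.labs.ucx.__about__.__version__


@dataclass
class Task:
    name: str
    doc: str


_TASKS = {"crawl-grants": Task("crawl-grants", "Scans Hive Metastore grants.")}


def trigger(task_name: str):
    if task_name not in _TASKS:
        msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
        raise KeyError(msg)
    # The version banner added by this commit is printed before the task description.
    print(f"UCX v{__version__}")
    current_task = _TASKS[task_name]
    print(current_task.doc)


trigger("crawl-grants")
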
src/databricks/labs/ucx/hive_metastore/grants.py

Lines changed: 2 additions & 1 deletion
@@ -192,7 +192,7 @@ def for_schema_info(self, schema: SchemaInfo):
     def _grants(
         self,
         *,
-        catalog: str = False,
+        catalog: str | None = None,
         database: str | None = None,
         table: str | None = None,
         view: str | None = None,
@@ -257,5 +257,6 @@ def _grants(
                 anonymous_function=anonymous_function,
             )
         except Exception as e:
+            # TODO: https://github.com/databrickslabs/ucx/issues/406
             logger.error(f"Couldn't fetch grants for object {on_type} {key}: {e}")
             return []

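Two small hardening changes here: `catalog: str = False` declared a string but defaulted to a boolean, which becomes the conventional `str | None = None`, and the broad `except` that skips objects whose grants cannot be fetched is flagged with a TODO pointing at issue #406. A short illustration of the optional-keyword pattern; the `object_key` helper and the `hive_metastore` fallback are illustrative, not taken from the UCX source:

def object_key(*, catalog: str | None = None, database: str | None = None, table: str | None = None) -> str:
    # None signals "not provided"; resolve the default explicitly instead of abusing False.
    catalog = catalog or "hive_metastore"
    parts = [catalog]
    if database is not None:
        parts.append(database)
    if table is not None:
        parts.append(table)
    return ".".join(parts)


print(object_key(database="sales", table="orders"))  # hive_metastore.sales.orders
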
src/databricks/labs/ucx/mixins/fixtures.py

Lines changed: 4 additions & 4 deletions
@@ -447,7 +447,7 @@ def create(*, name: str | None = None, **kwargs):
             name = f"sdk-{make_random(4)}"
         if "definition" not in kwargs:
             kwargs["definition"] = json.dumps(
-                {"spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": True}}
+                {"spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"}}
             )
         return ws.cluster_policies.create(name, **kwargs)

@@ -711,7 +711,7 @@ def inventory_schema(make_schema):


 @pytest.fixture
-def make_catalog(ws, sql_backend, make_random):
+def make_catalog(ws, sql_backend, make_random) -> Callable[..., CatalogInfo]:
     def create() -> CatalogInfo:
         name = f"ucx_C{make_random(4)}".lower()
         sql_backend.execute(f"CREATE CATALOG {name}")
@@ -726,7 +726,7 @@ def create() -> CatalogInfo:


 @pytest.fixture
-def make_schema(sql_backend, make_random):
+def make_schema(sql_backend, make_random) -> Callable[..., SchemaInfo]:
     def create(*, catalog_name: str = "hive_metastore", name: str | None = None) -> SchemaInfo:
         if name is None:
             name = f"ucx_S{make_random(4)}"
@@ -742,7 +742,7 @@ def create(*, catalog_name: str = "hive_metastore", name: str | None = None) ->


 @pytest.fixture
-def make_table(sql_backend, make_schema, make_random):
+def make_table(sql_backend, make_schema, make_random) -> Callable[..., TableInfo]:
     def create(
         *,
         catalog_name="hive_metastore",

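Annotating the fixtures as `Callable[..., CatalogInfo]` (and the schema/table equivalents) documents that they return factories rather than ready-made objects. A usage sketch of how a test might call them; any keyword arguments beyond the ones visible in this diff are assumptions:

def test_creates_table_in_fresh_schema(make_schema, make_table):
    # Each fixture yields a factory; every call provisions a new object that the
    # fixture machinery is expected to clean up after the test.
    schema = make_schema(catalog_name="hive_metastore")
    table = make_table()  # assumed to default into a newly created schema
    assert schema.name is not None
    assert table.full_name is not None
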
src/databricks/labs/ucx/runtime.py

Lines changed: 2 additions & 2 deletions
@@ -232,9 +232,9 @@ def migrate_permissions(cfg: WorkspaceConfig):
         workspace_start_path=cfg.workspace_start_path,
     )

-    permission_manager.apply_group_permissions(group_manager.migration_groups_provider, destination="backup")
+    permission_manager.apply_group_permissions(group_manager.migration_state, destination="backup")
     group_manager.replace_workspace_groups_with_account_groups()
-    permission_manager.apply_group_permissions(group_manager.migration_groups_provider, destination="account")
+    permission_manager.apply_group_permissions(group_manager.migration_state, destination="account")


 @task("migrate-groups-cleanup", depends_on=[migrate_permissions])

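The call sites switch from `migration_groups_provider` to the renamed `migration_state` property; the two-pass order around the group swap is unchanged. A schematic, runnable outline of that sequence with the managers replaced by stubs (the stub classes below are placeholders, not the UCX implementations):

class _StubGroupManager:
    migration_state = ["workspace-group -> account-group mapping (placeholder)"]

    def replace_workspace_groups_with_account_groups(self):
        print("swapping workspace-local groups for account groups")


class _StubPermissionManager:
    def apply_group_permissions(self, state, *, destination: str):
        print(f"applying {len(state)} permission mapping(s) to {destination} groups")


def migrate_permissions(group_manager, permission_manager):
    # 1. Copy permissions from workspace-local groups onto their temporary backup groups.
    permission_manager.apply_group_permissions(group_manager.migration_state, destination="backup")
    # 2. Replace the workspace-local groups with their account-level counterparts.
    group_manager.replace_workspace_groups_with_account_groups()
    # 3. Re-apply the captured permissions onto the account-level groups.
    permission_manager.apply_group_permissions(group_manager.migration_state, destination="account")


migrate_permissions(_StubGroupManager(), _StubPermissionManager())
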
src/databricks/labs/ucx/workspace_access/generic.py

Lines changed: 34 additions & 0 deletions
@@ -98,6 +98,40 @@ def _crawler_task(self, object_type: str, object_id: str) -> Permissions | None:
             raw=json.dumps(permissions.as_dict()),
         )

+    def _load_as_request(self, object_type: str, object_id: str) -> list[iam.AccessControlRequest]:
+        loaded = self._safe_get_permissions(object_type, object_id)
+        if loaded is None:
+            return []
+        acl = []
+        for v in loaded.access_control_list:
+            for permission in v.all_permissions:
+                if permission.inherited:
+                    continue
+                acl.append(
+                    iam.AccessControlRequest(
+                        permission_level=permission.permission_level,
+                        service_principal_name=v.service_principal_name,
+                        group_name=v.group_name,
+                        user_name=v.user_name,
+                    )
+                )
+        # sort to return deterministic results
+        return sorted(acl, key=lambda v: f"{v.group_name}:{v.user_name}:{v.service_principal_name}")
+
+    def load_as_dict(self, object_type: str, object_id: str) -> dict[str, iam.PermissionLevel]:
+        result = {}
+        for acl in self._load_as_request(object_type, object_id):
+            result[self._key_for_acl_dict(acl)] = acl.permission_level
+        return result
+
+    @staticmethod
+    def _key_for_acl_dict(acl: iam.AccessControlRequest) -> str:
+        if acl.group_name is not None:
+            return acl.group_name
+        if acl.user_name is not None:
+            return acl.user_name
+        return acl.service_principal_name
+
     # TODO remove after ES-892977 is fixed
     @retried(on=[RetryableError])
     def _safe_get_permissions(self, object_type: str, object_id: str) -> iam.ObjectPermissions | None:

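The new helpers flatten an `ObjectPermissions` response into direct (non-inherited) `AccessControlRequest` entries and expose them as a principal-to-permission-level mapping, so tests can assert on effective assignments without re-implementing the flattening. A hedged usage sketch, assuming `support` is an instance of the class extended here and that the object grants CAN_MANAGE to a group named "sdk-abcd":

from databricks.sdk.service import iam


def check_direct_permissions(support, object_type: str, object_id: str) -> None:
    # load_as_dict maps each principal name (group, user, or service principal)
    # to its direct, non-inherited permission level on the given object.
    permissions = support.load_as_dict(object_type, object_id)
    assert permissions.get("sdk-abcd") == iam.PermissionLevel.CAN_MANAGE  # example group name
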
0 commit comments
