
Commit 24e354b

FastLee, larsgeorge-db, and william-conti authored
Added crawlers for compatibility of jobs and clusters, along with basic recommendations for external locations (#244)
This PR includes the following assessment features:
- External Locations
- Jobs
- Clusters

---------

Co-authored-by: Lars George <[email protected]>
Co-authored-by: william-conti <[email protected]>
1 parent 06f104e commit 24e354b
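
A minimal usage sketch of the entry points this commit adds. The "ucx" inventory schema name and the sql_backend value are placeholders here; any SqlBackend implementation wired to the inventory database should work.

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment import AssessmentToolkit

ws = WorkspaceClient()  # assumes workspace auth is already configured
sql_backend = ...  # placeholder: any SqlBackend implementation

toolkit = AssessmentToolkit(ws, "ucx", backend=sql_backend)
clusters = toolkit.generate_cluster_assessment()        # list[ClusterInfo]
jobs = toolkit.generate_job_assessment()                # list[JobInfo]
locations = toolkit.generate_external_location_list()   # list[ExternalLocation]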

File tree

7 files changed: +423 -70 lines changed
src/databricks/labs/ucx/assessment/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+from databricks.labs.ucx.assessment.assessment import AssessmentToolkit
+
+__all__ = ["AssessmentToolkit"]
src/databricks/labs/ucx/assessment/assessment.py

Lines changed: 169 additions & 61 deletions
@@ -1,56 +1,66 @@
-import logging
-import re
-from importlib import resources
+import json
+import typing
+from dataclasses import dataclass

 from databricks.sdk import WorkspaceClient
-from databricks.sdk.service.compute import Language
+from databricks.sdk.service.jobs import BaseJob

-from databricks.labs.ucx.assessment import commands
-from databricks.labs.ucx.mixins.compute import CommandExecutor
+from databricks.labs.ucx.framework.crawlers import CrawlerBase
+from databricks.labs.ucx.hive_metastore.data_objects import ExternalLocationCrawler
+from databricks.labs.ucx.hive_metastore.table_acls import SqlBackend

-logger = logging.getLogger(__name__)
+
+@dataclass
+class JobInfo:
+    job_id: str
+    job_name: str
+    creator: str
+    success: int
+    failures: str
+
+
+@dataclass
+class ClusterInfo:
+    cluster_id: str
+    cluster_name: str
+    creator: str
+    success: int
+    failures: str
+
+
+def spark_version_compatibility(spark_version: str) -> str:
+    first_comp_custom_rt = 3
+    first_comp_custom_x = 2
+    dbr_version_components = spark_version.split("-")
+    first_components = dbr_version_components[0].split(".")
+    if len(first_components) != first_comp_custom_rt:
+        # custom runtime
+        return "unsupported"
+    if first_components[first_comp_custom_x] != "x":
+        # custom runtime
+        return "unsupported"
+    version = int(first_components[0]), int(first_components[1])
+    if version < (10, 0):
+        return "unsupported"
+    if (10, 0) <= version < (11, 3):
+        return "kinda works"
+    return "supported"


 class AssessmentToolkit:
-    def __init__(self, ws: WorkspaceClient, cluster_id, inventory_catalog, inventory_schema, warehouse_id=None):
+    incompatible_spark_config_keys: typing.ClassVar[tuple] = {
+        "spark.databricks.passthrough.enabled",
+        "spark.hadoop.javax.jdo.option.ConnectionURL",
+        "spark.databricks.hive.metastore.glueCatalog.enabled",
+    }
+
+    def __init__(self, ws: WorkspaceClient, inventory_schema, backend=None):
+        self._all_jobs = None
+        self._all_clusters_by_id = None
         self._ws = ws
-        self._inventory_catalog = inventory_catalog
         self._inventory_schema = inventory_schema
-        self._warehouse_id = warehouse_id
-        self._cluster_id = cluster_id
-        self._command_executor = None
-        self._managed_executor = False
-
-    def _get_command_executor(self, executor: CommandExecutor | None = None, language=None):
-        ce = executor
-        if ce is None:
-            if language:
-                ce = CommandExecutor(self._ws, language=language, cluster_id=self._cluster_id)
-            else:
-                ce = CommandExecutor(self._ws, cluster_id=self._cluster_id)
-            self._managed_executor = True
-        self._command_executor = ce
-        return ce
-
-    def _remove_command_executor(self):
-        if self._managed_executor:
-            self._command_executor = None
-            self._managed_executor = False
-
-    @staticmethod
-    def _load_command_code(name):
-        cmd_file = resources.files(commands) / name
-        with cmd_file.open("rt") as f:
-            cmd_code = f.read()
-        return cmd_code
-
-    def _get_command(self, name, params: dict | None = None):
-        cmd_code = self._load_command_code(name)
-        if params:
-            for pattern, replace in params.items():
-                p = re.compile(pattern)
-                cmd_code = p.sub(replace, cmd_code)
-        return cmd_code
+        self._backend = backend
+        self._external_locations = None

     @staticmethod
     def _verify_ws_client(w: WorkspaceClient):
@@ -60,20 +70,118 @@ def _verify_ws_client(w: WorkspaceClient):
             msg = "Current user is not a workspace admin"
             raise RuntimeError(msg)

-    def table_inventory(self, executor: CommandExecutor | None = None):
-        logger.info("Started dataset inventorization...")
-        ce = self._get_command_executor(executor, language=Language.SCALA)
-        params = {"SCHEMA": self._inventory_schema}
-        cmd_code = self._get_command("create_table_inventory.scala", params=params)
-        command_output = ce.run(cmd_code)
-        logger.debug(command_output)
-        if executor is None:
-            self._remove_command_executor()
-        logger.info("Completed dataset inventorization...")
-
-    def compile_report(self):
-        logger.info("Started report compilation...")
-        ce = self._get_command_executor(None, language=Language.SCALA)
-        self.table_inventory(ce)
-        self._remove_command_executor()
-        logger.info("Completed report compilation...")
+    def generate_external_location_list(self):
+        crawler = ExternalLocationCrawler(self._ws, self._backend, self._inventory_schema)
+        return crawler.snapshot()
+
+    def generate_job_assessment(self):
+        crawler = JobsCrawler(self._ws, self._backend, self._inventory_schema)
+        return crawler.snapshot()
+
+    def generate_cluster_assessment(self):
+        crawler = ClustersCrawler(self._ws, self._backend, self._inventory_schema)
+        return crawler.snapshot()
+
+
+class ClustersCrawler(CrawlerBase):
+    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
+        super().__init__(sbe, "hive_metastore", schema, "clusters")
+        self._ws = ws
+
+    def _crawl(self) -> list[ClusterInfo]:
+        all_clusters = list(self._ws.clusters.list())
+        return list(self._assess_clusters(all_clusters))
+
+    def _assess_clusters(self, all_clusters):
+        for cluster in all_clusters:
+            cluster_info = ClusterInfo(cluster.cluster_id, cluster.cluster_name, cluster.creator_user_name, 1, "")
+            support_status = spark_version_compatibility(cluster.spark_version)
+            failures = []
+            if support_status != "supported":
+                failures.append(f"not supported DBR: {cluster.spark_version}")
+
+            if cluster.spark_conf is not None:
+                for k in AssessmentToolkit.incompatible_spark_config_keys:
+                    if k in cluster.spark_conf:
+                        failures.append(f"unsupported config: {k}")
+
+                for value in cluster.spark_conf.values():
+                    if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
+                        failures.append(f"using DBFS mount in configuration: {value}")
+            cluster_info.failures = json.dumps(failures)
+            if len(failures) > 0:
+                cluster_info.success = 0
+            yield cluster_info
+
+    def snapshot(self) -> list[ClusterInfo]:
+        return self._snapshot(self._try_fetch, self._crawl)
+
+    def _try_fetch(self) -> list[ClusterInfo]:
+        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+            yield ClusterInfo(*row)
+
+
+class JobsCrawler(CrawlerBase):
+    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
+        super().__init__(sbe, "hive_metastore", schema, "jobs")
+        self._ws = ws
+
+    def _get_cluster_configs_from_all_jobs(self, all_jobs, all_clusters_by_id):
+        for j in all_jobs:
+            if j.settings.job_clusters is not None:
+                for jc in j.settings.job_clusters:
+                    if jc.new_cluster is None:
+                        continue
+                    yield j, jc.new_cluster
+
+            for t in j.settings.tasks:
+                if t.existing_cluster_id is not None:
+                    interactive_cluster = all_clusters_by_id.get(t.existing_cluster_id, None)
+                    if interactive_cluster is None:
+                        continue
+                    yield j, interactive_cluster
+
+                elif t.new_cluster is not None:
+                    yield j, t.new_cluster
+
+    def _crawl(self) -> list[JobInfo]:
+        all_jobs = list(self._ws.jobs.list(expand_tasks=True))
+        all_clusters = {c.cluster_id: c for c in self._ws.clusters.list()}
+        return self._assess_jobs(all_jobs, all_clusters)
+
+    def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobInfo]:
+        job_assessment = {}
+        job_details = {}
+        for job in all_jobs:
+            job_assessment[job.job_id] = set()
+            job_details[job.job_id] = JobInfo(str(job.job_id), job.settings.name, job.creator_user_name, 1, "")
+
+        for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
+            support_status = spark_version_compatibility(cluster_config.spark_version)
+            if support_status != "supported":
+                job_assessment[job.job_id].add(f"not supported DBR: {cluster_config.spark_version}")
+
+            if cluster_config.spark_conf is not None:
+                for k in AssessmentToolkit.incompatible_spark_config_keys:
+                    if k in cluster_config.spark_conf:
+                        job_assessment[job.job_id].add(f"unsupported config: {k}")
+
+                for value in cluster_config.spark_conf.values():
+                    if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
+                        job_assessment[job.job_id].add(f"using DBFS mount in configuration: {value}")
+        for job_key in job_details.keys():
+            job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
+            if len(job_assessment[job_key]) > 0:
+                job_details[job_key].success = 0
+        return list(job_details.values())
+
+    def snapshot(self) -> list[ClusterInfo]:
+        return self._snapshot(self._try_fetch, self._crawl)
+
+    def _try_fetch(self) -> list[ClusterInfo]:
+        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+            yield JobInfo(*row)
+
+
+if __name__ == "__main__":
+    print("Databricks UC Assessment")

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 3 additions & 0 deletions
@@ -86,7 +86,10 @@ def _row_to_sql(row, fields):
             elif f.type == bool:
                 data.append("TRUE" if value else "FALSE")
             elif f.type == str:
+                value = value.replace("'", "''")
                 data.append(f"'{value}'")
+            elif f.type == int:
+                data.append(f"{value}")
             else:
                 msg = f"unknown type: {f.type}"
                 raise ValueError(msg)
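
The escaping and int branches matter because crawler rows now carry string fields (such as the JSON-encoded failures) that can contain single quotes. A standalone sketch of the same serialization rules, using a hypothetical helper rather than the module's actual function:

def sql_literal(value) -> str:
    # bool is checked before int because bool is a subclass of int
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"  # double up embedded quotes
    if isinstance(value, int):
        return str(value)
    msg = f"unknown type: {type(value)}"
    raise ValueError(msg)

print(sql_literal("O'Brien"))  # 'O''Brien'
print(sql_literal(1))          # 1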
src/databricks/labs/ucx/hive_metastore/data_objects.py

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+import os
+import typing
+from dataclasses import dataclass
+
+from databricks.sdk import WorkspaceClient
+
+from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
+from databricks.labs.ucx.hive_metastore.list_mounts import Mounts
+
+
+@dataclass
+class ExternalLocation:
+    location: str
+
+
+class ExternalLocationCrawler(CrawlerBase):
+    _prefix_size: typing.ClassVar[list[int]] = [1, 12]
+
+    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
+        super().__init__(sbe, "hive_metastore", schema, "external_locations")
+        self._ws = ws
+
+    def _external_locations(self, tables, mounts):
+        min_slash = 2
+        external_locations = []
+        for table in tables:
+            location = table.as_dict()["location"]
+            if location is not None and len(location) > 0:
+                if location.startswith("dbfs:/mnt"):
+                    for mount in mounts:
+                        if location[5:].startswith(mount.name):
+                            location = location[5:].replace(mount.name, mount.source)
+                            break
+                if not location.startswith("dbfs") and (
+                    self._prefix_size[0] < location.find(":/") < self._prefix_size[1]
+                ):
+                    dupe = False
+                    loc = 0
+                    while loc < len(external_locations) and not dupe:
+                        common = (
+                            os.path.commonpath(
+                                [external_locations[loc].location, os.path.dirname(location) + "/"]
+                            ).replace(":/", "://")
+                            + "/"
+                        )
+                        if common.count("/") > min_slash:
+                            external_locations[loc] = ExternalLocation(common)
+                            dupe = True
+                        loc += 1
+                    if not dupe:
+                        external_locations.append(ExternalLocation(os.path.dirname(location) + "/"))
+        return external_locations
+
+    def _external_location_list(self):
+        tables = self._backend.fetch(f"SELECT location FROM {self._schema}.tables WHERE location IS NOT NULL")
+        mounts = Mounts(self._backend, self._ws, self._schema).snapshot()
+        return self._external_locations(list(tables), list(mounts))
+
+    def snapshot(self) -> list[ExternalLocation]:
+        return self._snapshot(self._try_fetch, self._external_location_list)
+
+    def _try_fetch(self) -> list[ExternalLocation]:
+        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+            yield ExternalLocation(*row)
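
To see the prefix-folding rule from _external_locations in isolation: sibling table locations are reduced to their common parent via os.path.commonpath, and the shared prefix is kept only while it still has more than two slashes. Bucket and path names below are made up:

import os

a = "s3://bucket/datasets/sales"  # two tables under the same prefix
b = "s3://bucket/datasets/hr"

common = (
    os.path.commonpath([os.path.dirname(a) + "/", os.path.dirname(b) + "/"]).replace(":/", "://") + "/"
)
print(common)                  # s3://bucket/datasets/
print(common.count("/") > 2)   # True, so the two locations fold into a single entry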

src/databricks/labs/ucx/hive_metastore/list_mounts.py

Lines changed: 7 additions & 0 deletions
@@ -27,3 +27,10 @@ def _list_mounts(self):
         for mount_point, source, _ in self._dbutils.fs.mounts():
             mounts.append(Mount(mount_point, source))
         return mounts
+
+    def snapshot(self) -> list[Mount]:
+        return self._snapshot(self._try_fetch, self._list_mounts)
+
+    def _try_fetch(self) -> list[Mount]:
+        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
+            yield Mount(*row)
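
Mounts now follows the same fetch-or-crawl snapshot pattern as the other crawlers: the snapshot appears to serve previously persisted rows via _try_fetch and fall back to _list_mounts on the first run. A consumption sketch, with the backend and the "ucx" schema as placeholders:

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.hive_metastore.list_mounts import Mounts

ws = WorkspaceClient()  # assumes workspace auth is already configured
sql_backend = ...  # placeholder: any SqlBackend implementation

for mount in Mounts(sql_backend, ws, "ucx").snapshot():
    print(mount.name, "->", mount.source)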

0 commit comments