Skip to content

Commit 2b8a0bb

Browse files
larsgeorge-dbFastLeenfx
authored
Assessment report (#157)
First steps, since I want to merge Liran's work and continue there. --------- Co-authored-by: FastLee <[email protected]> Co-authored-by: Serge Smertin <[email protected]>
1 parent 396ffaf commit 2b8a0bb

File tree

7 files changed

+151
-1
lines changed

7 files changed

+151
-1
lines changed

src/databricks/labs/ucx/assessment/__init__.py

Whitespace-only changes.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType};
2+
3+
import org.apache.spark.sql.catalyst.TableIdentifier;
4+
import java.io.{FileWriter, BufferedWriter, File};
5+
6+
val schema = "SCHEMA"
7+
val bw = new BufferedWriter(new FileWriter(new File("/tmp/metastore_schema.csv"), true));
8+
9+
bw.write("db,table,format,type,table_location,created_version,created_time,last_access,lib,inputformat,outputformat\n")
10+
val dbs = spark.sharedState.externalCatalog.listDatabases()
11+
for( db <- dbs) {
12+
//println(s"database: ${db}")
13+
val tables = spark.sharedState.externalCatalog.listTables(db)
14+
for (t <- tables) {
15+
try {
16+
//println(s"table: ${t}")
17+
val table: CatalogTable = spark.sharedState.externalCatalog.getTable(db = db, table = t)
18+
val row = s"${db},${t},${table.provider.getOrElse("Unknown")},${table.tableType.name},${table.storage.locationUri.getOrElse("None")},${table.createVersion},${table.createTime},${table.lastAccessTime},${table.storage.serde.getOrElse("Unknown")},${table.storage.inputFormat.getOrElse("Unknown")},${table.storage.outputFormat.getOrElse("Unknown")}\n"
19+
bw.write(row)
20+
} catch {
21+
case e: Exception => bw.write(s"${db},${t},Unknown,Unknown,NONE,,,,,,,\n")
22+
}
23+
}
24+
25+
}
26+
27+
bw.close;
28+
29+
dbutils.fs.cp("file:///tmp/metastore_schema.csv","dbfs:/tmp/uc_assessment/hms/")
30+
spark.sql(s"create database if not exists ${schema}")
31+
val tables_df=spark.read.option("header","true").csv("/tmp/uc_assessment/hms/metastore_schema.csv")
32+
tables_df.write.mode("overwrite").saveAsTable(s"${schema}.hms_tables");

src/databricks/labs/ucx/cli/app.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from pathlib import Path
23
from typing import Annotated
34

@@ -23,3 +24,21 @@ def migrate_groups(config_file: Annotated[Path, typer.Argument(help="Path to con
2324
toolkit.apply_permissions_to_account_groups()
2425
toolkit.delete_backup_groups()
2526
toolkit.cleanup_inventory_table()
27+
28+
29+
@app.command()
30+
def generate_assessment_report():
31+
from databricks.sdk import WorkspaceClient
32+
33+
from databricks.labs.ucx.toolkits.assessment import AssessmentToolkit
34+
35+
cluster_id = os.getenv("DATABRICKS_CLUSTER_ID")
36+
catalog = os.getenv("INVENTORY_CATALOG", "ucx")
37+
schema = os.getenv("INVENTORY_SCHEMA", "ucx")
38+
ws = WorkspaceClient()
39+
toolkit = AssessmentToolkit(ws=ws, cluster_id=cluster_id, inventory_catalog=catalog, inventory_schema=schema)
40+
toolkit.compile_report()
41+
42+
43+
if __name__ == "__main__":
44+
app()

src/databricks/labs/ucx/providers/mixins/compute.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def run(self, code):
7979

8080
ctx = self._running_command_context()
8181
result = self._commands.execute(
82-
cluster_id=self._cluster_id, language=compute.Language.PYTHON, context_id=ctx.id, command=code
82+
cluster_id=self._cluster_id, language=self._language, context_id=ctx.id, command=code
8383
).result()
8484

8585
results = result.results
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import logging
2+
import re
3+
from importlib import resources
4+
5+
from databricks.sdk import WorkspaceClient
6+
from databricks.sdk.service.compute import Language
7+
8+
from databricks.labs.ucx.assessment import commands
9+
from databricks.labs.ucx.providers.mixins.compute import CommandExecutor
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class AssessmentToolkit:
15+
def __init__(self, ws: WorkspaceClient, cluster_id, inventory_catalog, inventory_schema, warehouse_id=None):
16+
self._ws = ws
17+
self._inventory_catalog = inventory_catalog
18+
self._inventory_schema = inventory_schema
19+
self._warehouse_id = warehouse_id
20+
self._cluster_id = cluster_id
21+
self._command_executor = None
22+
self._managed_executor = False
23+
24+
def _get_command_executor(self, executor: CommandExecutor | None = None, language=None):
25+
ce = executor
26+
if ce is None:
27+
if language:
28+
ce = CommandExecutor(self._ws, language=language, cluster_id=self._cluster_id)
29+
else:
30+
ce = CommandExecutor(self._ws, cluster_id=self._cluster_id)
31+
self._managed_executor = True
32+
self._command_executor = ce
33+
return ce
34+
35+
def _remove_command_executor(self):
36+
if self._managed_executor:
37+
self._command_executor = None
38+
self._managed_executor = False
39+
40+
@staticmethod
41+
def _load_command_code(name):
42+
cmd_file = resources.files(commands) / name
43+
with cmd_file.open("rt") as f:
44+
cmd_code = f.read()
45+
return cmd_code
46+
47+
def _get_command(self, name, params: dict | None = None):
48+
cmd_code = self._load_command_code(name)
49+
if params:
50+
for pattern, replace in params.items():
51+
p = re.compile(pattern)
52+
cmd_code = p.sub(replace, cmd_code)
53+
return cmd_code
54+
55+
@staticmethod
56+
def _verify_ws_client(w: WorkspaceClient):
57+
_me = w.current_user.me()
58+
is_workspace_admin = any(g.display == "admins" for g in _me.groups)
59+
if not is_workspace_admin:
60+
msg = "Current user is not a workspace admin"
61+
raise RuntimeError(msg)
62+
63+
def table_inventory(self, executor: CommandExecutor | None = None):
64+
logger.info("Started dataset inventorization...")
65+
ce = self._get_command_executor(executor, language=Language.SCALA)
66+
params = {"SCHEMA": self._inventory_schema}
67+
cmd_code = self._get_command("create_table_inventory.scala", params=params)
68+
command_output = ce.run(cmd_code)
69+
logger.debug(command_output)
70+
if executor is None:
71+
self._remove_command_executor()
72+
logger.info("Completed dataset inventorization...")
73+
74+
def compile_report(self):
75+
logger.info("Started report compilation...")
76+
ce = self._get_command_executor(None, language=Language.SCALA)
77+
self.table_inventory(ce)
78+
self._remove_command_executor()
79+
logger.info("Completed report compilation...")
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from databricks.labs.ucx.toolkits.assessment import AssessmentToolkit
2+
3+
4+
def test_table_inventory(ws, make_catalog, make_schema):
5+
assess = AssessmentToolkit(ws, make_catalog(), make_schema())
6+
assess.table_inventory()

tests/unit/test_assessment.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from unittest.mock import Mock
2+
3+
from databricks.labs.ucx.toolkits.assessment import AssessmentToolkit
4+
5+
6+
def test_get_command():
7+
def _mock_load(name):
8+
return f"This is a test! Given name: {name}"
9+
10+
ws = Mock()
11+
at = AssessmentToolkit(ws, cluster_id=1, inventory_catalog="foo", inventory_schema="bar")
12+
at._load_command_code = _mock_load
13+
c = at._get_command("ignored", {"test": "WIN"})
14+
assert c.startswith("This is a WIN!")

0 commit comments

Comments
 (0)