Commit e824c18
Create $inventory.tables from Scala notebook (#207)
This PR fetches table metadata without accessing storage, which simplifies configuration: Azure storage credentials no longer need to be present in the cluster config. Scanning through this notebook is also faster. Fixes #205.

Co-authored-by: Lars George <[email protected]>
1 parent 980894d commit e824c18
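
For illustration only (not part of this commit): once the assessment job has run, the inventory written by the Scala notebook can be inspected from any Python notebook in the workspace. The database name "ucx" below is the widget default and may differ per installation; spark is the session provided by the Databricks runtime.

# Hypothetical usage sketch: inspect the inventory created by the notebook.
inventory_database = "ucx"  # widget default; adjust to the configured inventory database

tables = spark.table(f"{inventory_database}.tables")
tables.printSchema()  # catalog, database, name, object_type, table_format, location, view_text
tables.groupBy("object_type").count().show()  # e.g. MANAGED vs EXTERNAL vs VIEW counts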

File tree

4 files changed: +94 additions, -29 deletions

src/databricks/labs/ucx/framework/tasks.py
src/databricks/labs/ucx/hive_metastore/tables.scala
src/databricks/labs/ucx/install.py
src/databricks/labs/ucx/runtime.py

src/databricks/labs/ucx/framework/tasks.py

Lines changed: 9 additions & 2 deletions
@@ -18,9 +18,10 @@ class Task:
     fn: Callable[[MigrationConfig], None]
     depends_on: list[str] = None
     job_cluster: str = "main"
+    notebook: str = None


-def task(workflow, *, depends_on=None, job_cluster="main"):
+def task(workflow, *, depends_on=None, job_cluster="main", notebook: str | None = None):
     def decorator(func):
         @wraps(func)
         def wrapper(*args, **kwargs):
@@ -52,7 +53,13 @@ def wrapper(*args, **kwargs):
             raise SyntaxError(msg)

         _TASKS[func.__name__] = Task(
-            workflow=workflow, name=func.__name__, doc=func.__doc__, fn=func, depends_on=deps, job_cluster=job_cluster
+            workflow=workflow,
+            name=func.__name__,
+            doc=func.__doc__,
+            fn=func,
+            depends_on=deps,
+            job_cluster=job_cluster,
+            notebook=notebook,
         )

         return wrapper
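
For illustration (a sketch based on this diff and the runtime.py change below, not additional code in the commit): a task registered through the decorator can now point at a notebook, and the installer schedules it as a notebook task instead of a wheel task. The import paths follow the file layout in this PR; the decorator may apply extra validation not shown here.

# Hypothetical sketch of a notebook-backed task declaration.
from databricks.labs.ucx.config import MigrationConfig
from databricks.labs.ucx.framework.tasks import task


@task("assessment", notebook="hive_metastore/tables.scala")
def crawl_tables(_: MigrationConfig):
    """Scan the Hive Metastore via the Scala notebook; the Python body stays empty."""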
src/databricks/labs/ucx/hive_metastore/tables.scala

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.DataFrame
+
+// must follow the same structure as databricks.labs.ucx.hive_metastore.tables.Table
+case class TableDetails(catalog: String, database: String, name: String, object_type: String,
+                        table_format: String, location: String, view_text: String)
+
+def metadataForAllTables(databases: Seq[String]): DataFrame = {
+  import spark.implicits._
+
+  val externalCatalog = spark.sharedState.externalCatalog
+  databases.par.flatMap(databaseName => {
+    val tables = externalCatalog.listTables(databaseName)
+    if (tables == null) {
+      println(s"[WARN][${databaseName}] listTables returned null")
+      Seq()
+    } else {
+      tables.par.map(tableName => try {
+        val table = externalCatalog.getTable(databaseName, tableName)
+        if (table == null) {
+          println(s"[WARN][${databaseName}.${tableName}] result is null")
+          None
+        } else {
+          Some(TableDetails("hive_metastore", databaseName, tableName, table.tableType.name, table.provider.orNull,
+            table.storage.locationUri.map(_.toString).orNull, table.viewText.orNull))
+        }
+      } catch {
+        case err: Throwable =>
+          println(s"[ERROR][${databaseName}.${tableName}] ignoring table because of ${err}")
+          None
+      }).toList.collect {
+        case Some(x) => x
+      }
+    }
+  }).toList.toDF
+}
+
+dbutils.widgets.text("inventory_database", "ucx")
+val inventoryDatabase = dbutils.widgets.get("inventory_database")
+
+val df = metadataForAllTables(spark.sharedState.externalCatalog.listDatabases())
+df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(s"$inventoryDatabase.tables")
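
As a hedged sanity check (not part of this commit), the catalog-only approach can be cross-checked from PySpark without any storage credentials; "ucx" is the widget default, and spark comes from the Databricks runtime.

# Hypothetical check: compare per-database counts in the inventory with spark.catalog.
inventory_database = "ucx"  # widget default; adjust to your installation

scanned = {
    row["database"]: row["count"]
    for row in spark.table(f"{inventory_database}.tables").groupBy("database").count().collect()
}
for db in spark.catalog.listDatabases():
    expected = len(spark.catalog.listTables(db.name))
    print(f"{db.name}: scanned={scanned.get(db.name, 0)} catalog={expected}")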

src/databricks/labs/ucx/install.py

Lines changed: 41 additions & 20 deletions
@@ -16,7 +16,7 @@

 from databricks.labs.ucx.__about__ import __version__
 from databricks.labs.ucx.config import GroupsConfig, MigrationConfig, TaclConfig
-from databricks.labs.ucx.framework.tasks import _TASKS
+from databricks.labs.ucx.framework.tasks import _TASKS, Task
 from databricks.labs.ucx.runtime import main

 TAG_STEP = "step"
@@ -65,6 +65,7 @@ def __init__(self, ws: WorkspaceClient, *, prefix: str = "ucx", promtps: bool =
         self._ws = ws
         self._prefix = prefix
         self._prompts = promtps
+        self._this_file = Path(__file__)

     def run(self):
         self._configure()
@@ -230,8 +231,7 @@ def _upload_wheel(self) -> str:
             self._ws.workspace.upload(remote_wheel, f, overwrite=True, format=ImportFormat.AUTO)
         return remote_wheel

-    def _job_settings(self, step_name, dbfs_path):
-        config_file = f"/Workspace/{self._install_folder}/config.yml"
+    def _job_settings(self, step_name: str, dbfs_path: str):
         email_notifications = None
         if "@" in self._my_username:
             email_notifications = jobs.JobEmailNotifications(
@@ -243,22 +243,44 @@ def _job_settings(self, step_name, dbfs_path):
             "tags": {TAG_APP: self._prefix, TAG_STEP: step_name},
             "job_clusters": self._job_clusters({t.job_cluster for t in tasks}),
             "email_notifications": email_notifications,
-            "tasks": [
-                jobs.Task(
-                    task_key=task.name,
-                    job_cluster_key=task.job_cluster,
-                    depends_on=[jobs.TaskDependency(task_key=d) for d in _TASKS[task.name].depends_on],
-                    libraries=[compute.Library(whl=f"dbfs:{dbfs_path}")],
-                    python_wheel_task=jobs.PythonWheelTask(
-                        package_name="databricks_labs_ucx",
-                        entry_point="runtime",  # [project.entry-points.databricks] in pyproject.toml
-                        named_parameters={"task": task.name, "config": config_file},
-                    ),
-                )
-                for task in tasks
-            ],
+            "tasks": [self._job_task(task, dbfs_path) for task in tasks],
         }

+    def _job_task(self, task: Task, dbfs_path: str) -> jobs.Task:
+        jobs_task = jobs.Task(
+            task_key=task.name,
+            job_cluster_key=task.job_cluster,
+            depends_on=[jobs.TaskDependency(task_key=d) for d in _TASKS[task.name].depends_on],
+        )
+        if task.notebook:
+            return self._job_notebook_task(jobs_task, task)
+        return self._job_wheel_task(jobs_task, task, dbfs_path)
+
+    def _job_notebook_task(self, jobs_task: jobs.Task, task: Task) -> jobs.Task:
+        local_notebook = self._this_file.parent / task.notebook
+        remote_notebook = f"{self._install_folder}/{local_notebook.name}"
+        with local_notebook.open("rb") as f:
+            self._ws.workspace.upload(remote_notebook, f)
+        return replace(
+            jobs_task,
+            notebook_task=jobs.NotebookTask(
+                notebook_path=remote_notebook,
+                # ES-872211: currently, we cannot read WSFS files from Scala context
+                base_parameters={"inventory_database": self._current_config.inventory_database},
+            ),
+        )
+
+    def _job_wheel_task(self, jobs_task: jobs.Task, task: Task, dbfs_path: str) -> jobs.Task:
+        return replace(
+            jobs_task,
+            libraries=[compute.Library(whl=f"dbfs:{dbfs_path}")],
+            python_wheel_task=jobs.PythonWheelTask(
+                package_name="databricks_labs_ucx",
+                entry_point="runtime",  # [project.entry-points.databricks] in pyproject.toml
+                named_parameters={"task": task.name, "config": self._config_file},
+            ),
+        )
+
     def _job_clusters(self, names: set[str]):
         clusters = []
         spec = self._cluster_node_type(
@@ -292,16 +314,15 @@ def _job_clusters(self, names: set[str]):
         )
         return clusters

-    @staticmethod
-    def _build_wheel(tmp_dir: str, *, verbose: bool = False):
+    def _build_wheel(self, tmp_dir: str, *, verbose: bool = False):
         """Helper to build the wheel package"""
         streams = {}
         if not verbose:
             streams = {
                 "stdout": subprocess.DEVNULL,
                 "stderr": subprocess.DEVNULL,
             }
-        project_root = Installer._find_project_root(Path(__file__))
+        project_root = Installer._find_project_root(self._this_file)
         if not project_root:
             msg = "Cannot find project root"
             raise NotADirectoryError(msg)
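
The _job_notebook_task/_job_wheel_task split leans on dataclasses.replace: the shared jobs.Task skeleton is copied and only the relevant payload is filled in. Below is a minimal, self-contained sketch of that copy-with-changes pattern, using hypothetical types rather than the SDK's:

# Stdlib-only illustration of the replace() pattern used above.
from dataclasses import dataclass, replace
from typing import Optional


@dataclass
class NotebookTask:
    notebook_path: str


@dataclass
class JobTask:
    task_key: str
    job_cluster_key: str = "main"
    notebook_task: Optional[NotebookTask] = None


base = JobTask(task_key="crawl_tables")
# replace() returns a new instance; the original skeleton stays untouched.
with_notebook = replace(base, notebook_task=NotebookTask("/Users/me/tables.scala"))
print(base.notebook_task)                         # None
print(with_notebook.notebook_task.notebook_path)  # /Users/me/tables.scala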

src/databricks/labs/ucx/runtime.py

Lines changed: 2 additions & 7 deletions
@@ -20,20 +20,15 @@ def setup_schema(cfg: MigrationConfig):
     backend.execute(f"CREATE SCHEMA IF NOT EXISTS hive_metastore.{cfg.inventory_database}")


-@task("assessment", depends_on=[setup_schema])
-def crawl_tables(cfg: MigrationConfig):
+@task("assessment", depends_on=[setup_schema], notebook="hive_metastore/tables.scala")
+def crawl_tables(_: MigrationConfig):
     """During this operation, a systematic scan is conducted, encompassing every table within the Hive Metastore.
     This scan extracts essential details associated with each table, including its unique identifier or name, table
     format, storage location details.

     The extracted metadata is subsequently organized and cataloged within a dedicated storage entity known as
     the `$inventory.tables` table. This table functions as a comprehensive inventory, providing a structured and
     easily accessible reference point for users, data engineers, and administrators."""
-    ws = WorkspaceClient(config=cfg.to_databricks_config())
-    tacls = TaclToolkit(
-        ws, inventory_catalog="hive_metastore", inventory_schema=cfg.inventory_database, databases=cfg.tacl.databases
-    )
-    tacls.database_snapshot()


 @task("assessment", depends_on=[crawl_tables], job_cluster="tacl")
