Commit 438ffc8

Track DirectFsAccess on JobsProgressEncoder (#3375)
## Changes

Track `DirectFsAccess` on `JobsProgressEncoder`.

### Linked issues

Resolves #3059

### Functionality

- [x] modified existing workflow: `migration-progress-experimental`

### Tests

- [x] added unit tests
- [x] added integration tests
1 parent bccc103 commit 438ffc8

6 files changed: +172 additions, −17 deletions

src/databricks/labs/ucx/contexts/workflow_task.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -203,6 +203,7 @@ def jobs_progress(self) -> ProgressEncoder[JobInfo]:
         return JobsProgressEncoder(
             self.sql_backend,
             self.job_ownership,
+            [self.directfs_access_crawler_for_paths, self.directfs_access_crawler_for_queries],
             self.inventory_database,
             self.parent_run_id,
             self.workspace_id,
```

src/databricks/labs/ucx/progress/jobs.py

Lines changed: 36 additions & 5 deletions
```diff
@@ -7,21 +7,22 @@
 from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
 from databricks.labs.ucx.progress.history import ProgressEncoder
 from databricks.labs.ucx.progress.install import Historical
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
 from databricks.labs.ucx.source_code.jobs import JobProblem
 
 
 class JobsProgressEncoder(ProgressEncoder[JobInfo]):
+    """Encoder class:Job to class:History."""
 
     def __init__(
         self,
         sql_backend: SqlBackend,
         ownership: JobOwnership,
+        direct_fs_access_crawlers: list[DirectFsAccessCrawler],
         inventory_database: str,
         run_id: int,
         workspace_id: int,
         catalog: str,
-        schema: str = "multiworkspace",
-        table: str = "historical",
     ) -> None:
         super().__init__(
             sql_backend,
@@ -30,9 +31,10 @@ def __init__(
             run_id,
             workspace_id,
             catalog,
-            schema,
-            table,
+            "multiworkspace",
+            "historical",
         )
+        self._direct_fs_access_crawlers = direct_fs_access_crawlers
         self._inventory_database = inventory_database
 
     @cached_property
@@ -48,7 +50,36 @@ def _job_problems(self) -> dict[int, list[str]]:
                 index[job_problem.job_id].append(failure)
         return index
 
+    @cached_property
+    def _direct_fs_accesses(self) -> dict[str, list[str]]:
+        index = collections.defaultdict(list)
+        for crawler in self._direct_fs_access_crawlers:
+            for direct_fs_access in crawler.snapshot():
+                # The workflow and task source lineage are added by the WorkflowLinter
+                if len(direct_fs_access.source_lineage) < 2:
+                    continue
+                if direct_fs_access.source_lineage[0].object_type != "WORKFLOW":
+                    continue
+                if direct_fs_access.source_lineage[1].object_type != "TASK":
+                    continue
+                job_id = direct_fs_access.source_lineage[0].object_id
+                task_key = direct_fs_access.source_lineage[1].object_id  # <job id>/<task key>
+                # Follow same failure message structure as the JobProblems above and DirectFsAccessPyLinter deprecation
+                code = "direct-filesystem-access"
+                message = f"The use of direct filesystem references is deprecated: {direct_fs_access.path}"
+                failure = f"{code}: {task_key} task: {direct_fs_access.source_id}: {message}"
+                index[job_id].append(failure)
+        return index
+
     def _encode_record_as_historical(self, record: JobInfo) -> Historical:
+        """Encode a job as a historical record.
+
+        Failures are detected by the WorkflowLinter:
+        - Job problems
+        - Direct filesystem access by code used in job
+        """
         historical = super()._encode_record_as_historical(record)
-        failures = self._job_problems.get(int(record.job_id), [])
+        failures = []
+        failures.extend(self._job_problems.get(int(record.job_id), []))
+        failures.extend(self._direct_fs_accesses.get(record.job_id, []))
         return replace(historical, failures=historical.failures + failures)
```
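
To make the new indexing logic easier to follow outside the diff, here is a minimal, self-contained sketch of what `_direct_fs_accesses` does, using simplified stand-in dataclasses instead of the real `DirectFsAccess` and `LineageAtom` types:

```python
import collections
from dataclasses import dataclass, field


@dataclass
class LineageAtom:  # simplified stand-in for the real class in source_code.base
    object_type: str
    object_id: str


@dataclass
class DirectFsAccess:  # simplified stand-in; only the fields the encoder reads
    source_id: str
    path: str
    source_lineage: list[LineageAtom] = field(default_factory=list)


def index_dfsas_by_job(dfsas: list[DirectFsAccess]) -> dict[str, list[str]]:
    """Group DFSA failure messages by job id, mirroring _direct_fs_accesses above."""
    index: dict[str, list[str]] = collections.defaultdict(list)
    for dfsa in dfsas:
        # Only WORKFLOW/TASK lineage (added by the WorkflowLinter) maps back to a job;
        # DASHBOARD/QUERY lineage from the QueryLinter is skipped.
        if len(dfsa.source_lineage) < 2:
            continue
        if dfsa.source_lineage[0].object_type != "WORKFLOW":
            continue
        if dfsa.source_lineage[1].object_type != "TASK":
            continue
        job_id = dfsa.source_lineage[0].object_id
        task_key = dfsa.source_lineage[1].object_id  # formatted as <job id>/<task key>
        message = f"The use of direct filesystem references is deprecated: {dfsa.path}"
        index[job_id].append(f"direct-filesystem-access: {task_key} task: {dfsa.source_id}: {message}")
    return index


dfsa = DirectFsAccess(
    source_id="/path/to/write_dfsa.py",
    path="dfsa:/path/to/data/",
    source_lineage=[LineageAtom("WORKFLOW", "1"), LineageAtom("TASK", "1/write-dfsa")],
)
assert index_dfsas_by_job([dfsa]) == {
    "1": [
        "direct-filesystem-access: 1/write-dfsa task: /path/to/write_dfsa.py: "
        "The use of direct filesystem references is deprecated: dfsa:/path/to/data/"
    ]
}
```

Note that the index is keyed by the `WORKFLOW` atom's `object_id`, a string, which is why `_encode_record_as_historical` looks up `record.job_id` directly while the job-problems lookup needs the `int(...)` cast.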

src/databricks/labs/ucx/source_code/directfs_access.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -50,7 +50,8 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None:
         Providing a multi-entity crawler is out-of-scope of this PR
         """
         try:
-            # TODO until we historize data, we append all DFSAs
+            # TODO: Until we historize data, we append all DFSAs
+            # UPDATE: We historize DFSA from WorkflowLinter, not from QueryLinter yet
             self._update_snapshot(dfsas, mode="append")
         except DatabricksError as e:
             logger.error("Failed to store DFSAs", exc_info=e)
```

tests/integration/conftest.py

Lines changed: 16 additions & 10 deletions
```diff
@@ -570,20 +570,26 @@ def make_cluster_policy(self, **kwargs) -> CreatePolicyResponse:
     def make_cluster_policy_permissions(self, **kwargs):
         return self._make_cluster_policy_permissions(**kwargs)
 
+    def make_job(self, **kwargs) -> Job:
+        job = self._make_job(**kwargs)
+        self._jobs.append(job)
+        return job
+
+    def make_dashboard(self, **kwargs) -> Dashboard:
+        dashboard = self._make_dashboard(**kwargs)
+        self._dashboards.append(dashboard)
+        return dashboard
+
     def make_linting_resources(self) -> None:
         """Make resources to lint."""
-        notebook_job_1 = self._make_job(content="spark.read.parquet('dbfs://mnt/notebook/')")
-        notebook_job_2 = self._make_job(content="spark.table('old.stuff')")
-        file_job_1 = self._make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask)
-        file_job_2 = self._make_job(content="spark.table('some.table')", task_type=SparkPythonTask)
+        self.make_job(content="spark.read.parquet('dbfs://mnt/notebook/')")
+        self.make_job(content="spark.table('old.stuff')")
+        self.make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask)
+        self.make_job(content="spark.table('some.table')", task_type=SparkPythonTask)
         query_1 = self._make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`')
-        dashboard_1 = self._make_dashboard(query=query_1)
+        self._make_dashboard(query=query_1)
         query_2 = self._make_query(sql_query='SELECT * from my_schema.my_table')
-        dashboard_2 = self._make_dashboard(query=query_2)
-
-        self._jobs.extend([notebook_job_1, notebook_job_2, file_job_1, file_job_2])
-        self._dashboards.append(dashboard_1)
-        self._dashboards.append(dashboard_2)
+        self._make_dashboard(query=query_2)
 
     def add_table(self, table: TableInfo):
         self._tables.append(table)
```
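
The new `make_job` and `make_dashboard` wrappers follow a simple create-and-track pattern: delegate to the private factory, remember the resource so later assertions can find it, and return it. A minimal sketch of the pattern under a hypothetical `Resource`/`Context` pair, not the real fixture classes:

```python
from dataclasses import dataclass, field


@dataclass
class Resource:  # hypothetical stand-in for Job / Dashboard
    name: str


@dataclass
class Context:  # hypothetical stand-in for the test context
    _resources: list[Resource] = field(default_factory=list)

    def _make_resource(self, name: str) -> Resource:
        # The real fixture would call the Databricks SDK here and register cleanup.
        return Resource(name)

    def make_resource(self, name: str) -> Resource:
        """Public wrapper: create the resource and remember it for later assertions."""
        resource = self._make_resource(name)
        self._resources.append(resource)
        return resource


ctx = Context()
job_like = ctx.make_resource("job-1")
assert ctx._resources == [job_like]
```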
Lines changed: 86 additions & 0 deletions

```diff
@@ -0,0 +1,86 @@
+from databricks.labs.ucx.assessment.jobs import JobInfo
+from databricks.labs.ucx.framework.utils import escape_sql_identifier
+from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom
+from databricks.labs.ucx.source_code.jobs import JobProblem
+
+
+def test_job_progress_encoder_failures(runtime_ctx, az_cli_ctx) -> None:
+    az_cli_ctx.progress_tracking_installation.run()
+    runtime_ctx = runtime_ctx.replace(
+        parent_run_id=1,
+        sql_backend=az_cli_ctx.sql_backend,
+        ucx_catalog=az_cli_ctx.ucx_catalog,
+    )
+
+    job = runtime_ctx.make_job()
+    assert job.job_id, "Expected job with id"
+    assert job.settings and job.settings.tasks, "Expected job with tasks"
+
+    job_problems = [
+        JobProblem(
+            job_id=job.job_id,
+            job_name=job.settings.name,
+            task_key=job.settings.tasks[0].task_key,
+            path="parent/child.py",
+            code="sql-parse-error",
+            message="Could not parse SQL",
+            start_line=12,
+            start_col=0,
+            end_line=12,
+            end_col=20,
+        )
+    ]
+    runtime_ctx.sql_backend.save_table(
+        f'{runtime_ctx.inventory_database}.workflow_problems',
+        job_problems,
+        JobProblem,
+        mode='overwrite',
+    )
+
+    dashboard = runtime_ctx.make_dashboard()
+
+    direct_fs_access_for_path = DirectFsAccess(
+        source_id="/path/to/write_dfsa.py",
+        source_lineage=[
+            LineageAtom(object_type="WORKFLOW", object_id=str(job.job_id), other={"name": job.settings.name}),
+            LineageAtom(object_type="TASK", object_id=job.settings.tasks[0].task_key),
+        ],
+        path="dfsa:/path/to/data/",
+        is_read=False,
+        is_write=True,
+    )
+    runtime_ctx.directfs_access_crawler_for_paths.dump_all([direct_fs_access_for_path])
+
+    direct_fs_access_for_query = DirectFsAccess(
+        source_id="/path/to/write_dfsa.py",
+        source_lineage=[
+            LineageAtom(
+                object_type="DASHBOARD",
+                object_id=dashboard.id,
+                other={"parent": dashboard.parent, "name": dashboard.name},
+            ),
+            LineageAtom(object_type="QUERY", object_id=f"{dashboard.id}/query", other={"name": "test"}),
+        ],
+        path="dfsa:/path/to/data/",
+        is_read=False,
+        is_write=True,
+    )
+    runtime_ctx.directfs_access_crawler_for_queries.dump_all([direct_fs_access_for_query])
+
+    job_info = JobInfo(
+        str(job.job_id),
+        success=1,
+        failures="[]",
+        job_name=job.settings.name,
+        creator=job.creator_user_name,
+    )
+    runtime_ctx.jobs_progress.append_inventory_snapshot([job_info])
+
+    history_table_name = escape_sql_identifier(runtime_ctx.tables_progress.full_name)
+    records = list(runtime_ctx.sql_backend.fetch(f"SELECT * FROM {history_table_name}"))
+
+    assert len(records) == 1, "Expected one historical entry"
+    assert records[0].failures == [
+        f"sql-parse-error: {job.settings.tasks[0].task_key} task: parent/child.py: Could not parse SQL",
+        f"direct-filesystem-access: {job.settings.tasks[0].task_key} task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/",
+    ]
```
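
The integration test pins down the exact failure string the encoder emits. Both job problems and DFSA failures share the shape `<code>: <task key> task: <source id>: <message>`; a tiny formatter sketch of that shared shape (the function name and example values here are illustrative, not from the codebase):

```python
def format_failure(code: str, task_key: str, source_id: str, message: str) -> str:
    """Shared failure shape: '<code>: <task key> task: <source id>: <message>'."""
    return f"{code}: {task_key} task: {source_id}: {message}"


assert format_failure(
    "sql-parse-error", "my-task", "parent/child.py", "Could not parse SQL"
) == "sql-parse-error: my-task task: parent/child.py: Could not parse SQL"
```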

tests/unit/progress/test_jobs.py

Lines changed: 31 additions & 1 deletion
```diff
@@ -3,9 +3,11 @@
 from databricks.labs.lsql import Row
 from databricks.labs.lsql.backends import MockBackend
 
+from databricks.labs.ucx import __version__
 from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
 from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
-from databricks.labs.ucx import __version__
+from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
 
 
 def test_jobs_progress_encoder() -> None:
@@ -28,9 +30,35 @@ def test_jobs_progress_encoder() -> None:
     )
     job_ownership = create_autospec(JobOwnership)
     job_ownership.owner_of.return_value = "some_owner"
+    direct_fs_access_crawler = create_autospec(DirectFsAccessCrawler)
+    direct_fs_accesses = [
+        DirectFsAccess(
+            source_id="/path/to/write_dfsa.py",
+            source_lineage=[
+                LineageAtom(object_type="WORKFLOW", object_id="1", other={"name": "test"}),
+                LineageAtom(object_type="TASK", object_id="1/write-dfsa"),
+            ],
+            path="dfsa:/path/to/data/",
+            is_read=False,
+            is_write=True,
+        ),
+        DirectFsAccess(
+            source_id="/path/to/write_dfsa.py",
+            source_lineage=[
+                # Dashboard with same id as job is unlikely, but here to test it is not included
+                LineageAtom(object_type="DASHBOARD", object_id="1", other={"parent": "parent", "name": "test"}),
+                LineageAtom(object_type="QUERY", object_id="1/query", other={"name": "test"}),
+            ],
+            path="dfsa:/path/to/data/",
+            is_read=False,
+            is_write=True,
+        ),
+    ]
+    direct_fs_access_crawler.snapshot.return_value = direct_fs_accesses
     jobs_progress_encoder = JobsProgressEncoder(
         sql_backend,
         job_ownership,
+        [direct_fs_access_crawler],
         "inventory",
         2,
         3,
@@ -59,8 +87,10 @@ def test_jobs_progress_encoder() -> None:
             'some failure from config',
             'cannot-autofix-table-reference: a task: /some/path: some failure',
             'catalog-api-in-shared-clusters: b task: /some/other: some failure',
+            "direct-filesystem-access: 1/write-dfsa task: /path/to/write_dfsa.py: The use of direct filesystem references is deprecated: dfsa:/path/to/data/",
         ],
         owner='some_owner',
         ucx_version=__version__,
         )
     ]
+    direct_fs_access_crawler.snapshot.assert_called_once()
```
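
The fake crawler above is built with `create_autospec`, so it only exposes the real `DirectFsAccessCrawler` interface, and the closing `snapshot.assert_called_once()` verifies the encoder consulted it exactly once. A minimal illustration of that behavior with a toy class standing in for the crawler:

```python
from unittest.mock import create_autospec


class Crawler:  # toy stand-in for DirectFsAccessCrawler
    def snapshot(self) -> list:
        return []


mock_crawler = create_autospec(Crawler)
mock_crawler.snapshot.return_value = ["dfsa-record"]

assert mock_crawler.snapshot() == ["dfsa-record"]
mock_crawler.snapshot.assert_called_once()

# Unlike a plain MagicMock, an autospec'd mock rejects attributes
# the real class does not define:
try:
    mock_crawler.no_such_attribute
except AttributeError:
    pass  # expected: autospec catches interface drift
```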
