1+ import dataclasses
12import functools
23import logging
34import shutil
45import tempfile
56from collections .abc import Generator , Iterable
67from contextlib import contextmanager
7- from dataclasses import dataclass , asdict
8+ from dataclasses import dataclass
89from datetime import datetime , timezone
910from importlib import metadata
1011from pathlib import Path
2829 guess_encoding ,
2930)
3031from databricks .labs .ucx .source_code .directfs_access import (
31- DirectFsAccess ,
3232 LineageAtom ,
3333 DirectFsAccessCrawler ,
34- DirectFsAccessInPath ,
34+ DirectFsAccess ,
3535)
3636from databricks .labs .ucx .source_code .graph import (
3737 Dependency ,
@@ -86,8 +86,8 @@ def __repr__(self):
8686 @property
8787 def lineage (self ) -> list [LineageAtom ]:
8888 job_name = (None if self ._job .settings is None else self ._job .settings .name ) or "unknown job"
89- job_lineage = LineageAtom ("job " , str (self ._job .job_id ), {"name" : job_name })
90- task_lineage = LineageAtom ("task " , self ._task .task_key )
89+ job_lineage = LineageAtom ("JOB " , str (self ._job .job_id ), {"name" : job_name })
90+ task_lineage = LineageAtom ("TASK " , self ._task .task_key )
9191 return [job_lineage , task_lineage ]
9292
9393
@@ -363,7 +363,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
363363 logger .info (f"Running { tasks } linting tasks in parallel..." )
364364 job_results , errors = Threads .gather ('linting workflows' , tasks )
365365 job_problems : list [JobProblem ] = []
366- job_dfsas : list [DirectFsAccessInPath ] = []
366+ job_dfsas : list [DirectFsAccess ] = []
367367 for problems , dfsas in job_results :
368368 job_problems .extend (problems )
369369 job_dfsas .extend (dfsas )
@@ -378,7 +378,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
378378 if len (errors ) > 0 :
379379 raise ManyError (errors )
380380
381- def lint_job (self , job_id : int ) -> tuple [list [JobProblem ], list [DirectFsAccessInPath ]]:
381+ def lint_job (self , job_id : int ) -> tuple [list [JobProblem ], list [DirectFsAccess ]]:
382382 try :
383383 job = self ._ws .jobs .get (job_id )
384384 except NotFound :
@@ -393,9 +393,9 @@ def lint_job(self, job_id: int) -> tuple[list[JobProblem], list[DirectFsAccessIn
393393
394394 _UNKNOWN = Path ('<UNKNOWN>' )
395395
396- def _lint_job (self , job : jobs .Job ) -> tuple [list [JobProblem ], list [DirectFsAccessInPath ]]:
396+ def _lint_job (self , job : jobs .Job ) -> tuple [list [JobProblem ], list [DirectFsAccess ]]:
397397 problems : list [JobProblem ] = []
398- dfsas : list [DirectFsAccessInPath ] = []
398+ dfsas : list [DirectFsAccess ] = []
399399 assert job .job_id is not None
400400 assert job .settings is not None
401401 assert job .settings .name is not None
@@ -420,9 +420,9 @@ def _lint_job(self, job: jobs.Job) -> tuple[list[JobProblem], list[DirectFsAcces
420420 end_col = advice .advice .end_col ,
421421 )
422422 problems .append (job_problem )
423- assessment_start = datetime .now ()
424- task_dfsas = self ._collect_task_dfsas (task , job , graph , session_state )
425- assessment_end = datetime .now ()
423+ assessment_start = datetime .now (timezone . utc )
424+ task_dfsas = self ._collect_task_dfsas (job , task , graph , session_state )
425+ assessment_end = datetime .now (timezone . utc )
426426 for dfsa in task_dfsas :
427427 dfsa = dfsa .replace_assessment_infos (assessment_start = assessment_start , assessment_end = assessment_end )
428428 dfsas .append (dfsa )
@@ -462,15 +462,17 @@ def _lint_task(
462462 yield from walker
463463
464464 def _collect_task_dfsas (
465- self , task : jobs .Task , job : jobs .Job , graph : DependencyGraph , session_state : CurrentSessionState
466- ) -> Iterable [DirectFsAccessInPath ]:
467- collector = DfsaCollectorWalker (graph , set (), self ._path_lookup , session_state )
468- assert job .settings is not None # as already done in _lint_job
469- job_name = job .settings .name
470- for dfsa in collector :
471- yield DirectFsAccessInPath (** asdict (dfsa )).replace_job_infos (
472- job_id = job .job_id , job_name = job_name , task_key = task .task_key
473- )
465+ self , job : jobs .Job , task : jobs .Task , graph : DependencyGraph , session_state : CurrentSessionState
466+ ) -> Iterable [DirectFsAccess ]:
467+ # walker doesn't register lineage for job/task
468+ job_id = str (job .job_id )
469+ job_name = job .settings .name if job .settings and job .settings .name else "<anonymous>"
470+ for dfsa in DfsaCollectorWalker (graph , set (), self ._path_lookup , session_state ):
471+ atoms = [
472+ LineageAtom (object_type = "JOB" , object_id = job_id , other = {"name" : job_name }),
473+ LineageAtom (object_type = "TASK" , object_id = task .task_key ),
474+ ]
475+ yield dataclasses .replace (dfsa , source_lineage = atoms + dfsa .source_lineage )
474476
475477
476478class LintingWalker (DependencyGraphWalker [LocatedAdvice ]):
0 commit comments