11import collections
2+ import logging
3+ from collections .abc import Iterable
24from dataclasses import replace
3- from functools import cached_property
45
56from databricks .labs .lsql .backends import SqlBackend
67
78from databricks .labs .ucx .assessment .jobs import JobInfo , JobOwnership
9+ from databricks .labs .ucx .framework .utils import escape_sql_identifier
810from databricks .labs .ucx .progress .history import ProgressEncoder
911from databricks .labs .ucx .progress .install import Historical
1012from databricks .labs .ucx .source_code .directfs_access import DirectFsAccessCrawler
1113from databricks .labs .ucx .source_code .jobs import JobProblem
1214
1315
# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)
# Maps a job id (stringified) to the list of failure messages recorded for it.
JobIdToFailuresType = dict[str, list[str]]
18+
19+
1420class JobsProgressEncoder (ProgressEncoder [JobInfo ]):
1521 """Encoder class:Job to class:History."""
1622
@@ -37,8 +43,18 @@ def __init__(
3743 self ._direct_fs_access_crawlers = direct_fs_access_crawlers
3844 self ._inventory_database = inventory_database
3945
40- @cached_property
41- def _job_problems (self ) -> dict [int , list [str ]]:
46+ def append_inventory_snapshot (self , snapshot : Iterable [JobInfo ]) -> None :
47+ job_problems = self ._get_job_problems ()
48+ dfsas = self ._get_direct_filesystem_accesses ()
49+ history_records = []
50+ for record in snapshot :
51+ history_record = self ._encode_job_as_historical (record , job_problems , dfsas )
52+ history_records .append (history_record )
53+ logger .debug (f"Appending { len (history_records )} { self ._klass } table record(s) to history." )
54+ # The mode is 'append'. This is documented as conflict-free.
55+ self ._sql_backend .save_table (escape_sql_identifier (self .full_name ), history_records , Historical , mode = "append" )
56+
57+ def _get_job_problems (self ) -> JobIdToFailuresType :
4258 index = collections .defaultdict (list )
4359 for row in self ._sql_backend .fetch (
4460 'SELECT * FROM workflow_problems' ,
@@ -47,11 +63,10 @@ def _job_problems(self) -> dict[int, list[str]]:
4763 ):
4864 job_problem = JobProblem (** row .asDict ())
4965 failure = f'{ job_problem .code } : { job_problem .task_key } task: { job_problem .path } : { job_problem .message } '
50- index [job_problem .job_id ].append (failure )
66+ index [str ( job_problem .job_id ) ].append (failure )
5167 return index
5268
53- @cached_property
54- def _direct_fs_accesses (self ) -> dict [str , list [str ]]:
69+ def _get_direct_filesystem_accesses (self ) -> JobIdToFailuresType :
5570 index = collections .defaultdict (list )
5671 for crawler in self ._direct_fs_access_crawlers :
5772 for direct_fs_access in crawler .snapshot ():
@@ -71,7 +86,12 @@ def _direct_fs_accesses(self) -> dict[str, list[str]]:
7186 index [job_id ].append (failure )
7287 return index
7388
74- def _encode_record_as_historical (self , record : JobInfo ) -> Historical :
89+ def _encode_job_as_historical (
90+ self ,
91+ record : JobInfo ,
92+ job_problems : JobIdToFailuresType ,
93+ dfsas : JobIdToFailuresType ,
94+ ) -> Historical :
7595 """Encode a job as a historical records.
7696
7797 Failures are detected by the WorkflowLinter:
@@ -80,6 +100,6 @@ def _encode_record_as_historical(self, record: JobInfo) -> Historical:
80100 """
81101 historical = super ()._encode_record_as_historical (record )
82102 failures = []
83- failures .extend (self . _job_problems . get (int ( record .job_id ) , []))
84- failures .extend (self . _direct_fs_accesses .get (record .job_id , []))
103+ failures .extend (job_problems . get (record .job_id , []))
104+ failures .extend (dfsas .get (record .job_id , []))
85105 return replace (historical , failures = historical .failures + failures )
0 commit comments