99logger = get_logger (__name__ )
1010
1111
12+ def _label_for_schedule (schedule_obj ) -> str :
13+ """
14+ Build a robust label for the schedule table row.
15+ Prefer cron humanization when possible, but never fail the row.
16+ """
17+ try :
18+ import yaml
19+ label_core = yaml .safe_dump (schedule_obj ).strip ()
20+ except Exception :
21+ import json
22+
23+ label_core = f"{ schedule_obj !r} " if isinstance (schedule_obj , str ) else json .dumps (schedule_obj )
24+
25+ # Try to humanize cron, but never make this fatal
26+ try :
27+ if isinstance (schedule_obj , dict ) and "cron>" in schedule_obj :
28+ from cron_descriptor import get_description
29+
30+ return f"{ label_core } \n { get_description (schedule_obj ['cron>' ])} "
31+ except Exception as e :
32+ logger .warning (f"cron description failed: { e } " )
33+
34+ return f"schedule\n { label_core } "
35+
36+
1237def main () -> None :
1338 start_time = time .time ()
1439 count = 0
1540 Path (GRAPHS_DIR ).mkdir (exist_ok = True )
1641 schedule_entries : list [ScheduleEntry ] = []
1742
18- for path in Path (os .getcwd ()).rglob ("*.dig" ):
19- if GRAPHS_DIR in str (path ):
20- continue
43+ # Discover .dig files
44+ dig_files = [p for p in Path (os .getcwd ()).rglob ("*.dig" ) if GRAPHS_DIR not in str (p )]
45+ logger .info (f"Found { len (dig_files )} .dig files" )
46+
47+ for path in dig_files :
2148 input_file_path = path
2249 out_dir = Path (os .getcwd ()) / GRAPHS_DIR / path .parent .name
2350 out_dir .mkdir (parents = True , exist_ok = True )
@@ -26,41 +53,35 @@ def main() -> None:
2653 print (f"Generating graph for { input_file_path } → { output_dot_file } " )
2754 try :
2855 generate_graph (input_filepath = str (input_file_path ), output_dot_file = output_dot_file )
29- # collect schedule entries (best-effort lightweight parse)
30- try :
31- import yaml
32- import json
33- from cron_descriptor import get_description
34- from .yaml_includes import DigLoader , resolve_includes
35-
36- with open (input_file_path , encoding = "utf-8" ) as f :
37- data_raw = yaml .load (f , Loader = DigLoader )
38- data = resolve_includes (data_raw ) or {}
39- if "schedule" in data :
40- if isinstance (data ["schedule" ], dict ) and "cron>" in data ["schedule" ]:
41- label = (
42- f"{ yaml .safe_dump (data ['schedule' ]).strip ()} "
43- f"\n { get_description (data ['schedule' ]['cron>' ])} "
44- )
45- else :
46- label = f"schedule\n { json .dumps (data ['schedule' ])} "
47- schedule_entries .append (
48- ScheduleEntry (
49- project = path .parent .name ,
50- workflow = path .name ,
51- schedule_text = label ,
52- href = f"./{ GRAPHS_DIR } /{ path .parent .name } /{ path .name .replace ('.dig' ,'.html' )} " ,
53- )
54- )
55- except Exception :
56- # don't let schedule parsing failures break the whole run
57- pass
5856 count += 1
5957 logger .info (f"COMPLETE generating graph for { input_file_path } " )
6058 except Exception as e :
6159 logger .error (f"FAILED generating graph for { input_file_path } : { e } " , exc_info = True )
60+ # continue on other files
61+ continue
62+
63+ # Collect schedule entry (robust, never fatal)
64+ try :
65+ import yaml
66+ from .yaml_includes import DigLoader , resolve_includes
67+
68+ with open (input_file_path , encoding = "utf-8" ) as f :
69+ data_raw = yaml .load (f , Loader = DigLoader )
70+ data = resolve_includes (data_raw ) or {}
71+ if "schedule" in data :
72+ label = _label_for_schedule (data ["schedule" ])
73+ schedule_entries .append (
74+ ScheduleEntry (
75+ project = path .parent .name ,
76+ workflow = path .name ,
77+ schedule_text = label ,
78+ href = f"./{ GRAPHS_DIR } /{ path .parent .name } /{ path .name .replace ('.dig' ,'.html' )} " ,
79+ )
80+ )
81+ except Exception as e :
82+ logger .warning (f"Schedule collection failed for { input_file_path } : { e } " )
6283
63- # Always write the index, even if there were zero .dig files
84+ # Always write the index
6485 write_scheduled_workflows (schedule_entries , out_path = SCHEDULE_INDEX_FILE )
6586
6687 elapsed = time .time () - start_time
0 commit comments