77
88import json
99import os
10- import re
11- from typing import Dict , Any , Set
1210from pathlib import Path
11+ from typing import Any , Dict , Set
1312
1413
1514class SchemaConsolidator :
@@ -24,7 +23,7 @@ def load_schema(self, filename: str) -> Dict[str, Any]:
2423 if filename not in self .schemas :
2524 file_path = self .schema_dir / filename
2625 try :
27- with open (file_path , 'r' ) as f :
26+ with open (file_path , "r" ) as f :
2827 self .schemas [filename ] = json .load (f )
2928 print (f"Loaded schema: { filename } " )
3029 except FileNotFoundError :
@@ -34,26 +33,30 @@ def load_schema(self, filename: str) -> Dict[str, Any]:
3433
3534 def extract_ref_filename (self , ref : str ) -> str :
3635 """Extract filename from $ref string."""
37- # Handle refs like "secrets.json#/secrets" or "common.json#/definitions/metadata"
38- if '#' in ref :
39- return ref .split ('#' )[0 ]
36+ # Handle refs like "secrets.json#/secrets" or
37+ # "common.json#/definitions/metadata"
38+ if "#" in ref :
39+ return ref .split ("#" )[0 ]
4040 return ref
4141
4242 def extract_ref_path (self , ref : str ) -> str :
4343 """Extract JSON path from $ref string."""
44- # Handle refs like "secrets.json#/secrets" or "common.json#/definitions/metadata"
45- if '#' in ref :
46- return ref .split ('#' )[1 ]
44+ # Handle refs like "secrets.json#/secrets" or
45+ # "common.json#/definitions/metadata"
46+ if "#" in ref :
47+ return ref .split ("#" )[1 ]
4748 return ""
4849
49- def resolve_external_refs (self , schema : Dict [str , Any ], current_file : str = "" ) -> Dict [str , Any ]:
50+ def resolve_external_refs (
51+ self , schema : Dict [str , Any ], current_file : str = ""
52+ ) -> Dict [str , Any ]:
5053 """Recursively resolve external $ref references."""
5154 if isinstance (schema , dict ):
52- if ' $ref' in schema :
53- ref = schema [' $ref' ]
55+ if " $ref" in schema :
56+ ref = schema [" $ref" ]
5457
5558 # Skip internal references (starting with #)
56- if ref .startswith ('#' ):
59+ if ref .startswith ("#" ):
5760 return schema
5861
5962 # Extract filename and path
@@ -68,12 +71,15 @@ def resolve_external_refs(self, schema: Dict[str, Any], current_file: str = "")
6871 # Navigate to the specific path in the referenced schema
6972 target = ref_schema
7073 if ref_path :
71- path_parts = ref_path .strip ('/' ).split ('/' )
74+ path_parts = ref_path .strip ("/" ).split ("/" )
7275 for part in path_parts :
7376 if part in target :
7477 target = target [part ]
7578 else :
76- print (f"Warning: Path { ref_path } not found in { ref_file } " )
79+ print (
80+ f"Warning: Path { ref_path } not found in "
81+ f"{ ref_file } "
82+ )
7783 return schema
7884
7985 # Recursively resolve refs in the target
@@ -100,25 +106,31 @@ def resolve_external_refs(self, schema: Dict[str, Any], current_file: str = "")
100106
101107 def process_schema_definitions (self , schema : Dict [str , Any ], filename : str ):
102108 """Extract and merge definitions from a schema into consolidated definitions."""
103- if ' definitions' in schema :
104- for def_name , def_value in schema [' definitions' ].items ():
109+ if " definitions" in schema :
110+ for def_name , def_value in schema [" definitions" ].items ():
105111 # Create unique key to avoid conflicts
106112 unique_key = f"{ filename .replace ('.json' , '' )} _{ def_name } "
107- self .consolidated_definitions [unique_key ] = self .resolve_external_refs (def_value , filename )
113+ self .consolidated_definitions [unique_key ] = self .resolve_external_refs (
114+ def_value , filename
115+ )
108116 print (f"Added definition: { unique_key } " )
109117
110- def update_internal_refs (self , schema : Dict [str , Any ], filename : str ) -> Dict [str , Any ]:
118+ def update_internal_refs (
119+ self , schema : Dict [str , Any ], filename : str
120+ ) -> Dict [str , Any ]:
111121 """Update internal references to point to consolidated definitions."""
112122 if isinstance (schema , dict ):
113- if ' $ref' in schema :
114- ref = schema [' $ref' ]
123+ if " $ref" in schema :
124+ ref = schema [" $ref" ]
115125
116126 # Handle internal references
117- if ref .startswith (' #/definitions/' ):
118- def_name = ref .replace (' #/definitions/' , '' )
127+ if ref .startswith (" #/definitions/" ):
128+ def_name = ref .replace (" #/definitions/" , "" )
119129 # Update to use the prefixed definition name
120- new_ref = f"#/definitions/{ filename .replace ('.json' , '' )} _{ def_name } "
121- return {'$ref' : new_ref }
130+ new_ref = (
131+ f"#/definitions/{ filename .replace ('.json' , '' )} _{ def_name } "
132+ )
133+ return {"$ref" : new_ref }
122134
123135 return schema
124136 else :
@@ -146,7 +158,7 @@ def consolidate(self, main_schema_file: str = "workflow.json") -> Dict[str, Any]
146158 # First pass: collect all definitions from all referenced schemas
147159 print ("\n --- Collecting definitions from all schemas ---" )
148160 for filename in os .listdir (self .schema_dir ):
149- if filename .endswith (' .json' ):
161+ if filename .endswith (" .json" ):
150162 schema = self .load_schema (filename )
151163 self .process_schema_definitions (schema , filename )
152164
@@ -159,20 +171,22 @@ def consolidate(self, main_schema_file: str = "workflow.json") -> Dict[str, Any]
159171 final_schema = self .update_internal_refs (resolved_main , main_schema_file )
160172
161173 # Add all consolidated definitions
162- if ' definitions' not in final_schema :
163- final_schema [' definitions' ] = {}
174+ if " definitions" not in final_schema :
175+ final_schema [" definitions" ] = {}
164176
165- final_schema [' definitions' ].update (self .consolidated_definitions )
177+ final_schema [" definitions" ].update (self .consolidated_definitions )
166178
167- print (f "\n --- Consolidation complete ---" )
179+ print ("\n --- Consolidation complete ---" )
168180 print (f"Total definitions consolidated: { len (self .consolidated_definitions )} " )
169181
170182 return final_schema
171183
172- def save_consolidated_schema (self , consolidated_schema : Dict [str , Any ], output_file : str ):
184+ def save_consolidated_schema (
185+ self , consolidated_schema : Dict [str , Any ], output_file : str
186+ ):
173187 """Save the consolidated schema to a file."""
174188 output_path = output_file
175- with open (output_path , 'w' ) as f :
189+ with open (output_path , "w" ) as f :
176190 json .dump (consolidated_schema , f , indent = 2 )
177191 print (f"Consolidated schema saved to: { output_path } " )
178192
@@ -191,7 +205,9 @@ def main():
191205 consolidated = consolidator .consolidate ("workflow.json" )
192206
193207 # Save the result
194- consolidator .save_consolidated_schema (consolidated , "consolidated_workflow_schema.json" )
208+ consolidator .save_consolidated_schema (
209+ consolidated , "consolidated_workflow_schema.json"
210+ )
195211 print ("\n Consolidation completed successfully!" )
196212 except Exception as e :
197213 print (f"Error during consolidation: { e } " )
0 commit comments