2929 ForLoopHandler ,
3030 WhileLoopHandler ,
3131 DynamicFlowHandler ,
32- ControlFlowAutoResolver
32+ ControlFlowAutoResolver ,
3333)
3434from .engine .control_flow_engine import ControlFlowEngine
3535
@@ -85,7 +85,7 @@ def init_models(config_path: str = None) -> ModelRegistry:
8585 from .utils .api_keys import load_api_keys
8686
8787 print (">> Initializing model pool..." )
88-
88+
8989 # Load API keys first - this will raise an error if keys are missing
9090 load_api_keys ()
9191
@@ -106,22 +106,22 @@ def init_models(config_path: str = None) -> ModelRegistry:
106106 if not isinstance (models_config , list ):
107107 print (">> ⚠️ Invalid models configuration format - expected list" )
108108 models_config = []
109-
109+
110110 # Process each model
111111 for model_config in models_config :
112112 provider = model_config .get ("source" )
113113 name = model_config .get ("name" )
114-
114+
115115 # Parse size
116116 size_str = str (model_config .get ("size" , "1b" ))
117- if size_str .endswith ('b' ):
117+ if size_str .endswith ("b" ):
118118 size_billions = float (size_str [:- 1 ])
119119 else :
120120 size_billions = float (size_str )
121-
121+
122122 # Get expertise
123123 expertise = model_config .get ("expertise" , ["general" ])
124-
124+
125125 if not provider or not name :
126126 continue
127127
@@ -177,9 +177,7 @@ def init_models(config_path: str = None) -> ModelRegistry:
177177 setattr (model , "_expertise" , expertise )
178178 setattr (model , "_size_billions" , size_billions )
179179 _model_registry .register_model (model )
180- print (
181- f">> ✅ Registered OpenAI model: { name } ({ size_billions } B params)"
182- )
180+ print (f">> ✅ Registered OpenAI model: { name } ({ size_billions } B params)" )
183181
184182 elif provider == "anthropic" and os .environ .get ("ANTHROPIC_API_KEY" ):
185183 # Only register if API key is available
@@ -188,20 +186,18 @@ def init_models(config_path: str = None) -> ModelRegistry:
188186 setattr (model , "_expertise" , expertise )
189187 setattr (model , "_size_billions" , size_billions )
190188 _model_registry .register_model (model )
191- print (
192- f">> ✅ Registered Anthropic model: { name } ({ size_billions } B params)"
193- )
189+ print (f">> ✅ Registered Anthropic model: { name } ({ size_billions } B params)" )
194190
195- elif provider == "google" and (os .environ .get ("GOOGLE_AI_API_KEY" ) or os .environ .get ("GOOGLE_API_KEY" )):
191+ elif provider == "google" and (
192+ os .environ .get ("GOOGLE_AI_API_KEY" ) or os .environ .get ("GOOGLE_API_KEY" )
193+ ):
196194 # Only register if API key is available
197195 model = GoogleModel (model_name = name )
198196 # Add dynamic attributes for model selection
199197 setattr (model , "_expertise" , expertise )
200198 setattr (model , "_size_billions" , size_billions )
201199 _model_registry .register_model (model )
202- print (
203- f">> ✅ Registered Google model: { name } ({ size_billions } B params)"
204- )
200+ print (f">> ✅ Registered Google model: { name } ({ size_billions } B params)" )
205201
206202 except Exception as e :
207203 print (f">> ⚠️ Error registering { provider } model { name } : { e } " )
@@ -215,7 +211,7 @@ def init_models(config_path: str = None) -> ModelRegistry:
215211
216212 # Store defaults in registry for later use
217213 setattr (_model_registry , "_defaults" , config .get ("defaults" , {}))
218-
214+
219215 # Enable auto-registration for new models
220216 _model_registry .enable_auto_registration ()
221217 print (">> Auto-registration enabled for new models" )
@@ -226,9 +222,7 @@ def init_models(config_path: str = None) -> ModelRegistry:
226222class OrchestratorPipeline :
227223 """Wrapper for compiled pipeline that can be called with keyword arguments."""
228224
229- def __init__ (
230- self , pipeline : Pipeline , compiler : YAMLCompiler , orchestrator : Orchestrator
231- ):
225+ def __init__ (self , pipeline : Pipeline , compiler : YAMLCompiler , orchestrator : Orchestrator ):
232226 self .pipeline = pipeline
233227 self .compiler = compiler
234228 self .orchestrator = orchestrator
@@ -275,10 +269,10 @@ def run(self, **kwargs: Any) -> Any:
275269 # Run pipeline asynchronously
276270 try :
277271 # Check if there's an event loop running
278- loop = asyncio .get_running_loop ()
272+ asyncio .get_running_loop ()
279273 # We're in an async context but called from sync code
280274 # This is not ideal but we need to handle it
281- future = asyncio .ensure_future (self ._run_async (** kwargs ))
275+ asyncio .ensure_future (self ._run_async (** kwargs ))
282276 # Can't await here since this is a sync method
283277 raise RuntimeError (
284278 "Cannot call synchronous run() method from within an async context. "
@@ -294,7 +288,7 @@ def run(self, **kwargs: Any) -> Any:
294288 async def run_async (self , ** kwargs : Any ) -> Any :
295289 """Run the pipeline asynchronously with given keyword arguments."""
296290 return await self ._run_async (** kwargs )
297-
291+
298292 async def _run_async (self , ** kwargs ):
299293 """Async pipeline execution."""
300294 # Validate required inputs
@@ -307,9 +301,7 @@ async def _run_async(self, **kwargs):
307301 context = {"inputs" : kwargs , "outputs" : outputs }
308302
309303 # Apply runtime template resolution to pipeline tasks
310- resolved_pipeline = await self ._resolve_runtime_templates (
311- self .pipeline , context
312- )
304+ resolved_pipeline = await self ._resolve_runtime_templates (self .pipeline , context )
313305
314306 # Execute pipeline
315307 # Set the resolved pipeline's context
@@ -346,24 +338,20 @@ async def _resolve_outputs(self, inputs):
346338 if value .startswith ("<AUTO>" ) and value .endswith ("</AUTO>" ):
347339 # Resolve AUTO tag
348340 auto_content = value [6 :- 7 ] # Remove <AUTO> tags
349- if hasattr (
350- self . orchestrator . yaml_compiler , "ambiguity_resolver"
351- ):
352- resolved = await self . orchestrator . yaml_compiler . ambiguity_resolver . resolve (
353- auto_content , f"outputs. { name } "
341+ if hasattr (self . orchestrator . yaml_compiler , "ambiguity_resolver" ):
342+ resolved = (
343+ await self . orchestrator . yaml_compiler . ambiguity_resolver . resolve (
344+ auto_content , f"outputs. { name } "
345+ )
354346 )
355347 outputs [name ] = resolved
356348 else :
357- outputs [name ] = (
358- f"report_{ inputs .get ('topic' , 'research' )} .pdf"
359- )
349+ outputs [name ] = f"report_{ inputs .get ('topic' , 'research' )} .pdf"
360350 else :
361351 # Regular template - render with current context
362352 try :
363353 template = Template (value )
364- outputs [name ] = template .render (
365- inputs = inputs , outputs = outputs
366- )
354+ outputs [name ] = template .render (inputs = inputs , outputs = outputs )
367355 except Exception :
368356 outputs [name ] = value
369357 else :
@@ -389,9 +377,7 @@ async def _resolve_runtime_templates(
389377 # Resolve templates in each task
390378 for task_id , task in resolved_pipeline .tasks .items ():
391379 if hasattr (task , "parameters" ):
392- task .parameters = await self ._resolve_task_templates (
393- task .parameters , context
394- )
380+ task .parameters = await self ._resolve_task_templates (task .parameters , context )
395381
396382 return resolved_pipeline
397383
@@ -409,10 +395,7 @@ async def _resolve_task_templates(self, obj, context):
409395 return obj
410396 return obj
411397 elif isinstance (obj , dict ):
412- return {
413- k : await self ._resolve_task_templates (v , context )
414- for k , v in obj .items ()
415- }
398+ return {k : await self ._resolve_task_templates (v , context ) for k , v in obj .items ()}
416399 elif isinstance (obj , list ):
417400 return [await self ._resolve_task_templates (item , context ) for item in obj ]
418401 else :
@@ -435,12 +418,9 @@ async def compile_async(yaml_path: str) -> "OrchestratorPipeline":
435418 raise RuntimeError (
436419 "No models available. Run init_models() first or ensure API keys are set."
437420 )
438-
421+
439422 control_system = HybridControlSystem (_model_registry )
440- _orchestrator = Orchestrator (
441- model_registry = _model_registry ,
442- control_system = control_system
443- )
423+ _orchestrator = Orchestrator (model_registry = _model_registry , control_system = control_system )
444424
445425 # Set up model for ambiguity resolution
446426 model_keys = _model_registry .list_models () if _model_registry else []
@@ -490,9 +470,6 @@ def compile(yaml_path: str) -> "OrchestratorPipeline":
490470 # Check if we're already in an event loop
491471 try :
492472 asyncio .get_running_loop ()
493- # We're in an async context, need to run in a new thread or return a coroutine
494- import concurrent .futures
495-
496473 # We're in an async context but called from sync code
497474 raise RuntimeError (
498475 "Cannot call synchronous compile_yaml() from within an async context. "
0 commit comments