2121
2222import os
2323import re
24+ import json
2425import logging
2526from pathlib import Path
2627from typing import Any , Dict , List , Optional , Callable , Tuple , Union
3435logger = logging .getLogger (__name__ )
3536
3637
38+ def _parse_json_output (output : Any , step_name : str = "step" ) -> Any :
39+ """
40+ Parse JSON from LLM output if it's a string.
41+
42+ Handles:
43+ - Direct JSON strings: '{"key": "value"}'
44+ - Markdown code blocks: ```json\n {"key": "value"}\n ```
45+
46+ Returns:
47+ Parsed dict/list if successful, original output otherwise
48+ """
49+ if not isinstance (output , str ) or not output :
50+ return output
51+
52+ # Try direct JSON parse first
53+ try :
54+ return json .loads (output )
55+ except json .JSONDecodeError :
56+ pass
57+
58+ # Try extracting from markdown code block
59+ json_match = re .search (r'```(?:json)?\s*\n?([\s\S]*?)\n?```' , output )
60+ if json_match :
61+ try :
62+ return json .loads (json_match .group (1 ).strip ())
63+ except json .JSONDecodeError :
64+ pass
65+
66+ # Try finding JSON object/array in text
67+ # Look for {...} or [...]
68+ for pattern in [r'(\{[^}]+\})' , r'(\[[^\]]+\])' ]:
69+ match = re .search (pattern , output )
70+ if match :
71+ try :
72+ return json .loads (match .group (1 ))
73+ except json .JSONDecodeError :
74+ continue
75+
76+ # Return original if can't parse
77+ logger .debug (f"Could not parse JSON from step '{ step_name } ' output" )
78+ return output
79+
80+
3781@dataclass
3882class WorkflowContext :
3983 """Context passed to step handlers. Contains all information about the current workflow state."""
@@ -1206,6 +1250,10 @@ def run(
12061250
12071251 output = step .agent .chat (action , ** chat_kwargs )
12081252
1253+ # Parse JSON output if output_json was requested and output is a string
1254+ if step_output_json and output and isinstance (output , str ):
1255+ output = _parse_json_output (output , step .name )
1256+
12091257 # Handle output_pydantic if present
12101258 output_pydantic = getattr (step , 'output_pydantic' , None )
12111259 if output_pydantic and output :
@@ -1257,6 +1305,11 @@ def run(
12571305
12581306 output = temp_agent .chat (action , stream = stream )
12591307
1308+ # Parse JSON output if output_json was requested
1309+ step_output_json = getattr (step , '_output_json' , None )
1310+ if step_output_json and output and isinstance (output , str ):
1311+ output = _parse_json_output (output , step .name )
1312+
12601313 except Exception as e :
12611314 step_error = e
12621315 output = f"Error: { e } "
@@ -1342,6 +1395,26 @@ def run(
13421395 var_name = step .output_variable or f"{ step .name } _output"
13431396 all_variables [var_name ] = output
13441397
1398+ # Validate output and warn about issues
1399+ if output is None :
1400+ logger .warning (f"⚠️ Step '{ step .name } ': Output is None. Agent may not have returned expected format." )
1401+ if verbose :
1402+ print (f"⚠️ WARNING: Step '{ step .name } ' output is None!" )
1403+ else :
1404+ # Check type against output_json schema if defined
1405+ expected_schema = getattr (step , '_output_json' , None )
1406+ if expected_schema and isinstance (expected_schema , dict ):
1407+ expected_type = expected_schema .get ('type' )
1408+ actual_type = type (output ).__name__
1409+ if expected_type == 'object' and not isinstance (output , dict ):
1410+ logger .warning (f"⚠️ Step '{ step .name } ': Expected object/dict, got { actual_type } " )
1411+ if verbose :
1412+ print (f"⚠️ Step '{ step .name } ': Expected 'object', received '{ actual_type } '" )
1413+ elif expected_type == 'array' and not isinstance (output , list ):
1414+ logger .warning (f"⚠️ Step '{ step .name } ': Expected array/list, got { actual_type } " )
1415+ if verbose :
1416+ print (f"⚠️ Step '{ step .name } ': Expected 'array', received '{ actual_type } '" )
1417+
13451418 i += 1
13461419
13471420 # Update workflow status
@@ -1399,33 +1472,12 @@ async def arun(
13991472 return await self .astart (input , llm , verbose )
14001473
14011474 def _normalize_steps (self ) -> List ['WorkflowStep' ]:
1402- """Convert mixed steps (Agent, function, WorkflowStep) to WorkflowStep list."""
1403- normalized = []
1404-
1405- for i , step in enumerate (self .steps ):
1406- if isinstance (step , WorkflowStep ):
1407- normalized .append (step )
1408- elif callable (step ):
1409- # It's a function - wrap as handler
1410- normalized .append (WorkflowStep (
1411- name = getattr (step , '__name__' , f'step_{ i + 1 } ' ),
1412- handler = step
1413- ))
1414- elif hasattr (step , 'chat' ):
1415- # It's an Agent - wrap with agent reference
1416- normalized .append (WorkflowStep (
1417- name = getattr (step , 'name' , f'agent_{ i + 1 } ' ),
1418- agent = step ,
1419- action = "{{input}}"
1420- ))
1421- else :
1422- # Unknown type - try to use as string action
1423- normalized .append (WorkflowStep (
1424- name = f'step_{ i + 1 } ' ,
1425- action = str (step )
1426- ))
1475+ """Convert mixed steps (Agent, function, WorkflowStep) to WorkflowStep list.
14271476
1428- return normalized
1477+ This method uses _normalize_single_step to ensure consistent normalization
1478+ and avoid duplicated code paths (DRY principle).
1479+ """
1480+ return [self ._normalize_single_step (step , i ) for i , step in enumerate (self .steps )]
14291481
14301482 def _create_plan (self , input : str , model : str , verbose : bool ) -> Optional [str ]:
14311483 """Create an execution plan for the workflow using LLM.
@@ -1608,6 +1660,11 @@ def _execute_single_step_internal(
16081660 action = f"{ action } \n \n Context from previous step:\n { previous_output } "
16091661 action = action .replace ("{{input}}" , input )
16101662 output = normalized .agent .chat (action , stream = stream )
1663+
1664+ # Parse JSON output if output_json was requested
1665+ step_output_json = getattr (normalized , '_output_json' , None )
1666+ if step_output_json and output and isinstance (output , str ):
1667+ output = _parse_json_output (output , normalized .name )
16111668 except Exception as e :
16121669 output = f"Error: { e } "
16131670 elif normalized .action :
@@ -1633,6 +1690,11 @@ def _execute_single_step_internal(
16331690 action = f"{ action } \n \n Context from previous step:\n { previous_output } "
16341691
16351692 output = temp_agent .chat (action , stream = stream )
1693+
1694+ # Parse JSON output if output_json was requested
1695+ step_output_json = getattr (normalized , '_output_json' , None )
1696+ if step_output_json and output and isinstance (output , str ):
1697+ output = _parse_json_output (output , normalized .name )
16361698 except Exception as e :
16371699 output = f"Error: { e } "
16381700
@@ -1850,6 +1912,32 @@ def execute_item(idx_item_tuple):
18501912 all_variables ["loop_outputs" ] = outputs # Also keep for backward compatibility
18511913 combined_output = "\n " .join (str (o ) for o in outputs ) if outputs else ""
18521914
1915+ # Validate outputs and warn about issues
1916+ none_count = sum (1 for o in outputs if o is None )
1917+ if none_count > 0 :
1918+ logger .warning (f"⚠️ Loop '{ output_var_name } ': { none_count } /{ len (outputs )} outputs are None. "
1919+ f"Check if agent returned expected format." )
1920+ if verbose :
1921+ print (f"⚠️ WARNING: { none_count } /{ len (outputs )} loop outputs are None!" )
1922+
1923+ # Check for type consistency
1924+ if outputs and len (outputs ) > 0 :
1925+ expected_type = loop_step .step ._output_json if hasattr (loop_step .step , '_output_json' ) else None
1926+ if expected_type :
1927+ expected_schema_type = expected_type .get ('type' ) if isinstance (expected_type , dict ) else None
1928+ for i , o in enumerate (outputs ):
1929+ if o is None :
1930+ continue
1931+ actual_type = type (o ).__name__
1932+ if expected_schema_type == 'object' and not isinstance (o , dict ):
1933+ logger .warning (f"⚠️ Loop output[{ i } ]: Expected object/dict, got { actual_type } " )
1934+ if verbose :
1935+ print (f"⚠️ Loop output[{ i } ]: Expected 'object', received '{ actual_type } '" )
1936+ elif expected_schema_type == 'array' and not isinstance (o , list ):
1937+ logger .warning (f"⚠️ Loop output[{ i } ]: Expected array/list, got { actual_type } " )
1938+ if verbose :
1939+ print (f"⚠️ Loop output[{ i } ]: Expected 'array', received '{ actual_type } '" )
1940+
18531941 # Debug logging for output_variable
18541942 if verbose :
18551943 print (f"📦 Loop stored { len (outputs )} results in variable: '{ output_var_name } '" )
0 commit comments