@@ -16,6 +16,7 @@ class LoopItems(BaseModel):
1616
1717class Process :
1818 DEFAULT_RETRY_LIMIT = 3 # Predefined retry limit in a common place
19+ VALIDATION_FAILURE_DECISIONS = ["invalid" , "retry" , "failed" , "error" , "unsuccessful" , "fail" , "errors" , "reject" , "rejected" , "incomplete" ] # Decision strings that trigger validation feedback
1920
2021 def __init__ (self , tasks : Dict [str , Task ], agents : List [Agent ], manager_llm : Optional [str ] = None , verbose : bool = False , max_iter : int = 10 ):
2122 logging .debug (f"=== Initializing Process ===" )
@@ -33,12 +34,38 @@ def __init__(self, tasks: Dict[str, Task], agents: List[Agent], manager_llm: Opt
3334 self .task_retry_counter : Dict [str , int ] = {} # Initialize retry counter
3435 self .workflow_finished = False # ADDED: Workflow finished flag
3536
37+ def _create_loop_subtasks (self , loop_task : Task ):
38+ """Create subtasks for a loop task from input file."""
39+ logging .warning (f"_create_loop_subtasks called for { loop_task .name } but method not fully implemented" )
40+ # TODO: Implement loop subtask creation from input file
41+ # This should read loop_task.input_file and create subtasks
42+ pass
43+
3644 def _build_task_context (self , current_task : Task ) -> str :
3745 """Build context for a task based on its retain_full_context setting"""
38- if not (current_task .previous_tasks or current_task .context ):
39- return ""
46+ # Check if we have validation feedback to include
47+ if current_task .validation_feedback :
48+ feedback = current_task .validation_feedback
49+ context = f"\n Previous attempt failed validation with reason: { feedback ['validation_response' ]} "
50+ if feedback .get ('validated_task' ):
51+ context += f"\n Validated task: { feedback ['validated_task' ]} "
52+ if feedback .get ('validation_details' ):
53+ context += f"\n Validation feedback: { feedback ['validation_details' ]} "
54+ if feedback .get ('rejected_output' ):
55+ context += f"\n Rejected output: { feedback ['rejected_output' ]} "
56+ context += "\n Please try again with a different approach based on this feedback.\n "
57+ # Clear the feedback after including it to prevent it from persisting
58+ current_task .validation_feedback = None
4059
41- context = "\n Input data from previous tasks:"
60+ # If we have validation feedback but no previous tasks context, return just the feedback
61+ if not (current_task .previous_tasks or current_task .context ):
62+ return context
63+ # Otherwise, append the regular context
64+ context += "\n Input data from previous tasks:"
65+ elif not (current_task .previous_tasks or current_task .context ):
66+ return ""
67+ else :
68+ context = "\n Input data from previous tasks:"
4269
4370 if current_task .retain_full_context :
4471 # Original behavior: include all previous tasks
@@ -496,6 +523,35 @@ async def aworkflow(self) -> AsyncGenerator[str, None]:
496523 next_task = next ((t for t in self .tasks .values () if t .name == task_value ), None )
497524 if next_task :
498525 next_task .status = "not started" # Reset status to allow execution
526+
527+ # Capture validation feedback for retry scenarios
528+ if decision_str in Process .VALIDATION_FAILURE_DECISIONS :
529+ if current_task and current_task .result :
530+ # Get the rejected output from the task that was validated
531+ validated_task = None
532+ # Find the task that produced the output being validated
533+ if current_task .previous_tasks :
534+ # For validation tasks, typically validate the most recent previous task
535+ prev_task_name = current_task .previous_tasks [- 1 ]
536+ validated_task = next ((t for t in self .tasks .values () if t .name == prev_task_name ), None )
537+ elif current_task .context :
538+ # If no previous_tasks, check context for the validated task
539+ # Use the most recent task with a result from context
540+ for ctx_task in reversed (current_task .context ):
541+ if ctx_task .result and ctx_task .name != current_task .name :
542+ validated_task = ctx_task
543+ break
544+
545+ feedback = {
546+ 'validation_response' : decision_str ,
547+ 'validation_details' : current_task .result .raw ,
548+ 'rejected_output' : validated_task .result .raw if validated_task and validated_task .result else None ,
549+ 'validator_task' : current_task .name ,
550+ 'validated_task' : validated_task .name if validated_task else None
551+ }
552+ next_task .validation_feedback = feedback
553+ logging .debug (f"Added validation feedback to { next_task .name } : { feedback ['validation_response' ]} (validated task: { feedback .get ('validated_task' , 'None' )} )" )
554+
499555 logging .debug (f"Routing to { next_task .name } based on decision: { decision_str } " )
500556 # Don't mark workflow as finished when following condition path
501557 self .workflow_finished = False
@@ -1098,6 +1154,35 @@ def workflow(self):
10981154 next_task = next ((t for t in self .tasks .values () if t .name == task_value ), None )
10991155 if next_task :
11001156 next_task .status = "not started" # Reset status to allow execution
1157+
1158+ # Capture validation feedback for retry scenarios
1159+ if decision_str in Process .VALIDATION_FAILURE_DECISIONS :
1160+ if current_task and current_task .result :
1161+ # Get the rejected output from the task that was validated
1162+ validated_task = None
1163+ # Find the task that produced the output being validated
1164+ if current_task .previous_tasks :
1165+ # For validation tasks, typically validate the most recent previous task
1166+ prev_task_name = current_task .previous_tasks [- 1 ]
1167+ validated_task = next ((t for t in self .tasks .values () if t .name == prev_task_name ), None )
1168+ elif current_task .context :
1169+ # If no previous_tasks, check context for the validated task
1170+ # Use the most recent task with a result from context
1171+ for ctx_task in reversed (current_task .context ):
1172+ if ctx_task .result and ctx_task .name != current_task .name :
1173+ validated_task = ctx_task
1174+ break
1175+
1176+ feedback = {
1177+ 'validation_response' : decision_str ,
1178+ 'validation_details' : current_task .result .raw ,
1179+ 'rejected_output' : validated_task .result .raw if validated_task and validated_task .result else None ,
1180+ 'validator_task' : current_task .name ,
1181+ 'validated_task' : validated_task .name if validated_task else None
1182+ }
1183+ next_task .validation_feedback = feedback
1184+ logging .debug (f"Added validation feedback to { next_task .name } : { feedback ['validation_response' ]} (validated task: { feedback .get ('validated_task' , 'None' )} )" )
1185+
11011186 logging .debug (f"Routing to { next_task .name } based on decision: { decision_str } " )
11021187 # Don't mark workflow as finished when following condition path
11031188 self .workflow_finished = False
0 commit comments