Skip to content

Commit b55f5ff

Browse files
authored
Merge branch 'main' into release/v2.5.4
2 parents ef37add + f73d0b6 commit b55f5ff

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+12259
-1031
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
"""
2+
Task Mode Streaming Events
3+
==========================
4+
5+
Demonstrates how to consume streaming events programmatically in `mode=tasks`.
6+
7+
This example shows how to:
8+
1. Use `stream=True` with `run()` to get an iterator of events
9+
2. Handle task iteration events (started/completed)
10+
3. Handle task state updates
11+
4. Process content deltas as they arrive
12+
"""
13+
14+
from agno.agent import Agent
15+
from agno.models.openai import OpenAIResponses
16+
from agno.run.agent import RunContentEvent as AgentRunContentEvent
17+
from agno.run.team import (
18+
RunContentEvent,
19+
TaskIterationCompletedEvent,
20+
TaskIterationStartedEvent,
21+
TaskStateUpdatedEvent,
22+
TeamRunEvent,
23+
ToolCallCompletedEvent,
24+
ToolCallStartedEvent,
25+
)
26+
from agno.team.mode import TeamMode
27+
from agno.team.team import Team
28+
29+
# ---------------------------------------------------------------------------
30+
# Create Members
31+
# ---------------------------------------------------------------------------
32+
33+
researcher = Agent(
34+
name="Researcher",
35+
role="Researches topics and gathers information",
36+
model=OpenAIResponses(id="gpt-5.1"),
37+
instructions=["Research the given topic thoroughly.", "Provide factual information."],
38+
)
39+
40+
summarizer = Agent(
41+
name="Summarizer",
42+
role="Summarizes information into concise points",
43+
model=OpenAIResponses(id="gpt-5.1"),
44+
instructions=["Create clear, concise summaries.", "Highlight key points."],
45+
)
46+
47+
# ---------------------------------------------------------------------------
48+
# Create Team
49+
# ---------------------------------------------------------------------------
50+
51+
team = Team(
52+
name="Research Team",
53+
mode=TeamMode.tasks,
54+
model=OpenAIResponses(id="gpt-5.1"),
55+
members=[researcher, summarizer],
56+
instructions=[
57+
"You are a research team leader. Follow these steps exactly:",
58+
"1. Create a task for the Researcher to gather information.",
59+
"2. Execute the Researcher's task.",
60+
"3. Create a task for the Summarizer to summarize the research.",
61+
"4. Execute the Summarizer's task.",
62+
"5. Call mark_all_complete with a final summary when all tasks are done.",
63+
],
64+
max_iterations=3,
65+
)
66+
67+
68+
# ---------------------------------------------------------------------------
69+
# Sync streaming with event handling
70+
# ---------------------------------------------------------------------------
71+
def streaming_with_events() -> None:
72+
"""Demonstrates sync streaming with programmatic event handling."""
73+
print("\n--- Sync Streaming with Event Handling ---\n")
74+
75+
# Use stream=True to get an iterator of events
76+
response_stream = team.run(
77+
"What are the key benefits of microservices architecture?",
78+
stream=True,
79+
stream_events=True,
80+
)
81+
82+
for event in response_stream:
83+
# Handle task iteration started - show all fields
84+
if isinstance(event, TaskIterationStartedEvent):
85+
print("\n" + "=" * 60)
86+
print("TASK ITERATION STARTED")
87+
print("=" * 60)
88+
print(f" event: {event.event}")
89+
print(f" iteration: {event.iteration}")
90+
print(f" max_iterations: {event.max_iterations}")
91+
print("=" * 60)
92+
93+
# Handle task iteration completed - show all fields
94+
elif isinstance(event, TaskIterationCompletedEvent):
95+
print("\n" + "=" * 60)
96+
print("TASK ITERATION COMPLETED")
97+
print("=" * 60)
98+
print(f" event: {event.event}")
99+
print(f" iteration: {event.iteration}")
100+
print(f" max_iterations: {event.max_iterations}")
101+
print(f" task_summary: {event.task_summary[:100] if event.task_summary else None}...")
102+
print("=" * 60)
103+
104+
# Handle task state updates - show all fields
105+
elif isinstance(event, TaskStateUpdatedEvent):
106+
print("\n" + "-" * 60)
107+
print("TASK STATE UPDATED")
108+
print("-" * 60)
109+
print(f" event: {event.event}")
110+
print(f" task_summary: {event.task_summary[:100] if event.task_summary else None}...")
111+
print(f" goal_complete: {event.goal_complete}")
112+
print("-" * 60)
113+
114+
# Handle tool call events (shows when tasks are being executed)
115+
elif isinstance(event, ToolCallStartedEvent):
116+
if event.tool and event.tool.tool_name:
117+
print(f"\n[Tool: {event.tool.tool_name}]", end="")
118+
119+
elif isinstance(event, ToolCallCompletedEvent):
120+
pass # Tool completed
121+
122+
# Handle member agent content streaming
123+
elif isinstance(event, AgentRunContentEvent):
124+
if event.content:
125+
print(event.content, end="", flush=True)
126+
127+
# Handle team content deltas
128+
elif isinstance(event, RunContentEvent):
129+
if event.content:
130+
print(event.content, end="", flush=True)
131+
132+
# Handle other events by their event type
133+
elif hasattr(event, "event"):
134+
if event.event == TeamRunEvent.run_started.value:
135+
print("[Run Started]")
136+
elif event.event == TeamRunEvent.run_completed.value:
137+
print("\n[Run Completed]")
138+
139+
print()
140+
141+
142+
if __name__ == "__main__":
143+
streaming_with_events()

cookbook/04_workflows/_07_human_in_the_loop/__init__.py

Whitespace-only changes.
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
"""
2+
Condition with User Decision HITL Example
3+
4+
This example demonstrates how to use HITL with a Condition component,
5+
allowing the user to decide which branch to execute at runtime.
6+
7+
When `requires_confirmation=True` on a Condition, the `on_reject` setting
8+
controls what happens when the user rejects:
9+
10+
- on_reject="else" (default): Execute `else_steps` if provided, otherwise skip
11+
- on_reject="skip": Skip the entire condition (both branches)
12+
- on_reject="cancel": Cancel the workflow
13+
14+
This is useful for:
15+
- User-driven decision points
16+
- Interactive branching workflows
17+
- A/B testing with human judgment
18+
"""
19+
20+
from agno.db.sqlite import SqliteDb
21+
from agno.workflow.condition import Condition
22+
from agno.workflow.step import Step
23+
from agno.workflow.types import OnReject, StepInput, StepOutput
24+
from agno.workflow.workflow import Workflow
25+
26+
27+
# ============================================================
28+
# Step functions
29+
# ============================================================
30+
def analyze_data(step_input: StepInput) -> StepOutput:
31+
"""Analyze the data."""
32+
user_query = step_input.input or "data"
33+
return StepOutput(
34+
content=f"Analysis complete for '{user_query}':\n"
35+
"- Found potential issues that may require detailed review\n"
36+
"- Quick summary is available\n\n"
37+
"Would you like to proceed with detailed analysis?"
38+
)
39+
40+
41+
def detailed_analysis(step_input: StepInput) -> StepOutput:
42+
"""Perform detailed analysis (if branch)."""
43+
return StepOutput(
44+
content="Detailed Analysis Results:\n"
45+
"- Comprehensive review completed\n"
46+
"- All edge cases examined\n"
47+
"- Full report generated\n"
48+
"- Processing time: 10 minutes"
49+
)
50+
51+
52+
def quick_summary(step_input: StepInput) -> StepOutput:
53+
"""Provide quick summary (else branch)."""
54+
return StepOutput(
55+
content="Quick Summary:\n"
56+
"- Basic metrics computed\n"
57+
"- Key highlights identified\n"
58+
"- Processing time: 1 minute"
59+
)
60+
61+
62+
def generate_report(step_input: StepInput) -> StepOutput:
63+
"""Generate final report."""
64+
previous_content = step_input.previous_step_content or "No analysis"
65+
return StepOutput(
66+
content=f"=== FINAL REPORT ===\n\n{previous_content}\n\n"
67+
"Report generated successfully."
68+
)
69+
70+
71+
def run_demo(on_reject_mode: OnReject, demo_name: str):
72+
"""Run a demo with the specified on_reject mode."""
73+
print("\n" + "=" * 60)
74+
print(f"Demo: {demo_name}")
75+
print(f"on_reject = {on_reject_mode.value}")
76+
print("=" * 60)
77+
78+
# Define the steps
79+
analyze_step = Step(name="analyze_data", executor=analyze_data)
80+
81+
# Condition with HITL - user decides which branch to take
82+
# The evaluator is ignored when requires_confirmation=True
83+
# User confirms -> detailed_analysis (if branch)
84+
# User rejects -> behavior depends on on_reject setting
85+
analysis_condition = Condition(
86+
name="analysis_depth_decision",
87+
steps=[Step(name="detailed_analysis", executor=detailed_analysis)],
88+
else_steps=[Step(name="quick_summary", executor=quick_summary)],
89+
requires_confirmation=True,
90+
confirmation_message="Would you like to perform detailed analysis?",
91+
on_reject=on_reject_mode,
92+
)
93+
94+
report_step = Step(name="generate_report", executor=generate_report)
95+
96+
# Create workflow with database for HITL persistence
97+
workflow = Workflow(
98+
name="condition_hitl_demo",
99+
steps=[analyze_step, analysis_condition, report_step],
100+
db=SqliteDb(db_file="tmp/condition_hitl.db"),
101+
)
102+
103+
run_output = workflow.run("Q4 sales data")
104+
105+
# Handle HITL pauses
106+
while run_output.is_paused:
107+
# Handle Step requirements (confirmation)
108+
for requirement in run_output.steps_requiring_confirmation:
109+
print(f"\n[DECISION POINT] {requirement.step_name}")
110+
print(f"[HITL] {requirement.confirmation_message}")
111+
print(f"[INFO] on_reject mode: {requirement.on_reject}")
112+
113+
user_choice = input("\nYour choice (yes/no): ").strip().lower()
114+
if user_choice in ("yes", "y"):
115+
requirement.confirm()
116+
print("[HITL] Confirmed - executing 'if' branch (detailed analysis)")
117+
else:
118+
requirement.reject()
119+
if on_reject_mode == OnReject.else_branch:
120+
print("[HITL] Rejected - executing 'else' branch (quick summary)")
121+
elif on_reject_mode == OnReject.skip:
122+
print("[HITL] Rejected - skipping entire condition")
123+
else:
124+
print("[HITL] Rejected - cancelling workflow")
125+
126+
run_output = workflow.continue_run(run_output)
127+
128+
print("\n" + "-" * 40)
129+
print(f"Status: {run_output.status}")
130+
print("-" * 40)
131+
print(run_output.content)
132+
133+
134+
if __name__ == "__main__":
135+
print("=" * 60)
136+
print("Condition with User Decision HITL Example")
137+
print("=" * 60)
138+
print("\nThis demo shows 3 different on_reject behaviors:")
139+
print(" 1. on_reject='else' (default) - Execute else branch on reject")
140+
print(" 2. on_reject='skip' - Skip entire condition on reject")
141+
print(" 3. on_reject='cancel' - Cancel workflow on reject")
142+
print()
143+
144+
# Let user choose which demo to run
145+
print("Which demo would you like to run?")
146+
print(" 1. on_reject='else' (execute else branch)")
147+
print(" 2. on_reject='skip' (skip condition)")
148+
print(" 3. on_reject='cancel' (cancel workflow)")
149+
print(" 4. Run all demos")
150+
151+
choice = input("\nEnter choice (1-4): ").strip()
152+
153+
if choice == "1":
154+
run_demo(OnReject.else_branch, "Execute Else Branch on Reject")
155+
elif choice == "2":
156+
run_demo(OnReject.skip, "Skip Condition on Reject")
157+
elif choice == "3":
158+
run_demo(OnReject.cancel, "Cancel Workflow on Reject")
159+
elif choice == "4":
160+
# Run all demos - use a non-interactive mode for demonstration
161+
print(
162+
"\nRunning all demos with automatic 'no' response to show rejection behavior..."
163+
)
164+
165+
for mode, name in [
166+
(OnReject.else_branch, "Execute Else Branch on Reject"),
167+
(OnReject.skip, "Skip Condition on Reject"),
168+
(OnReject.cancel, "Cancel Workflow on Reject"),
169+
]:
170+
print("\n" + "=" * 60)
171+
print(f"Demo: {name}")
172+
print(f"on_reject = {mode.value}")
173+
print("=" * 60)
174+
175+
analyze_step = Step(name="analyze_data", executor=analyze_data)
176+
analysis_condition = Condition(
177+
name="analysis_depth_decision",
178+
evaluator=True,
179+
steps=[Step(name="detailed_analysis", executor=detailed_analysis)],
180+
else_steps=[Step(name="quick_summary", executor=quick_summary)],
181+
requires_confirmation=True,
182+
confirmation_message="Would you like to perform detailed analysis?",
183+
on_reject=mode,
184+
)
185+
report_step = Step(name="generate_report", executor=generate_report)
186+
187+
workflow = Workflow(
188+
name="condition_hitl_demo",
189+
steps=[analyze_step, analysis_condition, report_step],
190+
db=SqliteDb(db_file="tmp/condition_hitl.db"),
191+
)
192+
193+
run_output = workflow.run("Q4 sales data")
194+
195+
# Auto-reject for demonstration
196+
while run_output.is_paused:
197+
for requirement in run_output.steps_requiring_confirmation:
198+
print(f"\n[DECISION POINT] {requirement.step_name}")
199+
print(f"[HITL] {requirement.confirmation_message}")
200+
print("[AUTO] Rejecting to demonstrate on_reject behavior...")
201+
requirement.reject()
202+
203+
run_output = workflow.continue_run(run_output)
204+
205+
print(f"\nStatus: {run_output.status}")
206+
print(f"Content: {run_output.content}")
207+
else:
208+
print("Invalid choice. Running default demo (on_reject='else')...")
209+
run_demo(OnReject.else_branch, "Execute Else Branch on Reject")

0 commit comments

Comments
 (0)