
Commit eb2849e

Adding Async Support
feat: enhance human feedback support in flows

- Updated the @human_feedback decorator to use the 'message' parameter instead of 'request' for clarity.
- Introduced new FlowPausedEvent and MethodExecutionPausedEvent to handle flow and method pauses during human feedback.
- Added ConsoleProvider for synchronous feedback collection and integrated async feedback capabilities.
- Implemented SQLite persistence for managing pending feedback context.
- Expanded documentation to include examples of async human feedback usage and best practices.
1 parent 400d75f commit eb2849e
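
At a glance, the headline API change in this commit, as a minimal sketch mirroring the documentation diff below (the decorator keyword is renamed from `request` to `message`, and `@human_feedback` now sits innermost):

```python
from crewai.flow.flow import Flow, start
from crewai.flow.human_feedback import human_feedback

class SimpleReviewFlow(Flow):
    # Before this commit: @human_feedback(request="...") sat above @start()
    @start()
    @human_feedback(message="Please review this content:")
    def generate_content(self):
        return "This is AI-generated content that needs review."
```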

File tree

13 files changed (+2431, -179 lines)

docs/en/learn/human-feedback-in-flows.mdx

Lines changed: 209 additions & 15 deletions
@@ -37,8 +37,8 @@ from crewai.flow.flow import Flow, start, listen
 from crewai.flow.human_feedback import human_feedback

 class SimpleReviewFlow(Flow):
-    @human_feedback(request="Please review this content:")
     @start()
+    @human_feedback(message="Please review this content:")
     def generate_content(self):
         return "This is AI-generated content that needs review."

@@ -63,19 +63,20 @@ When this flow runs, it will:

 | Parameter | Type | Required | Description |
 |-----------|------|----------|-------------|
-| `request` | `str` | Yes | The message shown to the human alongside the method output |
+| `message` | `str` | Yes | The message shown to the human alongside the method output |
 | `emit` | `Sequence[str]` | No | List of possible outcomes. Feedback is collapsed to one of these, which triggers `@listen` decorators |
 | `llm` | `str \| BaseLLM` | When `emit` specified | LLM used to interpret feedback and map to an outcome |
 | `default_outcome` | `str` | No | Outcome to use if no feedback provided. Must be in `emit` |
 | `metadata` | `dict` | No | Additional data for enterprise integrations |
+| `provider` | `HumanFeedbackProvider` | No | Custom provider for async/non-blocking feedback. See [Async Human Feedback](#async-human-feedback-non-blocking) |

 ### Basic Usage (No Routing)

 When you don't specify `emit`, the decorator simply collects feedback and passes a `HumanFeedbackResult` to the next listener:

 ```python Code
-@human_feedback(request="What do you think of this analysis?")
 @start()
+@human_feedback(message="What do you think of this analysis?")
 def analyze_data(self):
     return "Analysis results: Revenue up 15%, costs down 8%"

@@ -91,13 +92,13 @@ def handle_feedback(self, result):
 When you specify `emit`, the decorator becomes a router. The human's free-form feedback is interpreted by an LLM and collapsed into one of the specified outcomes:

 ```python Code
+@start()
 @human_feedback(
-    request="Do you approve this content for publication?",
+    message="Do you approve this content for publication?",
     emit=["approved", "rejected", "needs_revision"],
     llm="gpt-4o-mini",
     default_outcome="needs_revision",
 )
-@start()
 def review_content(self):
     return "Draft blog post content here..."

@@ -212,13 +213,13 @@ class ContentApprovalFlow(Flow[ContentState]):
         self.state.draft = f"# {topic}\n\nThis is a draft about {topic}..."
         return self.state.draft

+    @listen(generate_draft)
     @human_feedback(
-        request="Please review this draft. Reply 'approved', 'rejected', or provide revision feedback:",
+        message="Please review this draft. Reply 'approved', 'rejected', or provide revision feedback:",
         emit=["approved", "rejected", "needs_revision"],
         llm="gpt-4o-mini",
         default_outcome="needs_revision",
     )
-    @listen(generate_draft)
     def review_draft(self, draft):
         return draft

@@ -278,23 +279,23 @@ Flow completed. Revisions requested: 0

 ## Combining with Other Decorators

-The `@human_feedback` decorator works with other flow decorators. The order matters:
+The `@human_feedback` decorator works with other flow decorators. Place it as the innermost decorator (closest to the function):

 ```python Code
-# Correct: @human_feedback wraps the flow decorator
-@human_feedback(request="Review this:")
+# Correct: @human_feedback is innermost (closest to the function)
 @start()
+@human_feedback(message="Review this:")
 def my_start_method(self):
     return "content"

-@human_feedback(request="Review this too:")
 @listen(other_method)
+@human_feedback(message="Review this too:")
 def my_listener(self, data):
     return f"processed: {data}"
 ```

 <Tip>
-Place `@human_feedback` as the outermost decorator (first/top) so it runs after the method completes and can capture the return value.
+Place `@human_feedback` as the innermost decorator (last/closest to the function) so it wraps the method directly and can capture the return value before passing to the flow system.
 </Tip>

 ## Best Practices
@@ -305,10 +306,10 @@ The `request` parameter is what the human sees. Make it actionable:

 ```python Code
 # ✅ Good - clear and actionable
-@human_feedback(request="Does this summary accurately capture the key points? Reply 'yes' or explain what's missing:")
+@human_feedback(message="Does this summary accurately capture the key points? Reply 'yes' or explain what's missing:")

 # ❌ Bad - vague
-@human_feedback(request="Review this:")
+@human_feedback(message="Review this:")
 ```

 ### 2. Choose Meaningful Outcomes
@@ -329,7 +330,7 @@ Use `default_outcome` to handle cases where users press Enter without typing:

 ```python Code
 @human_feedback(
-    request="Approve? (press Enter to request revision)",
+    message="Approve? (press Enter to request revision)",
     emit=["approved", "needs_revision"],
     llm="gpt-4o-mini",
     default_outcome="needs_revision",  # Safe default
@@ -365,9 +366,202 @@ When designing flows, consider whether you need routing:
 | Approval gates with approve/reject/revise | Use `emit` |
 | Collecting comments for logging only | No `emit` |

+## Async Human Feedback (Non-Blocking)
+
+By default, `@human_feedback` blocks execution waiting for console input. For production applications, you may need **async/non-blocking** feedback that integrates with external systems like Slack, email, webhooks, or APIs.
+
+### The Provider Abstraction
+
+Use the `provider` parameter to specify a custom feedback collection strategy:
+
+```python Code
+from crewai.flow import Flow, start, listen, human_feedback, HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
+
+class WebhookProvider(HumanFeedbackProvider):
+    """Provider that pauses flow and waits for webhook callback."""
+
+    def __init__(self, webhook_url: str):
+        self.webhook_url = webhook_url
+
+    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
+        # Notify external system (e.g., send Slack message, create ticket)
+        self.send_notification(context)
+
+        # Pause execution - framework handles persistence automatically
+        raise HumanFeedbackPending(
+            context=context,
+            callback_info={"webhook_url": f"{self.webhook_url}/{context.flow_id}"},
+        )
+
+class ReviewFlow(Flow):
+    @start()
+    @human_feedback(
+        message="Review this content:",
+        emit=["approved", "rejected"],
+        llm="gpt-4o-mini",
+        provider=WebhookProvider("https://myapp.com/api"),
+    )
+    def generate_content(self):
+        return "AI-generated content..."
+
+    @listen("approved")
+    def publish(self, result):
+        return "Published!"
+```
+
+<Tip>
+The flow framework **automatically persists state** when `HumanFeedbackPending` is raised. Your provider only needs to notify the external system and raise the exception—no manual persistence calls required.
+</Tip>
+
+### Handling Paused Flows
+
+When using an async provider, `kickoff()` returns a `HumanFeedbackPending` object instead of raising an exception:
+
+```python Code
+flow = ReviewFlow()
+result = flow.kickoff()
+
+if isinstance(result, HumanFeedbackPending):
+    # Flow is paused, state is automatically persisted
+    print(f"Waiting for feedback at: {result.callback_info['webhook_url']}")
+    print(f"Flow ID: {result.context.flow_id}")
+else:
+    # Normal completion
+    print(f"Flow completed: {result}")
+```
+
+### Resuming a Paused Flow
+
+When feedback arrives (e.g., via webhook), resume the flow:
+
+```python Code
+# In your webhook handler:
+def handle_feedback_webhook(flow_id: str, feedback: str):
+    # Restore the paused flow
+    flow = ReviewFlow.from_pending(flow_id)
+
+    # Resume with the feedback
+    result = flow.resume(feedback)
+
+    return result
+```
+
+### Key Types
+
+| Type | Description |
+|------|-------------|
+| `HumanFeedbackProvider` | Protocol for custom feedback providers |
+| `PendingFeedbackContext` | Contains all info needed to resume a paused flow |
+| `HumanFeedbackPending` | Returned by `kickoff()` when flow is paused for feedback |
+| `ConsoleProvider` | Default blocking console input provider |
+
+### PendingFeedbackContext
+
+The context contains everything needed to resume:
+
+```python Code
+@dataclass
+class PendingFeedbackContext:
+    flow_id: str            # Unique identifier for this flow execution
+    flow_class: str         # Fully qualified class name
+    method_name: str        # Method that triggered feedback
+    method_output: Any      # Output shown to the human
+    message: str            # The request message
+    emit: list[str] | None  # Possible outcomes for routing
+    default_outcome: str | None
+    metadata: dict          # Custom metadata
+    llm: str | None         # LLM for outcome collapsing
+    requested_at: datetime
+```
+
+### Complete Async Flow Example
+
+```python Code
+from crewai.flow import (
+    Flow, start, listen, human_feedback,
+    HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
+)
+
+class SlackNotificationProvider(HumanFeedbackProvider):
+    """Provider that sends Slack notifications and pauses for async feedback."""
+
+    def __init__(self, channel: str):
+        self.channel = channel
+
+    def request_feedback(self, context: PendingFeedbackContext, flow: Flow) -> str:
+        # Send Slack notification (implement your own)
+        slack_thread_id = self.post_to_slack(
+            channel=self.channel,
+            message=f"Review needed:\n\n{context.method_output}\n\n{context.message}",
+        )
+
+        # Pause execution - framework handles persistence automatically
+        raise HumanFeedbackPending(
+            context=context,
+            callback_info={
+                "slack_channel": self.channel,
+                "thread_id": slack_thread_id,
+            },
+        )
+
+class ContentPipeline(Flow):
+    @start()
+    @human_feedback(
+        message="Approve this content for publication?",
+        emit=["approved", "rejected", "needs_revision"],
+        llm="gpt-4o-mini",
+        default_outcome="needs_revision",
+        provider=SlackNotificationProvider("#content-reviews"),
+    )
+    def generate_content(self):
+        return "AI-generated blog post content..."
+
+    @listen("approved")
+    def publish(self, result):
+        print(f"Publishing! Reviewer said: {result.feedback}")
+        return {"status": "published"}
+
+    @listen("rejected")
+    def archive(self, result):
+        print(f"Archived. Reason: {result.feedback}")
+        return {"status": "archived"}
+
+    @listen("needs_revision")
+    def queue_revision(self, result):
+        print(f"Queued for revision: {result.feedback}")
+        return {"status": "revision_needed"}
+
+
+# Starting the flow (will pause and wait for Slack response)
+def start_content_pipeline():
+    flow = ContentPipeline()
+    result = flow.kickoff()
+
+    if isinstance(result, HumanFeedbackPending):
+        return {"status": "pending", "flow_id": result.context.flow_id}
+
+    return result
+
+
+# Resuming when Slack webhook fires
+def on_slack_feedback(flow_id: str, slack_message: str):
+    flow = ContentPipeline.from_pending(flow_id)
+    result = flow.resume(slack_message)
+    return result
+```
+
+### Best Practices for Async Feedback
+
+1. **Check the return type**: `kickoff()` returns `HumanFeedbackPending` when paused—no try/except needed
+2. **Store callback info**: Use `callback_info` to store webhook URLs, ticket IDs, etc.
+3. **Implement idempotency**: Your resume handler should be idempotent for safety
+4. **Automatic persistence**: State is automatically saved when `HumanFeedbackPending` is raised and uses `SQLiteFlowPersistence` by default
+5. **Custom persistence**: Pass a custom persistence instance to `from_pending()` if needed
+
 ## Related Documentation

 - [Flows Overview](/en/concepts/flows) - Learn about CrewAI Flows
 - [Flow State Management](/en/guides/flows/mastering-flow-state) - Managing state in flows
+- [Flow Persistence](/en/concepts/flows#persistence) - Persisting flow state
 - [Routing with @router](/en/concepts/flows#router) - More about conditional routing
 - [Human Input on Execution](/en/learn/human-input-on-execution) - Task-level human input
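
Worth noting alongside the new docs: the blocking console behavior is itself a provider (`ConsoleProvider` in the Key Types table above). A hedged sketch of passing it explicitly rather than relying on the default; the `crewai.flow` import path for `ConsoleProvider` is an assumption, since this diff does not show where it is exported:

```python
from crewai.flow import Flow, start, human_feedback
from crewai.flow import ConsoleProvider  # assumed export path; not shown in this diff

class BlockingReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Please review this content:",
        provider=ConsoleProvider(),  # explicit form of the default blocking behavior
    )
    def generate_content(self):
        return "This is AI-generated content that needs review."
```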

lib/crewai/src/crewai/events/event_listener.py

Lines changed: 24 additions & 0 deletions
@@ -38,9 +38,11 @@
 from crewai.events.types.flow_events import (
     FlowCreatedEvent,
     FlowFinishedEvent,
+    FlowPausedEvent,
     FlowStartedEvent,
     MethodExecutionFailedEvent,
     MethodExecutionFinishedEvent,
+    MethodExecutionPausedEvent,
     MethodExecutionStartedEvent,
 )
 from crewai.events.types.knowledge_events import (
@@ -363,6 +365,28 @@ def on_method_execution_failed(
             )
             self.method_branches[event.method_name] = updated_branch

+        @crewai_event_bus.on(MethodExecutionPausedEvent)
+        def on_method_execution_paused(
+            _: Any, event: MethodExecutionPausedEvent
+        ) -> None:
+            method_branch = self.method_branches.get(event.method_name)
+            updated_branch = self.formatter.update_method_status(
+                method_branch,
+                self.formatter.current_flow_tree,
+                event.method_name,
+                "paused",
+            )
+            self.method_branches[event.method_name] = updated_branch
+
+        @crewai_event_bus.on(FlowPausedEvent)
+        def on_flow_paused(_: Any, event: FlowPausedEvent) -> None:
+            self.formatter.update_flow_status(
+                self.formatter.current_flow_tree,
+                event.flow_name,
+                event.flow_id,
+                "paused",
+            )
+
         # ----------- TOOL USAGE EVENTS -----------

         @crewai_event_bus.on(ToolUsageStartedEvent)
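
Since the new pause events travel over the same event bus, application code can observe them with the same `@crewai_event_bus.on(...)` pattern this file uses. A minimal sketch; the `crewai.events.event_bus` import path is an assumption (only the handler signature and the `flow_name`/`flow_id` fields are shown in this diff):

```python
from typing import Any

from crewai.events.event_bus import crewai_event_bus  # assumed import path
from crewai.events.types.flow_events import FlowPausedEvent

@crewai_event_bus.on(FlowPausedEvent)
def alert_on_pause(_: Any, event: FlowPausedEvent) -> None:
    # React when a flow pauses for human feedback, e.g. notify a reviewer
    print(f"Flow {event.flow_name} ({event.flow_id}) is paused for feedback")
```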
