Lorenze/feat/conversational flows#5896
Conversation
- Introduced a new guide for building multi-turn chat applications using , detailing session management and message handling. - Added class to facilitate chat interactions, including streaming support and event handling. - Implemented for class-level defaults and improved input normalization for conversational turns. - Enhanced event listeners to manage flow events and tracing more effectively, including support for nested crew executions. - Added tests for conversational flow helpers and kickoff parameters to ensure functionality and reliability.
- Updated TraceCollectionListener to handle nested flows without re-claiming parent session batches. - Ensured that method execution events are always emitted for tracing, regardless of flow event suppression. - Improved finalization logic for flow trace batches to respect session deferral flags. - Added tests to verify that method execution events are emitted correctly when flow events are suppressed and that deferred session finalization is respected in nested flows.
…conversational-flows
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds multi-turn conversational flow support: ChatSession transport and event bridge, conversation helpers and ChatState, Flow kickoff/session APIs (interactive, session params, deferred tracing), QueueInputProvider, tracing ownership/deferral changes, experimental ConversationalFlow, multilingual docs, and tests. ChangesConversational Flows Implementation and Documentation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
| from unittest.mock import MagicMock, patch | ||
| from uuid import uuid4 | ||
|
|
||
| import pytest |
| from uuid import uuid4 | ||
|
|
||
| import pytest | ||
| from pydantic import BaseModel, Field |
|
Preview deployment for your docs. Learn more about Mintlify Previews.
💡 Tip: Enable Workflows to automatically generate PRs for you. |
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py (1)
346-350:⚠️ Potential issue | 🟠 Major | ⚡ Quick winAvoid aliasing
current_batch.eventswithevent_bufferbefore send.Line 349 reuses the same list object as Line 346. When
_send_events_to_backend()clearsevent_buffer, it can also wipecurrent_batch.events, so finalized batches may lose their event payload.Suggested fix
- self.current_batch.events = sorted_events + self.current_batch.events = sorted_events.copy() events_sent_count = len(sorted_events) if sorted_events: - self.event_buffer = sorted_events + self.event_buffer = sorted_events.copy() events_sent_to_backend_status = self._send_events_to_backend()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py` around lines 346 - 350, current_batch.events is being aliased to event_buffer which can be mutated by _send_events_to_backend; instead assign a shallow copy for event_buffer so clearing or modifying it won't affect current_batch.events. Update the assignment in trace_batch_manager so you keep self.current_batch.events = sorted_events but set self.event_buffer to a copy (e.g., list(sorted_events) or sorted_events.copy()) before calling _send_events_to_backend(), ensuring _send_events_to_backend() can safely clear/modify event_buffer without wiping current_batch.events.lib/crewai/tests/tracing/test_tracing.py (1)
429-446:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMake these tracing tests fully offline/deterministic.
These paths are currently causing CI failures (Line 445 and Line 482) due to VCR cassette mismatch against
POST https://api.openai.com/v1/chat/completions. Please stub/mock model execution in these tests socrew.kickoff()cannot hit live LLM endpoints.Also applies to: 455-483
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/tracing/test_tracing.py` around lines 429 - 446, The test must not call real LLM endpoints: stub the model execution invoked by Crew.kickoff so it returns a deterministic response (e.g., "hello world") instead of performing a POST to OpenAI. Locate the code path used by kickoff (the Agent/Task model invocation) and patch or monkeypatch that function (the method on Agent or Crew that sends chat completions) to a fake implementation that returns a deterministic completion; keep using Agent, Task, Crew, TraceCollectionListener and then call crew.kickoff() so the trace assertions run offline and deterministically.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/flow/chat.py`:
- Around line 160-167: iter_turn_stream replaces self._on_event with _collect
but only sets the handler when self._bridge is None, so if a bridge already
exists its handler still points to the old callback and streamed events are
dropped; update the existing bridge to use the new _collect handler instead of
skipping it (e.g., assign the bridge's handler to _collect or call its
registration method to update the handler) and apply the same change to the
similar block around the 171-173 section so existing ConversationEventBridge
instances always forward events to the newly assigned self._on_event.
- Around line 280-281: unregister currently only clears the local list
self._handlers which leaves callbacks registered on the global event bus and
causes leaks; update unregister to iterate over each handler in self._handlers
and call the event bus' detach method (e.g., self._bus.remove_listener /
self._bus.off / self._bus.unsubscribe as appropriate for your bus
implementation) for each handler before clearing self._handlers so listeners are
actually removed from the event bus.
- Around line 136-140: ChatSession is emitting "turn_finished" in handle_turn
(the self._emit("turn_finished", ...) calls) while ConversationEventBridge also
emits "turn_finished" on FlowFinishedEvent, causing duplicate terminal events;
remove the redundant self._emit("turn_finished", ...) calls inside
ChatSession.handle_turn (both occurrences around the assistant_done emit and the
later block) so that turn termination is emitted centrally by
ConversationEventBridge on FlowFinishedEvent, or alternatively add a guard/flag
in ChatSession to emit only when ConversationEventBridge will not emit—refer to
ChatSession.handle_turn, the "assistant_done"/_emit calls, and
ConversationEventBridge/FlowFinishedEvent when making the change.
In `@lib/crewai/src/crewai/flow/conversation.py`:
- Around line 140-143: flow._conversation_messages is currently inferred as
list[dict[str, Any]] but you append an LLMMessage; change the fallback to use a
list of LLMMessage (or convert the LLMMessage to dict before appending) so types
align. Specifically update the initialization for flow._conversation_messages in
the block that checks hasattr(flow, "_conversation_messages") to create a
list[LLMMessage] (or call message.to_dict()/asdict equivalent before append) and
ensure any other code that reads _conversation_messages expects LLMMessage
instances if you choose that route; refer to flow._conversation_messages and the
LLMMessage type when making the change.
- Around line 169-174: The call to flow.classify_intent(text, outcomes, llm=llm,
context=get_conversation_messages(flow)) passes a list[LLMMessage] but
classify_intent expects Sequence[dict[str, Any]] | None; fix by converting the
messages to plain dicts before passing (e.g. map each LLMMessage from
get_conversation_messages(flow) to a dict via its serialization method or
dataclass conversion) and pass that result as context to flow.classify_intent;
ensure to handle an empty list -> None if classify_intent semantics require it.
In `@lib/crewai/src/crewai/flow/flow.py`:
- Around line 3685-3686: The truthy check before appending conversation messages
drops intentional empty-string responses from ask(); change the condition in the
code around _append_conversation_message(self, "user", response) to check for
None instead (e.g., use "if response is not None:") so that "" is preserved in
the conversation history while still skipping truly missing/None responses;
update the condition where response is handled in flow.py near the call to
_append_conversation_message and ensure this behavior aligns with the documented
ask() semantics.
- Around line 2621-2626: The input hydration currently builds filtered_inputs in
the flow by excluding only ("id", "user_message", "last_intent") but still
passes "messages" into _initialize_state; update the filter used when building
filtered_inputs (the dict comprehension in the flow where filtered_inputs is
created) to also exclude "messages" so that _initialize_state is not given
conversational message data (avoid structured message/file coercion errors).
- Around line 1281-1284: Update the Flow.conversation_messages property
annotation to return list[LLMMessage] (matching get_conversation_messages) to
fix the mypy mismatch; in the kickoff path, remove "messages" from the
filtered_inputs that get passed into _initialize_state (or coerce it to the
proper ChatState.messages shape) so initialization doesn't receive invalid
inputs for ChatState.messages; and in ask(), change the append condition from
"if response:" to "if response is not None" so empty-string responses are
preserved in the conversation history. Ensure references:
conversation_messages/get_conversation_messages, kickoff filtered_inputs ->
_initialize_state, ChatState.messages, and ask()/response check are updated
accordingly.
In `@lib/crewai/src/crewai/flow/providers/queue.py`:
- Around line 48-50: The close_session implementation currently calls
_get_queue(session_id) which leaves the session_id in the _queues mapping;
change close_session to avoid creating or leaving entries by retrieving and
removing the queue entry from self._queues (e.g. q =
self._queues.pop(session_id, None)) and if q is not None call q.put(None) to
signal closure; ensure you do the pop under the same synchronization used for
_queues (if there is a lock/guard used elsewhere) to avoid races.
In `@lib/crewai/tests/test_flow_conversation.py`:
- Around line 327-350: In test_flow_finished_without_flow_started_warns save the
previous event scope returned by restore_event_scope(()) before setting the
empty scope, wrap the test body that emits FlowFinishedEvent in a try block, and
in a finally call restore_event_scope(previous) to restore the prior scope;
reference the test_flow_finished_without_flow_started_warns function and the
restore_event_scope and crewai_event_bus.emit usage so the restoration happens
even on failures.
---
Outside diff comments:
In `@lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py`:
- Around line 346-350: current_batch.events is being aliased to event_buffer
which can be mutated by _send_events_to_backend; instead assign a shallow copy
for event_buffer so clearing or modifying it won't affect current_batch.events.
Update the assignment in trace_batch_manager so you keep
self.current_batch.events = sorted_events but set self.event_buffer to a copy
(e.g., list(sorted_events) or sorted_events.copy()) before calling
_send_events_to_backend(), ensuring _send_events_to_backend() can safely
clear/modify event_buffer without wiping current_batch.events.
In `@lib/crewai/tests/tracing/test_tracing.py`:
- Around line 429-446: The test must not call real LLM endpoints: stub the model
execution invoked by Crew.kickoff so it returns a deterministic response (e.g.,
"hello world") instead of performing a POST to OpenAI. Locate the code path used
by kickoff (the Agent/Task model invocation) and patch or monkeypatch that
function (the method on Agent or Crew that sends chat completions) to a fake
implementation that returns a deterministic completion; keep using Agent, Task,
Crew, TraceCollectionListener and then call crew.kickoff() so the trace
assertions run offline and deterministically.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 546c6575-a4e9-4fe9-a02b-c2a961cf0edd
📒 Files selected for processing (26)
docs/ar/guides/flows/conversational-flows.mdxdocs/ar/guides/flows/first-flow.mdxdocs/ar/guides/flows/mastering-flow-state.mdxdocs/docs.jsondocs/en/guides/flows/conversational-flows.mdxdocs/en/guides/flows/first-flow.mdxdocs/en/guides/flows/mastering-flow-state.mdxdocs/ko/guides/flows/conversational-flows.mdxdocs/ko/guides/flows/first-flow.mdxdocs/ko/guides/flows/mastering-flow-state.mdxdocs/pt-BR/guides/flows/conversational-flows.mdxdocs/pt-BR/guides/flows/first-flow.mdxdocs/pt-BR/guides/flows/mastering-flow-state.mdxlib/crewai/src/crewai/events/event_listener.pylib/crewai/src/crewai/events/listeners/tracing/first_time_trace_handler.pylib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.pylib/crewai/src/crewai/events/listeners/tracing/trace_listener.pylib/crewai/src/crewai/flow/__init__.pylib/crewai/src/crewai/flow/chat.pylib/crewai/src/crewai/flow/conversation.pylib/crewai/src/crewai/flow/flow.pylib/crewai/src/crewai/flow/flow_context.pylib/crewai/src/crewai/flow/providers/__init__.pylib/crewai/src/crewai/flow/providers/queue.pylib/crewai/tests/test_flow_conversation.pylib/crewai/tests/tracing/test_tracing.py
| if not hasattr(flow, "_conversation_messages"): | ||
| object.__setattr__(flow, "_conversation_messages", []) | ||
| flow._conversation_messages.append(message) | ||
|
|
There was a problem hiding this comment.
Type mismatch when appending to _conversation_messages fallback buffer.
flow._conversation_messages is inferred as list[dict[str, Any]], but message is LLMMessage; this is currently failing mypy in CI.
Proposed fix
- if not hasattr(flow, "_conversation_messages"):
- object.__setattr__(flow, "_conversation_messages", [])
- flow._conversation_messages.append(message)
+ if not hasattr(flow, "_conversation_messages"):
+ object.__setattr__(flow, "_conversation_messages", [])
+ cast(list[LLMMessage], flow._conversation_messages).append(message)🧰 Tools
🪛 GitHub Actions: Run Type Checks / 1_type-checker (3.11).txt
[error] 142-142: mypy: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🪛 GitHub Actions: Run Type Checks / 2_type-checker (3.12).txt
[error] 142-142: mypy error: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🪛 GitHub Actions: Run Type Checks / 3_type-checker (3.13).txt
[error] 142-142: mypy: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🪛 GitHub Actions: Run Type Checks / 4_type-checker (3.10).txt
[error] 142-142: mypy: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.10)
[error] 142-142: mypy: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.11)
[error] 142-142: mypy: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.12)
[error] 142-142: mypy error: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]". [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.13)
[error] 142-142: mypy: Argument 1 to "append" of "list" has incompatible type "LLMMessage"; expected "dict[str, Any]" [arg-type]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/flow/conversation.py` around lines 140 - 143,
flow._conversation_messages is currently inferred as list[dict[str, Any]] but
you append an LLMMessage; change the fallback to use a list of LLMMessage (or
convert the LLMMessage to dict before appending) so types align. Specifically
update the initialization for flow._conversation_messages in the block that
checks hasattr(flow, "_conversation_messages") to create a list[LLMMessage] (or
call message.to_dict()/asdict equivalent before append) and ensure any other
code that reads _conversation_messages expects LLMMessage instances if you
choose that route; refer to flow._conversation_messages and the LLMMessage type
when making the change.
| intent = flow.classify_intent( | ||
| text, | ||
| outcomes, | ||
| llm=llm, | ||
| context=get_conversation_messages(flow), | ||
| ) |
There was a problem hiding this comment.
classify_intent(..., context=...) receives an incompatible type.
get_conversation_messages(flow) returns list[LLMMessage], but classify_intent expects Sequence[dict[str, Any]] | None; this is also failing mypy in CI.
Proposed fix
intent = flow.classify_intent(
text,
outcomes,
llm=llm,
- context=get_conversation_messages(flow),
+ context=cast(Sequence[dict[str, Any]], get_conversation_messages(flow)),
)🧰 Tools
🪛 GitHub Actions: Run Type Checks / 1_type-checker (3.11).txt
[error] 173-173: mypy: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🪛 GitHub Actions: Run Type Checks / 2_type-checker (3.12).txt
[error] 173-173: mypy error: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🪛 GitHub Actions: Run Type Checks / 3_type-checker (3.13).txt
[error] 173-173: mypy: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🪛 GitHub Actions: Run Type Checks / 4_type-checker (3.10).txt
[error] 173-173: mypy: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.10)
[error] 173-173: mypy: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.11)
[error] 173-173: mypy: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.12)
[error] 173-173: mypy error: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None". [arg-type]
🪛 GitHub Actions: Run Type Checks / type-checker (3.13)
[error] 173-173: mypy: Argument "context" to "classify_intent" of "Flow" has incompatible type "list[LLMMessage]"; expected "Sequence[dict[str, Any]] | None" [arg-type]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/flow/conversation.py` around lines 169 - 174, The call
to flow.classify_intent(text, outcomes, llm=llm,
context=get_conversation_messages(flow)) passes a list[LLMMessage] but
classify_intent expects Sequence[dict[str, Any]] | None; fix by converting the
messages to plain dicts before passing (e.g. map each LLMMessage from
get_conversation_messages(flow) to a dict via its serialization method or
dataclass conversion) and pass that result as context to flow.classify_intent;
ensure to handle an empty list -> None if classify_intent semantics require it.
| @property | ||
| def conversation_messages(self) -> list[dict[str, Any]]: | ||
| """Message history from state or the internal conversation buffer.""" | ||
| return get_conversation_messages(self) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate and print relevant sections from flow.py around the referenced lines
FILE="lib/crewai/src/crewai/flow/flow.py"
echo "=== conversation_messages property (around 1265-1305) ==="
sed -n '1265,1305p' "$FILE" | nl -ba -w2 -s':'
echo "=== call sites / definitions for get_conversation_messages ==="
rg -n "def get_conversation_messages|get_conversation_messages\(" "$FILE"
echo "=== Types of get_conversation_messages (search in repo) ==="
rg -n "def get_conversation_messages" lib/crewai/src/crewai || true
rg -n "get_conversation_messages" lib/crewai/src/crewai || true
echo "=== _initialize_state conversion block (around 2050-2120) ==="
sed -n '2050,2120p' "$FILE" | nl -ba -w2 -s':'
echo "=== segment filters conversational keys (around 2580-2660) ==="
sed -n '2580,2660p' "$FILE" | nl -ba -w2 -s':'
echo "=== ask()/append conversation message logic (around 3660-3710) ==="
sed -n '3660,3710p' "$FILE" | nl -ba -w2 -s':'
echo "=== ask() implementation (broader context) ==="
rg -n "def ask\\b|\\bask\\(" "$FILE" | head -n 50
sed -n '3710,3900p' "$FILE" | nl -ba -w2 -s':' | head -n 220Repository: crewAIInc/crewAI
Length of output: 179
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
if [ ! -f "$FILE" ]; then
echo "File not found: $FILE"
exit 1
fi
echo "=== conversation_messages property (around 1260-1310) ==="
sed -n '1240,1320p' "$FILE" | cat -n -w2 -s':'
echo "=== Search for get_conversation_messages definition/usages in flow.py ==="
rg -n "get_conversation_messages" "$FILE" || true
rg -n "def get_conversation_messages" "$FILE" || true
echo "=== Types / implementation of get_conversation_messages in repo ==="
rg -n "def get_conversation_messages" lib/crewai/src/crewai || true
rg -n "get_conversation_messages" lib/crewai/src/crewai || true
echo "=== _initialize_state conversion block (around 2050-2145) ==="
sed -n '2000,2180p' "$FILE" | cat -n -w2 -s':'
echo "=== segment filters conversational keys (around 2550-2680) ==="
sed -n '2520,2700p' "$FILE" | cat -n -w2 -s':'
echo "=== ask()/append conversation message logic (around 3600-3720) ==="
sed -n '3600,3725p' "$FILE" | cat -n -w2 -s':'
echo "=== ask() implementation (broader context) ==="
sed -n '3720,3900p' "$FILE" | cat -n -w2 -s':'Repository: crewAIInc/crewAI
Length of output: 202
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
if [ ! -f "$FILE" ]; then
echo "File not found: $FILE"
exit 1
fi
num() { awk '{printf "%d:%s\n", NR, $0}'; }
echo "=== conversation_messages property (around 1240-1320) ==="
sed -n '1240,1320p' "$FILE" | num
echo "=== Search for get_conversation_messages definition/usages in flow.py ==="
rg -n "get_conversation_messages" "$FILE" || true
rg -n "def get_conversation_messages" "$FILE" || true
echo "=== Types / implementation of get_conversation_messages in repo ==="
rg -n "def get_conversation_messages" lib/crewai/src/crewai || true
rg -n "get_conversation_messages" lib/crewai/src/crewai || true
echo "=== _initialize_state conversion block (around 2000-2180) ==="
sed -n '2000,2180p' "$FILE" | num
echo "=== segment filters conversational keys (around 2520-2700) ==="
sed -n '2520,2700p' "$FILE" | num
echo "=== ask()/append conversation message logic (around 3600-3725) ==="
sed -n '3600,3725p' "$FILE" | num
echo "=== ask() implementation (broader context 3720-3900) ==="
sed -n '3720,3900p' "$FILE" | numRepository: crewAIInc/crewAI
Length of output: 34130
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
CONV_FILE="lib/crewai/src/crewai/flow/conversation.py"
echo "=== conversation.py: get_conversation_messages + LLMMessage definition ==="
if [ -f "$CONV_FILE" ]; then
sed -n '1,260p' "$CONV_FILE" | cat -n
else
echo "Missing: $CONV_FILE"
fi
echo "=== flow.py: ask() definition/docstring + response handling ==="
rg -n "def ask\\b" "$FILE" | head -n 20 || true
# show likely region around ask
sed -n '3300,3750p' "$FILE" | cat -n | head -n 500
echo "=== flow.py: search for \"messages\" key handling before _initialize_state ==="
rg -n "\"messages\"|messages\\s*=" "$FILE" || true
echo "=== flow.py: search for state validation / source conversion related to messages ==="
rg -n "messages\\..*files|files\\..*source|source\\b|_initialize_state\\(" "$FILE" || true
echo "=== conversation.py: look for normalization/append behavior ==="
rg -n "_append_conversation_message|append_conversation_message|conversation_messages" "$CONV_FILE" || true
sed -n '260,520p' "$CONV_FILE" | cat -nRepository: crewAIInc/crewAI
Length of output: 34139
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
TYPES_FILE="lib/crewai/src/crewai/utilities/types.py"
CONV_FILE="lib/crewai/src/crewai/flow/conversation.py"
echo "=== LLMMessage type definition ==="
if [ -f "$TYPES_FILE" ]; then
rg -n "LLMMessage" "$TYPES_FILE"
sed -n '1,220p' "$TYPES_FILE" | cat -n
else
echo "TYPES_FILE not found: $TYPES_FILE"
rg -n "LLMMessage" lib/crewai/src/crewai/utilities || true
fi
echo "=== flow.py: exact lines 1260-1310 around conversation_messages ==="
python3 - <<'PY'
import itertools, pathlib
path = pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
start, end = 1240, 1310
lines = path.read_text(encoding="utf-8").splitlines()
for i in range(start-1, min(end, len(lines))):
print(f"{i+1}:{lines[i]}")
PY
echo "=== flow.py: exact lines around filtered_inputs exclusion ==="
python3 - <<'PY'
import pathlib, re
path = pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
lines = path.read_text(encoding="utf-8").splitlines()
# find the filtered_inputs dict comprehension
pat = re.compile(r'filtered_inputs\s*=\s*\{')
for idx, line in enumerate(lines, start=1):
if pat.search(line):
# print 40 lines around
lo = max(1, idx-15); hi = min(len(lines), idx+35)
for i in range(lo, hi+1):
print(f"{i}:{lines[i-1]}")
break
else:
print("filtered_inputs comprehension not found")
PY
echo "=== flow.py: exact ask() append logic (search for 'if response:' and append message) ==="
python3 - <<'PY'
import pathlib, re
path = pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
lines = path.read_text(encoding="utf-8").splitlines()
# find "if response:" then next lines contain _append_conversation_message
for idx, line in enumerate(lines, start=1):
if line.strip() == "if response:":
lo = max(1, idx-10); hi = min(len(lines), idx+10)
for i in range(lo, hi+1):
print(f"{i}:{lines[i-1]}")
break
else:
print("No 'if response:' found")
PY
echo "=== flow.py: search for messages.*.files.*.source conversion ==="
rg -n "messages\..*files\..*source|files\..*source|_initialize_state.*files|source\b" "$FILE" || true
echo "=== flow.py: locate _initialize_state and print exact range covering any conversion ==="
python3 - <<'PY'
import pathlib
path = pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
lines = path.read_text(encoding="utf-8").splitlines()
# find def _initialize_state
start=None
for idx,line in enumerate(lines, start=1):
if line.startswith(" def _initialize_state"):
start=idx; break
if start is None:
raise SystemExit("def _initialize_state not found")
# print a larger window around it
lo = max(1, start-20); hi = min(len(lines), start+260)
for i in range(lo, hi+1):
print(f"{i}:{lines[i-1]}")
PYRepository: crewAIInc/crewAI
Length of output: 21963
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
echo "=== Locate _configure_conversational_kickoff ==="
rg -n "def _configure_conversational_kickoff" "$FILE" || true
# Print function body around it
python3 - <<'PY'
import pathlib, re
path = pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
lines = path.read_text(encoding="utf-8").splitlines()
start=None
for i,l in enumerate(lines, start=1):
if re.match(r"\s*def _configure_conversational_kickoff\b", l):
start=i; break
if not start:
raise SystemExit("not found")
lo=max(1,start-30); hi=min(len(lines), start+260)
for i in range(lo, hi+1):
print(f"{i}:{lines[i-1]}")
PY
echo "=== Search for 'inputs.get(\"messages\")' and '\"messages\" in inputs' ==="
rg -n "inputs\.get\(\"messages\"\)|\"messages\"\s+in\s+inputs|messages\s*=" "$FILE" || true
echo "=== Search for _apply_pending_conversational_turn ==="
rg -n "def _apply_pending_conversational_turn|_apply_pending_conversational_turn" "$FILE" || true
python3 - <<'PY'
import pathlib, re
path = pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
lines = path.read_text(encoding="utf-8").splitlines()
start=None
for i,l in enumerate(lines, start=1):
if re.match(r"\s*def _apply_pending_conversational_turn\b", l):
start=i; break
if not start:
# if method name not found, just exit
print("no def _apply_pending_conversational_turn")
raise SystemExit(0)
lo=max(1,start-20); hi=min(len(lines), start+220)
for i in range(lo, hi+1):
print(f"{i}:{lines[i-1]}")
PY
echo "=== Ensure conversation messages are appended during kickoff path ==="
rg -n "prepare_conversational_turn|append_message|get_conversation_messages|state\.messages|\"messages\".*append" "$FILE" || trueRepository: crewAIInc/crewAI
Length of output: 24330
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
echo "=== Search for LLMMessage import in flow.py ==="
rg -n "LLMMessage" "$FILE" || true
echo "=== Search for conversation_messages usages ==="
rg -n "conversation_messages" "$FILE" || trueRepository: crewAIInc/crewAI
Length of output: 398
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Search for any validation/coercion around "source" inside "files" and/or "messages"
rg -n "messages\..*files\..*source|files\..*source|\"source\"\s*:.*|source\s*=" lib/crewai/src/crewai || true
# Also search for FileInput / crewai_files usage patterns
rg -n "FileInput|crewai_files" lib/crewai/src/crewai || true
# Search for pydantic validators mentioning "files" or "source"
rg -n "validator\\(|field_validator\\(|model_validator\\(|ValidationError|coerce|parse_source|from_source" lib/crewai/src/crewai || trueRepository: crewAIInc/crewAI
Length of output: 28303
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="lib/crewai/src/crewai/flow/flow.py"
python3 - <<'PY'
import pathlib
path=pathlib.Path("lib/crewai/src/crewai/flow/flow.py")
lines=path.read_text(encoding="utf-8").splitlines()
# print around _conversation_messages declaration
for i,l in enumerate(lines, start=1):
if "_conversation_messages" in l and "PrivateAttr" in l:
lo=max(1,i-30); hi=min(len(lines), i+30)
for j in range(lo, hi+1):
print(f"{j}:{lines[j-1]}")
break
else:
print("No _conversation_messages declaration found")
PYRepository: crewAIInc/crewAI
Length of output: 3354
Fix conversation_messages typing + kickoff state handling for messages.
Flow.conversation_messagesis annotated aslist[dict[str, Any]]but returnsget_conversation_messages(self)which islist[LLMMessage](TypedDict), causing the mypy return-type mismatch at ~lines 1281-1284—align the property annotation tolist[LLMMessage](or cast to the annotated type).- Kickoff’s
filtered_inputsexcludes only("id", "user_message", "last_intent")and still passesmessagesinto_initialize_state(~lines 2621-2628). Since state validation expectsChatState.messages: list[LLMMessage](includingLLMMessage.files: dict[str, FileInput]), invalidinputs["messages"]can raise the structured validation error surfaced from_initialize_state(caught around ~2091-2092). Excludemessagesfromfiltered_inputs(or coerce it) so conversational hydration ownsstate.messages. ask()appends to conversation history only whenresponseis truthy (if response:at ~3685-3686), butask()docs state""is an intentional empty input—append whenresponse is not Noneso empty-string turns are preserved.
🧰 Tools
🪛 GitHub Actions: Run Type Checks / 1_type-checker (3.11).txt
[error] 1284-1284: mypy: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🪛 GitHub Actions: Run Type Checks / 2_type-checker (3.12).txt
[error] 1284-1284: mypy error: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🪛 GitHub Actions: Run Type Checks / 3_type-checker (3.13).txt
[error] 1284-1284: mypy: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🪛 GitHub Actions: Run Type Checks / 4_type-checker (3.10).txt
[error] 1284-1284: mypy: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🪛 GitHub Actions: Run Type Checks / type-checker (3.10)
[error] 1284-1284: mypy: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🪛 GitHub Actions: Run Type Checks / type-checker (3.11)
[error] 1284-1284: mypy: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🪛 GitHub Actions: Run Type Checks / type-checker (3.12)
[error] 1284-1284: mypy error: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]"). [return-value]
🪛 GitHub Actions: Run Type Checks / type-checker (3.13)
[error] 1284-1284: mypy: Incompatible return value type (got "list[LLMMessage]", expected "list[dict[str, Any]]") [return-value]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/flow/flow.py` around lines 1281 - 1284, Update the
Flow.conversation_messages property annotation to return list[LLMMessage]
(matching get_conversation_messages) to fix the mypy mismatch; in the kickoff
path, remove "messages" from the filtered_inputs that get passed into
_initialize_state (or coerce it to the proper ChatState.messages shape) so
initialization doesn't receive invalid inputs for ChatState.messages; and in
ask(), change the append condition from "if response:" to "if response is not
None" so empty-string responses are preserved in the conversation history.
Ensure references: conversation_messages/get_conversation_messages, kickoff
filtered_inputs -> _initialize_state, ChatState.messages, and ask()/response
check are updated accordingly.
| if response: | ||
| _append_conversation_message(self, "user", response) |
There was a problem hiding this comment.
Preserve intentional empty user inputs in conversation history.
Line 3685 uses a truthy check, so "" responses are dropped even though ask() documents empty string as intentional input.
Suggested fix
- if response:
+ if response is not None:
_append_conversation_message(self, "user", response)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/flow/flow.py` around lines 3685 - 3686, The truthy
check before appending conversation messages drops intentional empty-string
responses from ask(); change the condition in the code around
_append_conversation_message(self, "user", response) to check for None instead
(e.g., use "if response is not None:") so that "" is preserved in the
conversation history while still skipping truly missing/None responses; update
the condition where response is handled in flow.py near the call to
_append_conversation_message and ensure this behavior aligns with the documented
ask() semantics.
| def test_flow_finished_without_flow_started_warns(self, capsys) -> None: | ||
| from crewai.events.event_bus import crewai_event_bus | ||
| from crewai.events.event_context import restore_event_scope | ||
| from crewai.events.types.flow_events import FlowFinishedEvent | ||
|
|
||
| class BareFlow(Flow[ChatState]): | ||
| @start() | ||
| def begin(self) -> str: | ||
| return "ok" | ||
|
|
||
| restore_event_scope(()) | ||
| flow = BareFlow() | ||
| crewai_event_bus.emit( | ||
| flow, | ||
| FlowFinishedEvent( | ||
| type="flow_finished", | ||
| flow_name="BareFlow", | ||
| result="ok", | ||
| state={}, | ||
| ), | ||
| ) | ||
| captured = capsys.readouterr().out | ||
| assert "flow_finished" in captured | ||
| assert "Missing starting event" in captured |
There was a problem hiding this comment.
Restore event scope after the forced empty scope.
Line 337 sets restore_event_scope(()) but the prior scope is not restored, which can leak global context across tests and create order-dependent failures. Please restore the previous scope in a finally block.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/tests/test_flow_conversation.py` around lines 327 - 350, In
test_flow_finished_without_flow_started_warns save the previous event scope
returned by restore_event_scope(()) before setting the empty scope, wrap the
test body that emits FlowFinishedEvent in a try block, and in a finally call
restore_event_scope(previous) to restore the prior scope; reference the
test_flow_finished_without_flow_started_warns function and the
restore_event_scope and crewai_event_bus.emit usage so the restoration happens
even on failures.
- Added a new module for conversational flow, including classes for managing conversation state, messages, and events. - Implemented and for structured intent handling and routing. - Enhanced the class to support turn-oriented conversational applications with built-in routing and message handling. - Updated to include new classes in the public API. - Added tests to validate the functionality of the new conversational flow features.
…conversational-flows
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/crewai/src/crewai/flow/flow.py (1)
2421-2424:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPass the resolved session ID into the interactive
ask()call.Line 2423 calls
ask()without metadata, butQueueInputProviderresolves the queue frommetadata["session_id"]before falling back toflow._state.id. On the first interactive turn those IDs can differ, so a socket-backed session can block on the wrong queue until after the first kickoff updates state.Suggested fix
- line = self.ask(prompt, timeout=timeout) + line = self.ask( + prompt, + timeout=timeout, + metadata={"session_id": sid}, + )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/src/crewai/flow/flow.py` around lines 2421 - 2424, The interactive loop calls ask(prompt, timeout=timeout) without passing the resolved session id, causing QueueInputProvider to possibly read metadata["session_id"] and pick the wrong queue; fix by resolving the session id before the loop (use flow._state.id fallback logic or the same resolution used by QueueInputProvider) and pass it into ask via the metadata param (e.g., ask(prompt, timeout=timeout, metadata={"session_id": resolved_session_id})), so the first interactive turn targets the correct queue; reference ask(), QueueInputProvider, metadata["session_id"], and flow._state.id when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/experimental/conversational_flow.py`:
- Around line 478-490: The route set derivation currently exposes internal
triggers (e.g., "conversation_start" and "answer_from_history") because
_valid_route_labels collects every listener label; change _valid_route_labels to
filter out internal-only labels before returning (explicitly exclude
"conversation_start" and "answer_from_history"), and ensure _effective_routes
still unions with self.builtin_routes; also if you want dynamic inclusion of
answer_from_history use the existing can_answer_from_history() check in the
callsite (do not advertise answer_from_history from _valid_route_labels). Update
references in _valid_route_labels and _effective_routes to use those filtered
labels so internal triggers cannot be returned by the router.
---
Outside diff comments:
In `@lib/crewai/src/crewai/flow/flow.py`:
- Around line 2421-2424: The interactive loop calls ask(prompt, timeout=timeout)
without passing the resolved session id, causing QueueInputProvider to possibly
read metadata["session_id"] and pick the wrong queue; fix by resolving the
session id before the loop (use flow._state.id fallback logic or the same
resolution used by QueueInputProvider) and pass it into ask via the metadata
param (e.g., ask(prompt, timeout=timeout, metadata={"session_id":
resolved_session_id})), so the first interactive turn targets the correct queue;
reference ask(), QueueInputProvider, metadata["session_id"], and flow._state.id
when making the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 5e2badba-cdbf-40ad-a9fa-a17242299ee7
📒 Files selected for processing (5)
lib/crewai/src/crewai/experimental/__init__.pylib/crewai/src/crewai/experimental/conversational_flow.pylib/crewai/src/crewai/flow/conversation.pylib/crewai/src/crewai/flow/flow.pylib/crewai/tests/test_flow_conversation.py
💤 Files with no reviewable changes (1)
- lib/crewai/src/crewai/flow/conversation.py
…conversational-flows
- Introduced support for deferred multi-turn tracing to maintain continuous event sequences. - Updated method to delegate to restored checkpoint flows, improving session management. - Added tests to validate the new tracing behavior and ensure correct event handling in conversational flows.
…conversational-flows
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
lib/crewai/tests/test_flow_conversation.py (1)
257-263: ⚡ Quick winAssert method events are emitted before validating parent linkage.
This test can pass even if no method events are emitted (
all(...)on an empty list). Add a non-empty assertion so regressions in method-event emission are caught.Suggested patch
assert len(flow_started_events) == 1 + assert method_events, "Expected method execution events to be emitted" assert len(sequences) == len(set(sequences)) assert all( event.parent_event_id == flow_started_events[0].event_id for event in method_events )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/crewai/tests/test_flow_conversation.py` around lines 257 - 263, The test currently checks parent linkage for method_events but doesn't assert that method_events is non-empty (so all(...) vacuously passes); update the test to first assert that method_events is non-empty (e.g., assert len(method_events) > 0 or assert method_events) before the existing all(...) check, referencing the method_events list and keeping the subsequent checks on parent_event_id against flow_started_events[0].event_id unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/ar/guides/flows/conversational-flows.mdx`:
- Around line 257-279: Correct the Arabic orthography and tanwin forms within
the described paragraph: replace "مفتوحاً" with "مفتوحًا", "بدلاً" with "بدلًا",
"فوراً" with "فورًا", "من دون" (ensure spacing/word choice as "من دون") and
"مصدَّراً" with "مُصدَّرًا" throughout the text around the examples that
reference ConversationConfig, ConversationalFlow, handle_turn,
finalize_session_traces, ChatSession, and session.close so the snippet reads
with the corrected diacritics and spacing.
---
Nitpick comments:
In `@lib/crewai/tests/test_flow_conversation.py`:
- Around line 257-263: The test currently checks parent linkage for
method_events but doesn't assert that method_events is non-empty (so all(...)
vacuously passes); update the test to first assert that method_events is
non-empty (e.g., assert len(method_events) > 0 or assert method_events) before
the existing all(...) check, referencing the method_events list and keeping the
subsequent checks on parent_event_id against flow_started_events[0].event_id
unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 161bc5cb-80f6-450f-8b20-a1948a5b47b2
📒 Files selected for processing (6)
docs/ar/guides/flows/conversational-flows.mdxdocs/en/guides/flows/conversational-flows.mdxdocs/ko/guides/flows/conversational-flows.mdxdocs/pt-BR/guides/flows/conversational-flows.mdxlib/crewai/src/crewai/flow/flow.pylib/crewai/tests/test_flow_conversation.py
✅ Files skipped from review due to trivial changes (2)
- docs/ko/guides/flows/conversational-flows.mdx
- docs/pt-BR/guides/flows/conversational-flows.mdx
🚧 Files skipped from review as they are similar to previous changes (2)
- docs/en/guides/flows/conversational-flows.mdx
- lib/crewai/src/crewai/flow/flow.py
| # Pydantic rebuilds inherited Flow annotations in this module's namespace. | ||
| # Keep the forward reference resolvable without importing crewai.context here, | ||
| # which would create a cycle while crewai.experimental is importing. | ||
| ExecutionContext = Any |
| prepare_conversational_turn, | ||
| ) | ||
| from crewai.state import CheckpointConfig | ||
| from crewai.utilities.types import LLMMessage |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 5f180de. Configure here.
| self.batch_manager.batch_owner_id = getattr( | ||
| source, "id", str(uuid.uuid4()) | ||
| ) | ||
| self._initialize_batch(user_context, execution_metadata) |
There was a problem hiding this comment.
Action events silently dropped when flow batch missing
Low Severity
In _handle_action_event, when the batch is not initialized and _try_initialize_flow_batch_from_context returns False but _nested_in_flow_execution() returns True, neither branch creates a batch. The event is still buffered via add_event (line 879) into a manager with no current_batch. If the flow's FlowStartedEvent handler hasn't fired yet (timing-dependent), these orphaned events accumulate without a batch to finalize them. In deferred-trace sessions this is eventually resolved, but for non-deferred flows the events could be lost.
Reviewed by Cursor Bugbot for commit 5f180de. Configure here.


Note
Medium Risk
Touches core Flow kickoff, tracing batch lifecycle, and event listeners; mis-finalized traces or nested-execution edge cases could affect observability, though behavior is gated by deferral flags and experimental APIs.
Overview
Adds multi-turn conversational Flows centered on
kickoff(user_message=..., session_id=...)per user line (no separatechat()API), with newcrewai.flow.conversationhelpers (ChatState,ConversationalConfig, turn prep, message history) and exports fromcrewai.flow.Introduces experimental
ConversationalFlow(crewai.experimental) with LLM routing (RouterConfig), built-inconverse/endroutes,handle_turn(), and structuredConversationState(messages, events, agent threads).Tracing gains deferred session finalization (
defer_trace_finalization,finalize_session_traces()), idempotent batch finalize, and logic so nested crews/flows and repeated turns do not steal or close the parent session batch;suppress_flow_eventsnow skips Rich console panels while traces still record.Docs: new Conversational Flows guides (en/ar/ko/pt-BR), wired in
docs.json, plus cross-links from first-flow and mastering-flow-state guides.Reviewed by Cursor Bugbot for commit 5f180de. Bugbot is set up for automated code reviews on this repo. Configure here.
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests