Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
434 changes: 434 additions & 0 deletions docs/ar/guides/flows/conversational-flows.mdx

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/ar/guides/flows/first-flow.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ crewai flow plot
3. استكشف دوال `and_` و`or_` لتنفيذ متوازٍ أكثر تعقيدًا
4. اربط Flow بواجهات API خارجية وقواعد بيانات وواجهات مستخدم
5. ادمج عدة Crews متخصصة في Flow واحد
6. أنشئ تطبيقات دردشة متعددة الجولات مع [تدفقات المحادثة](/ar/guides/flows/conversational-flows) (`kickoff` لكل رسالة، `ChatSession`، تأجيل التتبع)

<Check>
تهانينا! لقد بنيت بنجاح أول CrewAI Flow يجمع بين الكود العادي واستدعاءات LLM المباشرة ومعالجة Crew لإنشاء دليل شامل. هذه المهارات الأساسية تمكّنك من إنشاء تطبيقات AI متطورة بشكل متزايد.
Expand Down
2 changes: 2 additions & 0 deletions docs/ar/guides/flows/mastering-flow-state.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mode: "wide"
5. **توسيع تطبيقاتك** - دعم سير العمل المعقدة بتنظيم بيانات مناسب
6. **تمكين التطبيقات الحوارية** - تخزين والوصول إلى سجل المحادثات للتفاعلات الواعية بالسياق

للدردشة متعددة الجولات (`kickoff` لكل سطر مستخدم، `ChatState`، توجيه النية، تأجيل التتبع، و`ChatSession`)، راجع [تدفقات المحادثة](/ar/guides/flows/conversational-flows).

## أساسيات إدارة الحالة

### نهجان لإدارة الحالة
Expand Down
56 changes: 56 additions & 0 deletions docs/docs.json

Large diffs are not rendered by default.

437 changes: 437 additions & 0 deletions docs/en/guides/flows/conversational-flows.mdx

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/en/guides/flows/first-flow.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ Now that you've built your first flow, you can:
3. Explore the `and_` and `or_` functions for more complex parallel execution
4. Connect your flow to external APIs, databases, or user interfaces
5. Combine multiple specialized crews in a single flow
6. Build multi-turn chat apps with [Conversational Flows](/en/guides/flows/conversational-flows) (`kickoff` per message, `ChatSession`, deferred tracing)

<Check>
Congratulations! You've successfully built your first CrewAI Flow that combines regular code, direct LLM calls, and crew-based processing to create a comprehensive guide. These foundational skills enable you to create increasingly sophisticated AI applications that can tackle complex, multi-stage problems through a combination of procedural control and collaborative intelligence.
Expand Down
2 changes: 2 additions & 0 deletions docs/en/guides/flows/mastering-flow-state.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Effective state management enables you to:
5. **Scale your applications** - Support complex workflows with proper data organization
6. **Enable conversational applications** - Store and access conversation history for context-aware AI interactions

For multi-turn chat (`kickoff` per user line, `ChatState`, intent routing, deferred tracing, and `ChatSession`), see [Conversational Flows](/en/guides/flows/conversational-flows).

Let's explore how to leverage these capabilities effectively.

## State Management Fundamentals
Expand Down
437 changes: 437 additions & 0 deletions docs/ko/guides/flows/conversational-flows.mdx

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/ko/guides/flows/first-flow.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ result = ContentCrew().crew().kickoff(inputs={
3. 더 복잡한 병렬 실행을 위해 `and_` 및 `or_` 함수를 탐색해 보세요.
4. flow를 외부 API, 데이터베이스 또는 사용자 인터페이스에 연결해 보세요.
5. 여러 전문화된 crew를 하나의 flow에서 결합해 보세요.
6. [대화형 Flow](/ko/guides/flows/conversational-flows)로 멀티턴 채팅 앱 구축 (`kickoff` per message, `ChatSession`, 지연 트레이싱)

<Check>
축하합니다! 정규 코드, 직접적인 LLM 호출, crew 기반 처리를 결합하여 포괄적인 가이드를 생성하는 첫 번째 CrewAI Flow를 성공적으로 구축하셨습니다. 이러한 기초적인 역량을 바탕으로 절차적 제어와 협업적 인텔리전스를 결합하여 복잡하고 다단계의 문제를 해결할 수 있는 점점 더 정교한 AI 애플리케이션을 만들 수 있습니다.
Expand Down
2 changes: 2 additions & 0 deletions docs/ko/guides/flows/mastering-flow-state.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ State 관리는 모든 고급 AI 워크플로우의 중추입니다. CrewAI Flow
5. **애플리케이션 확장** - 적절한 데이터 조직을 통해 복잡한 워크플로를 지원할 수 있습니다.
6. **대화형 애플리케이션 활성화** - 컨텍스트 기반 AI 상호작용을 위해 대화 내역을 저장하고 접근할 수 있습니다.

멀티턴 채팅(`kickoff` per user line, `ChatState`, 의도 라우팅, 지연 트레이싱, `ChatSession`)은 [대화형 Flow](/ko/guides/flows/conversational-flows)를 참고하세요.

이러한 기능을 효과적으로 활용하는 방법을 살펴보겠습니다.

## 상태 관리 기본 사항
Expand Down
437 changes: 437 additions & 0 deletions docs/pt-BR/guides/flows/conversational-flows.mdx

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/pt-BR/guides/flows/first-flow.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ Agora que você construiu seu primeiro flow, pode:
3. Explorar as funções `and_` e `or_` para execuções paralelas e mais complexas
4. Conectar seu flow a APIs externas, bancos de dados ou interfaces de usuário
5. Combinar múltiplos crews especializados em um único flow
6. Criar apps de chat multi-turn com [Flows conversacionais](/pt-BR/guides/flows/conversational-flows) (`kickoff` por mensagem, `ChatSession`, tracing adiado)

<Check>
Parabéns! Você construiu seu primeiro CrewAI Flow que combina código regular, chamadas diretas a LLM e processamento baseado em crews para criar um guia abrangente. Essas habilidades fundamentais permitem criar aplicações de IA cada vez mais sofisticadas, capazes de resolver problemas complexos de múltiplas etapas por meio de controle procedural e inteligência colaborativa.
Expand Down
2 changes: 2 additions & 0 deletions docs/pt-BR/guides/flows/mastering-flow-state.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Um gerenciamento de estado efetivo possibilita que você:
5. **Escalone suas aplicações** – Ofereça suporte a workflows complexos com organização apropriada dos dados
6. **Habilite aplicações conversacionais** – Armazene e acesse o histórico da conversa para interações de IA com contexto

Para chat multi-turn (`kickoff` por linha do usuário, `ChatState`, roteamento por intenção, tracing adiado e `ChatSession`), veja [Flows conversacionais](/pt-BR/guides/flows/conversational-flows).

Vamos explorar como aproveitar essas capacidades de forma eficiente.

## Fundamentos do Gerenciamento de Estado
Expand Down
22 changes: 14 additions & 8 deletions lib/crewai/src/crewai/events/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,29 +306,35 @@ def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.formatter.handle_flow_created(event.flow_name, str(source.flow_id))
self.formatter.handle_flow_started(event.flow_name, str(source.flow_id))
if not getattr(source, "suppress_flow_events", False):
self.formatter.handle_flow_created(event.flow_name, str(source.flow_id))
self.formatter.handle_flow_started(event.flow_name, str(source.flow_id))

@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self.formatter.handle_flow_status(
event.flow_name,
source.flow_id,
)
if not getattr(source, "suppress_flow_events", False):
self.formatter.handle_flow_status(
event.flow_name,
source.flow_id,
)

@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(
_: Any, event: MethodExecutionStartedEvent
source: Any, event: MethodExecutionStartedEvent
) -> None:
if getattr(source, "suppress_flow_events", False):
return
self.formatter.handle_method_status(
event.method_name,
"running",
)

@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(
_: Any, event: MethodExecutionFinishedEvent
source: Any, event: MethodExecutionFinishedEvent
) -> None:
if getattr(source, "suppress_flow_events", False):
return
Comment thread
cursor[bot] marked this conversation as resolved.
self.formatter.handle_method_status(
event.method_name,
"completed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ def _reset_batch_state(self) -> None:
return
self.batch_manager.batch_owner_type = None
self.batch_manager.batch_owner_id = None
self.batch_manager.defer_session_finalization = False
self.batch_manager._batch_finalized = False
self.batch_manager.current_batch = None
self.batch_manager.event_buffer.clear()
self.batch_manager.trace_batch_id = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def __init__(self) -> None:
self.execution_start_times: dict[str, datetime] = {}
self.batch_owner_type: str | None = None
self.batch_owner_id: str | None = None
self.defer_session_finalization: bool = False
self._batch_finalized: bool = False
self.backend_initialized: bool = False
self.ephemeral_trace_url: str | None = None
try:
Expand Down Expand Up @@ -101,6 +103,7 @@ def initialize_batch(
user_context=user_context, execution_metadata=execution_metadata
)
self.is_current_batch_ephemeral = use_ephemeral
self._batch_finalized = False

self.record_start_time("execution")

Expand Down Expand Up @@ -312,6 +315,9 @@ def _send_events_to_backend(self) -> int:
def finalize_batch(self) -> TraceBatch | None:
"""Finalize batch and return it for sending"""

if self._batch_finalized:
return None

if not self.current_batch or not is_tracing_enabled_in_context():
return None

Expand Down Expand Up @@ -340,10 +346,8 @@ def finalize_batch(self) -> TraceBatch | None:
self.current_batch.events = sorted_events
events_sent_count = len(sorted_events)
if sorted_events:
original_buffer = self.event_buffer
self.event_buffer = sorted_events
events_sent_to_backend_status = self._send_events_to_backend()
self.event_buffer = original_buffer
Comment thread
cursor[bot] marked this conversation as resolved.
if events_sent_to_backend_status == 500 and self.trace_batch_id:
self._mark_batch_as_failed(
self.trace_batch_id, "Error sending events to backend"
Expand All @@ -360,6 +364,7 @@ def finalize_batch(self) -> TraceBatch | None:
self.event_buffer.clear()
self.trace_batch_id = None
self.is_current_batch_ephemeral = False
self._batch_finalized = True

self._cleanup_batch_data()

Expand All @@ -371,7 +376,7 @@ def _finalize_backend_batch(self, events_count: int = 0) -> None:
Args:
events_count: Number of events that were successfully sent
"""
if not self.plus_api or not self.trace_batch_id:
if self._batch_finalized or not self.plus_api or not self.trace_batch_id:
return

try:
Expand All @@ -390,6 +395,7 @@ def _finalize_backend_batch(self, events_count: int = 0) -> None:
)

if response.status_code == 200:
self._batch_finalized = True
access_code = response.json().get("access_code", None)
console = Console()
settings = Settings()
Expand Down
105 changes: 89 additions & 16 deletions lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Trace collection listener for orchestrating trace collection."""

from datetime import datetime, timezone
import os
from typing import Any, ClassVar
import uuid
Expand Down Expand Up @@ -230,7 +231,15 @@ def on_flow_created(source: Any, event: FlowCreatedEvent) -> None:

@event_bus.on(FlowStartedEvent)
def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
# Always call _initialize_flow_batch to claim ownership.
# Nested flows (e.g. AgentExecutor inside a conversational Flow) must
# not re-claim an open session batch owned by the parent kickoff.
if (
self.batch_manager.defer_session_finalization
and self.batch_manager.is_batch_initialized()
and self.batch_manager.batch_owner_type == "flow"
):
self._handle_trace_event("flow_started", source, event)
return
# If batch was already initialized by a concurrent action event
# (race condition), initialize_batch() returns early but
# batch_owner_type is still correctly set to "flow".
Expand Down Expand Up @@ -264,18 +273,20 @@ def _register_context_event_handlers(self, event_bus: CrewAIEventsBus) -> None:

@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
if self.batch_manager.batch_owner_type != "flow":
# Always call _initialize_crew_batch to claim ownership.
# If batch was already initialized by a concurrent action event
# (e.g. LLM/tool before crew_kickoff_started), initialize_batch()
# returns early but batch_owner_type is still correctly set to "crew".
# Skip only when a parent flow already owns the batch.
# Nested crew inside Flow.kickoff: never claim an existing flow session batch.
if not self._nested_in_flow_execution() and (
not self.batch_manager.is_batch_initialized()
):
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)

@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.defer_session_finalization:
return
if self._nested_in_flow_execution():
return
if self.batch_manager.batch_owner_type == "crew":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
Expand All @@ -286,10 +297,14 @@ def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self.batch_manager.defer_session_finalization:
return
if self._nested_in_flow_execution():
return
Comment thread
cursor[bot] marked this conversation as resolved.
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
elif self.batch_manager.batch_owner_type == "crew":
self.batch_manager.finalize_batch()

@event_bus.on(TaskStartedEvent)
Expand Down Expand Up @@ -707,8 +722,32 @@ def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
@on_signal
def handle_signal(source: Any, event: SignalEvent) -> None:
"""Flush trace batch on system signals to prevent data loss."""
if self.batch_manager.is_batch_initialized():
self.batch_manager.finalize_batch()
if not self.batch_manager.is_batch_initialized():
return
# Multi-turn flows defer batch finalization to finalize_session_traces().
if self.batch_manager.defer_session_finalization:
return
self.batch_manager.finalize_batch()

@staticmethod
def _is_inside_active_flow_context() -> bool:
"""True when ``kickoff_async`` has set ``current_flow_id`` (nested crew)."""
from crewai.flow.flow_context import current_flow_id

return current_flow_id.get() is not None

def _flow_owns_trace_batch(self) -> bool:
"""True when an in-flight conversational flow already owns the trace batch."""
if self.batch_manager.batch_owner_type == "flow":
return True
batch = self.batch_manager.current_batch
if batch is not None:
return batch.execution_metadata.get("execution_type") == "flow"
return False

def _nested_in_flow_execution(self) -> bool:
"""True when a crew runs inside a flow session (context or batch ownership)."""
return self._is_inside_active_flow_context() or self._flow_owns_trace_batch()

def _initialize_crew_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch.
Expand All @@ -729,6 +768,33 @@ def _initialize_crew_batch(self, source: Any, event: BaseEvent) -> None:

self._initialize_batch(user_context, execution_metadata)

def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
"""Claim a flow trace batch when an action event fires inside kickoff.

When ``suppress_flow_events=True``, console panels are hidden but
``FlowStartedEvent`` and method lifecycle events still emit; if no
batch exists yet, LLM/tool events must not fall back to implicit crew
batches.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name

flow_id = current_flow_id.get()
if flow_id is None:
return False

started_at = getattr(event, "timestamp", None) or datetime.now(timezone.utc)
user_context = self._get_user_context()
execution_metadata = {
"flow_name": current_flow_name.get() or "Unknown Flow",
"execution_start": started_at,
"crewai_version": get_crewai_version(),
"execution_type": "flow",
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = flow_id
self._initialize_batch(user_context, execution_metadata)
return True

def _initialize_flow_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch for Flow execution.

Expand Down Expand Up @@ -793,12 +859,19 @@ def _handle_action_event(self, event_type: str, source: Any, event: Any) -> None
event: Event object.
"""
if not self.batch_manager.is_batch_initialized():
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self._initialize_batch(user_context, execution_metadata)
if self._try_initialize_flow_batch_from_context(event):
pass
elif not self._nested_in_flow_execution():
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self.batch_manager.batch_owner_type = "crew"
self.batch_manager.batch_owner_id = getattr(
source, "id", str(uuid.uuid4())
)
self._initialize_batch(user_context, execution_metadata)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action events silently dropped when flow batch missing

Low Severity

In _handle_action_event, when the batch is not initialized and _try_initialize_flow_batch_from_context returns False but _nested_in_flow_execution() returns True, neither branch creates a batch. The event is still buffered via add_event (line 879) into a manager with no current_batch. If the flow's FlowStartedEvent handler hasn't fired yet (timing-dependent), these orphaned events accumulate without a batch to finalize them. In deferred-trace sessions this is eventually resolved, but for non-deferred flows the events could be lost.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 5f180de. Configure here.


self.batch_manager.begin_event_processing()
try:
Expand Down
16 changes: 16 additions & 0 deletions lib/crewai/src/crewai/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
from crewai.experimental.agent_executor import AgentExecutor, CrewAgentExecutorFlow
from crewai.experimental.conversational_flow import (
AgentMessage,
ConversationConfig,
ConversationEvent,
ConversationMessage,
ConversationState,
ConversationalFlow,
RouterConfig,
)
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
Expand All @@ -24,7 +33,13 @@
"AgentEvaluationResult",
"AgentEvaluator",
"AgentExecutor",
"AgentMessage",
"BaseEvaluator",
"ConversationConfig",
"ConversationEvent",
"ConversationMessage",
"ConversationState",
"ConversationalFlow",
"CrewAgentExecutorFlow", # Deprecated alias for AgentExecutor
"EvaluationScore",
"EvaluationTraceCallback",
Expand All @@ -35,6 +50,7 @@
"MetricCategory",
"ParameterExtractionEvaluator",
"ReasoningEfficiencyEvaluator",
"RouterConfig",
"SemanticQualityEvaluator",
"ToolInvocationEvaluator",
"ToolSelectionEvaluator",
Expand Down
Loading
Loading