Skip to content

Commit 578fa8c

Browse files
authored
Lorenze/ephemeral trace ask (#3530)
* feat(tracing): implement first-time trace handling and improve event management - Added FirstTimeTraceHandler for managing first-time user trace collection and display. - Enhanced TraceBatchManager to support ephemeral trace URLs and improved event buffering. - Updated TraceCollectionListener to utilize the new FirstTimeTraceHandler. - Refactored type annotations across multiple files for consistency and clarity. - Improved error handling and logging for trace-related operations. - Introduced utility functions for trace viewing prompts and first execution checks. * brought back crew finalize batch events * refactor(trace): move instance variables to __init__ in TraceBatchManager - Refactored TraceBatchManager to initialize instance variables in the constructor instead of as class variables. - Improved clarity and encapsulation of the class state. * fix(tracing): improve error handling in user data loading and saving - Enhanced error handling in _load_user_data and _save_user_data functions to log warnings for JSON decoding and file access issues. - Updated documentation for trace usage to clarify the addition of tracing parameters in Crew and Flow initialization. - Refined state management in Flow class to ensure proper handling of state IDs when persistence is enabled. * add some tests * fix test * fix tests * refactor(tracing): enhance user input handling for trace viewing - Replaced signal-based timeout handling with threading for user input in prompt_user_for_trace_viewing function. - Improved user experience by allowing a configurable timeout for viewing execution traces. - Updated tests to mock threading behavior and verify timeout handling correctly. * fix(tracing): improve machine ID retrieval with error handling - Added error handling to the _get_machine_id function to log warnings when retrieving the machine ID fails. - Ensured that the function continues to provide a stable, privacy-preserving machine fingerprint even in case of errors. * refactor(flow): streamline state ID assignment in Flow class - Replaced direct attribute assignment with setattr for improved flexibility in handling state IDs. - Enhanced code readability by simplifying the logic for setting the state ID when persistence is enabled.
1 parent 6f5af2b commit 578fa8c

10 files changed

+1164
-251
lines changed

src/crewai/crew.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
)
3838
from crewai.events.listeners.tracing.utils import (
3939
is_tracing_enabled,
40+
should_auto_collect_first_time_traces,
4041
)
4142
from crewai.events.types.crew_events import (
4243
CrewKickoffCompletedEvent,
@@ -89,8 +90,8 @@ class Crew(FlowTrackable, BaseModel):
8990
tasks they should perform.
9091
9192
Attributes:
92-
tasks: List of tasks assigned to the crew.
93-
agents: List of agents part of this crew.
93+
tasks: list of tasks assigned to the crew.
94+
agents: list of agents part of this crew.
9495
manager_llm: The language model that will run manager agent.
9596
manager_agent: Custom agent that will be used as manager.
9697
memory: Whether the crew should use memory to store memories of it's
@@ -238,11 +239,11 @@ class Crew(FlowTrackable, BaseModel):
238239
)
239240
task_execution_output_json_files: list[str] | None = Field(
240241
default=None,
241-
description="List of file paths for task execution JSON files.",
242+
description="list of file paths for task execution JSON files.",
242243
)
243244
execution_logs: list[dict[str, Any]] = Field(
244245
default=[],
245-
description="List of execution logs for tasks",
246+
description="list of execution logs for tasks",
246247
)
247248
knowledge_sources: list[BaseKnowledgeSource] | None = Field(
248249
default=None,
@@ -296,12 +297,16 @@ def check_config_type(cls, v: Json | dict[str, Any]) -> Json | dict[str, Any]:
296297

297298
@model_validator(mode="after")
298299
def set_private_attrs(self) -> "Crew":
299-
"""Set private attributes."""
300+
"""set private attributes."""
300301

301302
self._cache_handler = CacheHandler()
302303
event_listener = EventListener()
303304

304-
if is_tracing_enabled() or self.tracing:
305+
if (
306+
is_tracing_enabled()
307+
or self.tracing
308+
or should_auto_collect_first_time_traces()
309+
):
305310
trace_listener = TraceCollectionListener()
306311
trace_listener.setup_listeners(crewai_event_bus)
307312
event_listener.verbose = self.verbose
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import logging
2+
import uuid
3+
4+
from rich.console import Console
5+
from rich.panel import Panel
6+
7+
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
8+
from crewai.events.listeners.tracing.utils import (
9+
mark_first_execution_completed,
10+
prompt_user_for_trace_viewing,
11+
should_auto_collect_first_time_traces,
12+
)
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class FirstTimeTraceHandler:
18+
"""Handles the first-time user trace collection and display flow."""
19+
20+
def __init__(self):
21+
self.is_first_time: bool = False
22+
self.collected_events: bool = False
23+
self.trace_batch_id: str | None = None
24+
self.ephemeral_url: str | None = None
25+
self.batch_manager: TraceBatchManager | None = None
26+
27+
def initialize_for_first_time_user(self) -> bool:
28+
"""Check if this is first time and initialize collection."""
29+
self.is_first_time = should_auto_collect_first_time_traces()
30+
return self.is_first_time
31+
32+
def set_batch_manager(self, batch_manager: TraceBatchManager):
33+
"""Set reference to batch manager for sending events."""
34+
self.batch_manager = batch_manager
35+
36+
def mark_events_collected(self):
37+
"""Mark that events have been collected during execution."""
38+
self.collected_events = True
39+
40+
def handle_execution_completion(self):
41+
"""Handle the completion flow as shown in your diagram."""
42+
if not self.is_first_time or not self.collected_events:
43+
return
44+
45+
try:
46+
user_wants_traces = prompt_user_for_trace_viewing(timeout_seconds=20)
47+
48+
if user_wants_traces:
49+
self._initialize_backend_and_send_events()
50+
51+
if self.ephemeral_url:
52+
self._display_ephemeral_trace_link()
53+
54+
mark_first_execution_completed()
55+
56+
except Exception as e:
57+
self._gracefully_fail(f"Error in trace handling: {e}")
58+
mark_first_execution_completed()
59+
60+
def _initialize_backend_and_send_events(self):
61+
"""Initialize backend batch and send collected events."""
62+
if not self.batch_manager:
63+
return
64+
65+
try:
66+
if not self.batch_manager.backend_initialized:
67+
original_metadata = (
68+
self.batch_manager.current_batch.execution_metadata
69+
if self.batch_manager.current_batch
70+
else {}
71+
)
72+
73+
user_context = {
74+
"privacy_level": "standard",
75+
"user_id": "first_time_user",
76+
"session_id": str(uuid.uuid4()),
77+
"trace_id": self.batch_manager.trace_batch_id,
78+
}
79+
80+
execution_metadata = {
81+
"execution_type": original_metadata.get("execution_type", "crew"),
82+
"crew_name": original_metadata.get(
83+
"crew_name", "First Time Execution"
84+
),
85+
"flow_name": original_metadata.get("flow_name"),
86+
"agent_count": original_metadata.get("agent_count", 1),
87+
"task_count": original_metadata.get("task_count", 1),
88+
"crewai_version": original_metadata.get("crewai_version"),
89+
}
90+
91+
self.batch_manager._initialize_backend_batch(
92+
user_context=user_context,
93+
execution_metadata=execution_metadata,
94+
use_ephemeral=True,
95+
)
96+
self.batch_manager.backend_initialized = True
97+
98+
if self.batch_manager.event_buffer:
99+
self.batch_manager._send_events_to_backend()
100+
101+
self.batch_manager.finalize_batch()
102+
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
103+
104+
if not self.ephemeral_url:
105+
self._show_local_trace_message()
106+
107+
except Exception as e:
108+
self._gracefully_fail(f"Backend initialization failed: {e}")
109+
110+
def _display_ephemeral_trace_link(self):
111+
"""Display the ephemeral trace link to the user."""
112+
console = Console()
113+
114+
panel_content = f"""
115+
🎉 Your First CrewAI Execution Trace is Ready!
116+
117+
View your execution details here:
118+
{self.ephemeral_url}
119+
120+
This trace shows:
121+
• Agent decisions and interactions
122+
• Task execution timeline
123+
• Tool usage and results
124+
• LLM calls and responses
125+
126+
To use traces add tracing=True to your Crew(tracing=True) / Flow(tracing=True)
127+
128+
📝 Note: This link will expire in 24 hours.
129+
""".strip()
130+
131+
panel = Panel(
132+
panel_content,
133+
title="🔍 Execution Trace Generated",
134+
border_style="bright_green",
135+
padding=(1, 2),
136+
)
137+
138+
console.print("\n")
139+
console.print(panel)
140+
console.print()
141+
142+
def _gracefully_fail(self, error_message: str):
143+
"""Handle errors gracefully without disrupting user experience."""
144+
console = Console()
145+
console.print(f"[yellow]Note: {error_message}[/yellow]")
146+
147+
logger.debug(f"First-time trace error: {error_message}")
148+
149+
def _show_local_trace_message(self):
150+
"""Show message when traces were collected locally but couldn't be uploaded."""
151+
console = Console()
152+
153+
panel_content = f"""
154+
📊 Your execution traces were collected locally!
155+
156+
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
157+
{len(self.batch_manager.event_buffer)} trace events
158+
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
159+
• Batch ID: {self.batch_manager.trace_batch_id}
160+
161+
The traces include agent decisions, task execution, and tool usage.
162+
Try running with CREWAI_TRACING_ENABLED=true next time for persistent traces.
163+
""".strip()
164+
165+
panel = Panel(
166+
panel_content,
167+
title="🔍 Local Traces Collected",
168+
border_style="yellow",
169+
padding=(1, 2),
170+
)
171+
172+
console.print("\n")
173+
console.print(panel)
174+
console.print()

0 commit comments

Comments
 (0)