Skip to content

Commit 2c7cbdf

Browse files
fix: eliminate streaming pause caused by synchronous telemetry tracking
- Add async_mode parameter to track_agent_execution() to enable background PostHog capture - Update telemetry integration to auto-detect streaming agents and use async mode - Maintains backward compatibility for non-streaming agents - Resolves pause issue after "execution tracked: success=True" message in streaming workflows Fixes #1031 Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
1 parent ab8932b commit 2c7cbdf

File tree

2 files changed

+50
-17
lines changed

2 files changed

+50
-17
lines changed

src/praisonai-agents/praisonaiagents/telemetry/integration.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@ def instrument_agent(agent: 'Agent', telemetry: Optional['MinimalTelemetry'] = N
4444
def instrumented_chat(*args, **kwargs):
4545
try:
4646
result = original_chat(*args, **kwargs)
47-
telemetry.track_agent_execution(agent.name, success=True)
47+
# Detect if agent is in streaming mode to prevent blocking
48+
is_streaming = getattr(agent, 'stream', False)
49+
telemetry.track_agent_execution(agent.name, success=True, async_mode=is_streaming)
4850
return result
4951
except Exception as e:
50-
telemetry.track_agent_execution(agent.name, success=False)
52+
# Always use async mode for error tracking to avoid further delays
53+
telemetry.track_agent_execution(agent.name, success=False, async_mode=True)
5154
telemetry.track_error(type(e).__name__)
5255
raise
5356

@@ -59,10 +62,13 @@ def instrumented_chat(*args, **kwargs):
5962
def instrumented_start(*args, **kwargs):
6063
try:
6164
result = original_start(*args, **kwargs)
62-
telemetry.track_agent_execution(agent.name, success=True)
65+
# Detect if agent is in streaming mode to prevent blocking
66+
is_streaming = getattr(agent, 'stream', False)
67+
telemetry.track_agent_execution(agent.name, success=True, async_mode=is_streaming)
6368
return result
6469
except Exception as e:
65-
telemetry.track_agent_execution(agent.name, success=False)
70+
# Always use async mode for error tracking to avoid further delays
71+
telemetry.track_agent_execution(agent.name, success=False, async_mode=True)
6672
telemetry.track_error(type(e).__name__)
6773
raise
6874

@@ -74,10 +80,13 @@ def instrumented_start(*args, **kwargs):
7480
def instrumented_run(*args, **kwargs):
7581
try:
7682
result = original_run(*args, **kwargs)
77-
telemetry.track_agent_execution(agent.name, success=True)
83+
# Detect if agent is in streaming mode to prevent blocking
84+
is_streaming = getattr(agent, 'stream', False)
85+
telemetry.track_agent_execution(agent.name, success=True, async_mode=is_streaming)
7886
return result
7987
except Exception as e:
80-
telemetry.track_agent_execution(agent.name, success=False)
88+
# Always use async mode for error tracking to avoid further delays
89+
telemetry.track_agent_execution(agent.name, success=False, async_mode=True)
8190
telemetry.track_error(type(e).__name__)
8291
raise
8392

src/praisonai-agents/praisonaiagents/telemetry/telemetry.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,32 +119,56 @@ def _get_framework_version(self) -> str:
119119
except (ImportError, KeyError, AttributeError):
120120
return "unknown"
121121

122-
def track_agent_execution(self, agent_name: str = None, success: bool = True):
122+
def track_agent_execution(self, agent_name: str = None, success: bool = True, async_mode: bool = False):
123123
"""
124124
Track an agent execution event.
125125
126126
Args:
127127
agent_name: Name of the agent (not logged, just for counting)
128128
success: Whether the execution was successful
129+
async_mode: If True, defer PostHog capture to prevent blocking in streaming scenarios
129130
"""
130131
if not self.enabled:
131132
return
132133

133134
with self._metrics_lock:
134135
self._metrics["agent_executions"] += 1
135136

137+
# Always log immediately for debugging
138+
self.logger.debug(f"Agent execution tracked: success={success}")
139+
136140
# Send event to PostHog
137141
if self._posthog:
138-
self._posthog.capture(
139-
distinct_id=self.session_id,
140-
event='agent_execution',
141-
properties={
142-
'success': success,
143-
'session_id': self.session_id
144-
}
145-
)
146-
147-
self.logger.debug(f"Agent execution tracked: success={success}")
142+
if async_mode:
143+
# Use a background thread to prevent blocking streaming responses
144+
def _async_capture():
145+
try:
146+
self._posthog.capture(
147+
distinct_id=self.session_id,
148+
event='agent_execution',
149+
properties={
150+
'success': success,
151+
'session_id': self.session_id
152+
}
153+
)
154+
except Exception as e:
155+
# Silently handle any telemetry errors to avoid disrupting user experience
156+
self.logger.debug(f"Async PostHog capture error: {e}")
157+
158+
# Execute in background thread with daemon flag for clean shutdown
159+
import threading
160+
thread = threading.Thread(target=_async_capture, daemon=True)
161+
thread.start()
162+
else:
163+
# Synchronous capture for backward compatibility
164+
self._posthog.capture(
165+
distinct_id=self.session_id,
166+
event='agent_execution',
167+
properties={
168+
'success': success,
169+
'session_id': self.session_id
170+
}
171+
)
148172

149173
def track_task_completion(self, task_name: str = None, success: bool = True):
150174
"""

0 commit comments

Comments
 (0)