Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/praisonai/praisonai/agents_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,6 @@ def _run_praisonai(self, config, topic, tools_dict):
if acp_enabled or lsp_enabled:
try:
import asyncio
import os
from praisonai.cli.features.interactive_runtime import InteractiveRuntime, RuntimeConfig
from praisonai.cli.features.agent_tools import create_agent_centric_tools
import nest_asyncio
Expand Down
17 changes: 15 additions & 2 deletions src/praisonai/praisonai/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,23 @@ def _setup_langextract_observability(*, verbose: bool = False) -> None:
# Ensure sink is closed on exit to write the trace file
atexit.register(sink.close)

# Set up action-level trace emitter
# Set up action-level trace emitter (covers RouterAgent / PlanningAgent)
emitter = TraceEmitter(sink=sink, enabled=True)
set_default_emitter(emitter)


# Bridge the context emitter so regular Agent.start / tool calls / LLM
# responses are captured as well. Without this, typical single-agent
# flows produce an empty trace (no agent_start/end, no tool events).
def warn_handler(msg: str):
if verbose:
typer.echo(f"Warning: {msg}", err=True)

LangextractSink.bridge_context_events(
sink=sink,
session_id="praisonai-cli",
warn_callback=warn_handler
)

except ImportError:
# Gracefully degrade if langextract not installed
if verbose:
Expand Down
14 changes: 14 additions & 0 deletions src/praisonai/praisonai/cli/commands/langextract.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ def render(
# Set up trace emitter for the duration of the run
emitter = TraceEmitter(sink=sink, enabled=True)
set_default_emitter(emitter)

# Also bridge the context emitter so real agent runtime events
# (agent_start/end, tool_call_*, llm_response) are captured.
from praisonai.observability.langextract import LangextractSink

def warn_handler(msg: str):
# Warn user about bridge failure since this command specifically generates traces
typer.echo(f"Warning: {msg}", err=True)

LangextractSink.bridge_context_events(
sink=sink,
session_id="praisonai-langextract-render",
warn_callback=warn_handler
)

try:
# Run the workflow
Expand Down
122 changes: 122 additions & 0 deletions src/praisonai/praisonai/observability/langextract.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,88 @@
)


class _ContextToActionBridge:
    """
    Adapter translating core ``ContextEvent``s into ``ActionEvent``s.

    Implements ``ContextTraceSinkProtocol`` and re-emits every recognised
    context event on the wrapped ``LangextractSink`` as the closest
    ``ActionEvent`` equivalent. The base agent runtime (``chat_mixin``,
    ``tool_execution``, ``unified_execution_mixin``) only emits lifecycle
    events through ``ContextTraceEmitter``; this bridge lets the
    langextract sink observe those events without changes to the core SDK.
    """

    __slots__ = ("_sink",)

    # Subset of ContextEventType values we care about (kept as plain strings
    # so ContextEventType need not be importable at module load time).
    _CTX_AGENT_START = "agent_start"
    _CTX_AGENT_END = "agent_end"
    _CTX_TOOL_START = "tool_call_start"
    _CTX_TOOL_END = "tool_call_end"
    _CTX_LLM_RESPONSE = "llm_response"

    def __init__(self, sink: "LangextractSink") -> None:
        # Target sink that receives the translated ActionEvents.
        self._sink = sink

    def emit(self, event: Any) -> None:
        """Translate one duck-typed ``ContextEvent``; unknown types are dropped."""
        raw = getattr(event, "event_type", None)
        # Enum members expose ``.value``; bare strings pass through untouched.
        kind = raw.value if hasattr(raw, "value") else raw
        payload = getattr(event, "data", {}) or {}
        when = getattr(event, "timestamp", 0.0)
        who = getattr(event, "agent_name", None)

        if kind == self._CTX_AGENT_START:
            action = ActionEvent(
                event_type=ActionEventType.AGENT_START.value,
                timestamp=when,
                agent_name=who,
                metadata={"input": payload.get("input") or payload.get("goal") or ""},
            )
        elif kind == self._CTX_AGENT_END:
            action = ActionEvent(
                event_type=ActionEventType.AGENT_END.value,
                timestamp=when,
                agent_name=who,
                status="ok",
            )
        elif kind == self._CTX_TOOL_START:
            action = ActionEvent(
                event_type=ActionEventType.TOOL_START.value,
                timestamp=when,
                agent_name=who,
                tool_name=payload.get("tool_name"),
                tool_args=payload.get("arguments"),
            )
        elif kind == self._CTX_TOOL_END:
            result = payload.get("result")
            action = ActionEvent(
                event_type=ActionEventType.TOOL_END.value,
                timestamp=when,
                agent_name=who,
                tool_name=payload.get("tool_name"),
                duration_ms=payload.get("duration_ms") or 0.0,
                status=payload.get("status") or "ok",
                # Truncate large tool outputs so the trace stays readable.
                tool_result_summary=None if result is None else str(result)[:500],
            )
        elif kind == self._CTX_LLM_RESPONSE:
            # Surface the LLM answer as an OUTPUT event so the final text
            # shows up in the rendered HTML.
            action = ActionEvent(
                event_type=ActionEventType.OUTPUT.value,
                timestamp=when,
                agent_name=who,
                tool_result_summary=payload.get("response_content") or payload.get("content") or "",
            )
        else:
            return
        self._sink.emit(action)

    def flush(self) -> None:
        """No-op: the underlying sink is flushed by its owner."""
        pass

    def close(self) -> None:
        """No-op: the owning sink handles render/close."""
        pass


@dataclass
class LangextractSinkConfig:
"""Configuration for the langextract trace sink."""
Expand Down Expand Up @@ -61,6 +143,46 @@ def __init__(self, config: Optional[LangextractSinkConfig] = None) -> None:
self._source_text: Optional[str] = None
self._closed = False

# ---- Context-emitter bridge -------------------------------------------

def context_sink(self) -> "_ContextToActionBridge":
    """
    Build a ``ContextTraceSinkProtocol``-compatible adapter bound to this
    sink. Install it via
    ``praisonaiagents.trace.context_events.set_context_emitter`` (or
    ``trace_context``) so core ``ContextEvent``s are forwarded into this
    sink as ``ActionEvent``s, capturing real agent runtime events.
    """
    bridge = _ContextToActionBridge(self)
    return bridge

@staticmethod
def bridge_context_events(sink: "LangextractSink", session_id: str, warn_callback=None) -> None:
"""
Helper method to set up context event bridging for the given sink.

Args:
sink: LangextractSink instance to bridge
session_id: Session ID for the context emitter
warn_callback: Optional callback function for warnings, called with message string
"""
try:
from praisonaiagents.trace.context_events import (
ContextTraceEmitter,
set_context_emitter,
)
context_emitter = ContextTraceEmitter(
sink=sink.context_sink(),
session_id=session_id,
enabled=True,
)
set_context_emitter(context_emitter)
except ImportError:
# Context emitter bridging is optional if not available
if warn_callback:
warn_callback("ContextTraceEmitter not available")
except Exception as e:
if warn_callback:
warn_callback(f"could not bridge context emitter: {e}")
Comment on lines +157 to +184
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unconditionally clobbers any pre-existing context emitter, and discards the reset token.

set_context_emitter(context_emitter) returns a token that callers can use with reset_context_emitter(token) to restore prior state (see src/praisonai/praisonai/recipe/core.py lines 197-206). Here the token is thrown away, which has two consequences:

  1. Silent clobbering: If another subsystem (e.g. the replay ContextTraceWriter in recipe/core.py) has already installed a context emitter, this call replaces it with no warning — events meant for replay stop being captured, or the order of setup determines who "wins". At minimum, detect an existing emitter (get_context_emitter()) and either compose or log a warning via warn_callback.
  2. No way to unwind: For tests and for any caller that wants to scope the bridge to a single run, the installed emitter is permanent for the process. Returning the token (or an "uninstall" callable) from this helper would also simplify the follow-up test that currently has to save/restore state manually.
🛡️ Suggested shape
     `@staticmethod`
-    def bridge_context_events(sink: "LangextractSink", session_id: str, warn_callback=None) -> None:
+    def bridge_context_events(sink: "LangextractSink", session_id: str, warn_callback=None):
         """
         Helper method to set up context event bridging for the given sink.
-        ...
+
+        Returns:
+            The token from ``set_context_emitter`` (or ``None`` on failure) so
+            callers can later call ``reset_context_emitter(token)``.
         """
         try:
             from praisonaiagents.trace.context_events import (
                 ContextTraceEmitter,
+                get_context_emitter,
                 set_context_emitter,
             )
+            existing = get_context_emitter()
+            if existing is not None and getattr(existing, "enabled", False) and warn_callback:
+                warn_callback(
+                    "overriding pre-existing context emitter; prior observers will stop receiving events"
+                )
             context_emitter = ContextTraceEmitter(
                 sink=sink.context_sink(),
                 session_id=session_id,
                 enabled=True,
             )
-            set_context_emitter(context_emitter)
+            return set_context_emitter(context_emitter)
         except ImportError:
             # Context emitter bridging is optional if not available
             if warn_callback:
                 warn_callback("ContextTraceEmitter not available")
+            return None
         except Exception as e:
             if warn_callback:
                 warn_callback(f"could not bridge context emitter: {e}")
+            return None
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 182-182: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/observability/langextract.py` around lines 157 - 184,
The bridge_context_events helper currently calls
set_context_emitter(context_emitter) and drops the returned token, which
silently overwrites any existing emitter and prevents restoring prior state;
update bridge_context_events to first call get_context_emitter() and if an
emitter already exists call warn_callback (or compose emitters if composition is
supported) instead of blindly clobbering, then call set_context_emitter(...) and
capture the returned token and return that token (or return an uninstall
callable that calls reset_context_emitter(token)) so callers can restore the
previous emitter; reference ContextTraceEmitter, set_context_emitter,
get_context_emitter, and reset_context_emitter in your changes and ensure
ImportError handling/warn_callback behavior remains intact.


# ---- TraceSinkProtocol -------------------------------------------------

def emit(self, event: ActionEvent) -> None:
Expand Down
96 changes: 96 additions & 0 deletions src/praisonai/tests/unit/test_langextract_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,5 +342,101 @@ def test_observe_invalid_provider_error(self):
)


class TestLangextractContextBridge:
    """Regression tests for the ContextTraceEmitter bridge.

    The base agent runtime (chat_mixin, tool_execution, unified_execution_mixin)
    emits ``ContextEvent``s only. Without the bridge, a single-agent run
    produces zero events in the langextract sink.
    """

    def test_context_sink_returns_bridge(self):
        from praisonai.observability import LangextractSink

        adapter = LangextractSink().context_sink()
        # The adapter must satisfy the ContextTraceSinkProtocol surface.
        for required in ("emit", "flush", "close"):
            assert hasattr(adapter, required)

    def test_bridge_maps_context_events_to_action_events(self):
        from praisonai.observability import LangextractSink
        from praisonaiagents.trace.context_events import (
            ContextEvent,
            ContextEventType,
        )

        sink = LangextractSink()
        bridge = sink.context_sink()

        # Replay a minimal agent lifecycle through the bridge.
        lifecycle = [
            (ContextEventType.AGENT_START, 1.0, {"input": "Write a haiku"}),
            (ContextEventType.TOOL_CALL_START, 2.0,
             {"tool_name": "search", "arguments": {"q": "x"}}),
            (ContextEventType.TOOL_CALL_END, 3.0,
             {"tool_name": "search", "result": "ok", "duration_ms": 12.0}),
            (ContextEventType.LLM_RESPONSE, 4.0,
             {"response_content": "final haiku"}),
            (ContextEventType.AGENT_END, 5.0, {}),
        ]
        for etype, ts, payload in lifecycle:
            bridge.emit(ContextEvent(
                event_type=etype,
                timestamp=ts, session_id="s",
                agent_name="writer",
                data=payload,
            ))

        seen = {e.event_type for e in sink._events}
        assert {"agent_start", "tool_start", "tool_end", "output", "agent_end"} <= seen
        assert sink._source_text == "Write a haiku"

    def test_setup_observability_registers_context_emitter(self):
        """`--observe langextract` must install the bridge on the context emitter."""
        import praisonai.cli.app as cli_app
        from praisonaiagents.trace.context_events import get_context_emitter, set_context_emitter

        saved = get_context_emitter()
        try:
            # Force the optional-dependency probe to succeed (even when
            # langextract is not installed) and suppress the atexit hook.
            with patch("importlib.util.find_spec", return_value=object()), \
                 patch("atexit.register"):
                cli_app._setup_langextract_observability(verbose=False)
            installed = get_context_emitter()
            assert installed.enabled, "context emitter should be enabled after setup"
        finally:
            set_context_emitter(saved)
            assert get_context_emitter() is saved

    def test_setup_observability_without_langextract_leaves_context_emitter_unchanged(self):
        """Setup should be a no-op when optional langextract dependency is unavailable."""
        import praisonai.cli.app as cli_app
        from praisonaiagents.trace.context_events import get_context_emitter, set_context_emitter

        saved = get_context_emitter()
        try:
            # Simulate the optional dependency being absent.
            with patch("importlib.util.find_spec", return_value=None):
                cli_app._setup_langextract_observability(verbose=False)
            assert get_context_emitter() is saved
        finally:
            set_context_emitter(saved)


# Allow running this test module directly (outside a pytest invocation).
if __name__ == "__main__":
    pytest.main([__file__, "-v"])