Add tracking for tool execs, meeting sessions and errors. Stop agent on 'leave' tool call. #3

Conversation
Pull request overview
This PR adds comprehensive Datadog tracking for tool executions, LLM calls, meeting sessions, and errors, along with implementing agent shutdown when the 'leave_meeting' tool is called.
Key Changes:
- Introduced new context managers for tracking workflows, agents, tools, tasks, and LLM calls with Datadog LLM Observability
- Added metric tracking system for tool executions, LLM token usage, and errors
- Implemented meeting termination detection via a tool callback to gracefully shut down the agent when `leave_meeting` is called
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| joinly/utils/datadog.py | Major refactoring to add context managers for tracking different operation types (workflow, agent, tool, task, LLM), metrics tracking functions, and error tracking with Datadog LLM Observability |
| joinly/session.py | Updated to use the new track_task context manager instead of the legacy create_span API for join_meeting and speak_text operations |
| joinly/server.py | Migrated all MCP tool endpoints to the new tracking context managers (track_meeting_join, track_meeting_leave, track_speech, track_tool) with error tracking |
| client/joinly_client/main.py | Added event-driven meeting termination logic with a callback to detect leave_meeting tool calls and shut down the agent gracefully |
| client/joinly_client/datadog.py | Duplicates tracking functionality from the server's datadog.py with the same context managers and metrics functions |
| client/joinly_client/agent.py | Refactored the agent loop to track entire conversations as workflows, individual iterations as agent spans, and tool/LLM calls with metrics collection |
| .pre-commit-config.yaml | Commented out the ruff formatter from the pre-commit hooks |
Comments suppressed due to low confidence (1)
client/joinly_client/main.py:2: Import of 'contextlib' is not used.

```python
import contextlib
```
```diff
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
```
The contextlib import on line 2 is unused. It was added in this PR but is never referenced in the code.
```python
import contextlib
```
```yaml
# - repo: https://github.com/astral-sh/ruff-pre-commit
#   rev: v0.11.6
#   hooks:
#     # - id: ruff  # Disabled - too strict for optional library handling
#     - id: ruff-format
```
The ruff formatter has been commented out. This removes automatic code formatting from the pre-commit hooks, which can lead to inconsistent code style. If there are specific issues with ruff formatting that need to be addressed, consider configuring ruff to skip problematic rules rather than disabling formatting entirely.
Suggested change:

```diff
-# - repo: https://github.com/astral-sh/ruff-pre-commit
-#   rev: v0.11.6
-#   hooks:
-#     # - id: ruff  # Disabled - too strict for optional library handling
-#     - id: ruff-format
+- repo: https://github.com/astral-sh/ruff-pre-commit
+  rev: v0.11.6
+  hooks:
+    # - id: ruff  # Disabled - too strict for optional library handling
+    - id: ruff-format
```
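If the friction was specific lint rules rather than formatting itself, ruff's lint configuration can relax just those rules while ruff-format stays enabled. A hedged sketch (the rule codes are illustrative guesses at what "too strict for optional library handling" means; the actual codes would come from the original ruff errors):

```toml
# pyproject.toml (illustrative)
[tool.ruff.lint]
# Allow imports inside functions, as used for the optional ddtrace imports.
ignore = ["PLC0415"]

[tool.ruff.lint.per-file-ignores]
# Blind excepts are deliberate in the Datadog wrappers.
"joinly/utils/datadog.py" = ["BLE001"]
```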
```python
# Global state for tracking
_llmobs_enabled = False
_metrics: dict[str, float] = {}
```
Using module-level mutable global state (_llmobs_enabled and _metrics) can lead to issues in testing and concurrent scenarios. Consider encapsulating this state in a class or using a context manager pattern to make the state management more explicit and testable.
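One way the suggestion could look (a sketch, not the PR's code; the class and method names are illustrative):

```python
class MetricsTracker:
    """Encapsulates metric state so tests and concurrent callers can use
    isolated instances instead of sharing module-level globals."""

    def __init__(self) -> None:
        self._metrics: dict[str, float] = {}

    def increment(self, name: str, value: float = 1.0) -> None:
        self._metrics[name] = self._metrics.get(name, 0.0) + value

    def get(self, name: str) -> float:
        return self._metrics.get(name, 0.0)

    def snapshot(self) -> dict[str, float]:
        # Copy so callers cannot mutate internal state.
        return dict(self._metrics)

    def reset(self) -> None:
        self._metrics.clear()
```

Each test can then construct its own MetricsTracker and assert on snapshot() without resetting shared global state between runs.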
```python
def _patch_libraries(patch: Any) -> None:  # noqa: ANN401
    """Patch common libraries for auto-instrumentation."""
    for lib in ("openai", "anthropic", "httpx", "aiohttp"):
        try:
            patch(**{lib: True})
            logger.debug("Patched %s for Datadog", lib)
        except Exception:  # noqa: BLE001
            logger.debug("%s patching skipped", lib)


def is_llmobs_enabled() -> bool:
    """Check if LLM Observability is enabled."""
    return _llmobs_enabled


# =============================================================================
# LLM Observability Span Context Managers
# =============================================================================


@contextmanager
def track_workflow(
    name: str,
    *,
    session_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Track a workflow span (e.g., conversation session).

    Args:
        name: Name of the workflow
        session_id: Optional session ID to link related spans
        metadata: Optional metadata to attach

    Yields:
        A dict that can be updated with output_data and additional metadata.
    """
    context: dict[str, Any] = {
        "start_time": time.time(),
        "metadata": metadata or {},
        "output_data": None,
        "error": None,
        "cancelled": False,
    }

    if not _llmobs_enabled:
        try:
            yield context
        except asyncio.CancelledError:
            context["cancelled"] = True
            raise
        return

    cancelled_error: asyncio.CancelledError | None = None
    try:
        from ddtrace.llmobs import LLMObs

        with LLMObs.workflow(name=name, session_id=session_id) as span:
            try:
                yield context
            except asyncio.CancelledError as e:
                context["cancelled"] = True
                context["metadata"]["cancelled"] = True
                cancelled_error = e
            except Exception as e:
                context["error"] = e
                raise
            finally:
                duration = time.time() - context["start_time"]
                context["metadata"]["duration_seconds"] = round(duration, 3)

                LLMObs.annotate(
                    span=span,
                    metadata=context["metadata"],
                    output_data=context.get("output_data"),
                )
    except ImportError:
        try:
            yield context
        except asyncio.CancelledError as e:
            context["cancelled"] = True
            cancelled_error = e

    if cancelled_error is not None:
        raise cancelled_error


@contextmanager
def track_agent(
    name: str = "agent",
    *,
    session_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Track an agent span (e.g., agent conversation run).

    Args:
        name: Name of the agent operation
        session_id: Optional session ID to link related spans
        metadata: Optional metadata to attach

    Yields:
        A dict that can be updated with output_data and additional metadata.
    """
    context: dict[str, Any] = {
        "start_time": time.time(),
        "metadata": metadata or {},
        "output_data": None,
        "input_data": None,
        "error": None,
        "cancelled": False,
    }

    if not _llmobs_enabled:
        try:
            yield context
        except asyncio.CancelledError:
            context["cancelled"] = True
            raise
        return

    cancelled_error: asyncio.CancelledError | None = None
    try:
        from ddtrace.llmobs import LLMObs

        with LLMObs.agent(name=name, session_id=session_id) as span:
            try:
                yield context
            except asyncio.CancelledError as e:
                context["cancelled"] = True
                context["metadata"]["cancelled"] = True
                cancelled_error = e
            except Exception as e:
                context["error"] = e
                raise
            finally:
                duration = time.time() - context["start_time"]
                context["metadata"]["duration_seconds"] = round(duration, 3)

                LLMObs.annotate(
                    span=span,
                    input_data=context.get("input_data"),
                    output_data=context.get("output_data"),
                    metadata=context["metadata"],
                )
    except ImportError:
        try:
            yield context
        except asyncio.CancelledError as e:
            context["cancelled"] = True
            cancelled_error = e

    if cancelled_error is not None:
        raise cancelled_error


@contextmanager
def track_tool(
    name: str,
    *,
    arguments: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Track a tool execution span.

    Args:
        name: Name of the tool being executed
        arguments: Tool input arguments
        metadata: Optional metadata to attach

    Yields:
        A dict that can be updated with output_data and additional metadata.
    """
    context: dict[str, Any] = {
        "start_time": time.time(),
        "metadata": metadata or {},
        "output_data": None,
        "error": None,
        "cancelled": False,
    }

    if not _llmobs_enabled:
        try:
            yield context
        except asyncio.CancelledError:
            context["cancelled"] = True
            raise
        return

    cancelled_error: asyncio.CancelledError | None = None
    try:
        from ddtrace.llmobs import LLMObs

        with LLMObs.tool(name=name) as span:
            try:
                yield context
            except asyncio.CancelledError as e:
                context["cancelled"] = True
                context["metadata"]["cancelled"] = True
                context["output_data"] = "Tool cancelled"
                cancelled_error = e
            except Exception as e:
                context["error"] = e
                context["metadata"]["error"] = True
                context["metadata"]["error.type"] = type(e).__name__
                context["metadata"]["error.message"] = str(e)
                raise
            finally:
                duration = time.time() - context["start_time"]
                context["metadata"]["duration_seconds"] = round(duration, 3)

                # Increment tool counter
                increment_metric("tools.executed")

                LLMObs.annotate(
                    span=span,
                    input_data=arguments,
                    output_data=context.get("output_data"),
                    metadata=context["metadata"],
                )
    except ImportError:
        try:
            yield context
        except asyncio.CancelledError as e:
            context["cancelled"] = True
            cancelled_error = e

    if cancelled_error is not None:
        raise cancelled_error


@contextmanager
def track_llm(
    model_name: str,
    model_provider: str,
    *,
    name: str = "chat",
    metadata: dict[str, Any] | None = None,
) -> Generator[dict[str, Any], None, None]:
    """Track an LLM call span.

    Args:
        model_name: Name of the LLM model
        model_provider: Provider of the model (openai, anthropic, google, etc.)
        name: Name of the operation (default: "chat")
        metadata: Optional metadata to attach

    Yields:
        A dict that can be updated with input/output messages and metrics.
    """
    context: dict[str, Any] = {
        "start_time": time.time(),
        "metadata": metadata or {},
        "input_data": None,
        "output_data": None,
        "metrics": {},
        "error": None,
        "cancelled": False,
    }

    if not _llmobs_enabled:
        try:
            yield context
        except asyncio.CancelledError:
            context["cancelled"] = True
            raise
        return

    cancelled_error: asyncio.CancelledError | None = None
    try:
        from ddtrace.llmobs import LLMObs

        with LLMObs.llm(
            model_name=model_name,
            model_provider=model_provider,
            name=name,
        ) as span:
            try:
                yield context
            except asyncio.CancelledError as e:
                context["cancelled"] = True
                context["metadata"]["cancelled"] = True
                cancelled_error = e
            except Exception as e:
                context["error"] = e
                context["metadata"]["error"] = True
                context["metadata"]["error.type"] = type(e).__name__
                context["metadata"]["error.message"] = str(e)
                increment_metric("errors.llm")
                raise
            finally:
                duration = time.time() - context["start_time"]
                context["metadata"]["duration_seconds"] = round(duration, 3)
                context["metadata"]["model"] = model_name
                context["metadata"]["provider"] = model_provider

                # Track token metrics
                metrics = context.get("metrics", {})
                if "input_tokens" in metrics:
                    increment_metric("llm.tokens.input", metrics["input_tokens"])
                if "output_tokens" in metrics:
                    increment_metric("llm.tokens.output", metrics["output_tokens"])

                LLMObs.annotate(
                    span=span,
                    input_data=context.get("input_data"),
                    output_data=context.get("output_data"),
                    metrics=metrics or None,
                    metadata=context["metadata"],
                )
    except ImportError:
        try:
            yield context
        except asyncio.CancelledError as e:
            context["cancelled"] = True
            cancelled_error = e

    if cancelled_error is not None:
        raise cancelled_error


# =============================================================================
# Metrics Tracking
# =============================================================================


def increment_metric(name: str, value: float = 1.0) -> None:
    """Increment a custom metric.

    Args:
        name: Name of the metric
        value: Value to increment by (default: 1.0)
    """
    _metrics[name] = _metrics.get(name, 0.0) + value


def get_metric(name: str) -> float:
    """Get the current value of a metric.

    Args:
        name: Name of the metric

    Returns:
        Current metric value (0.0 if not set)
    """
    return _metrics.get(name, 0.0)


def get_all_metrics() -> dict[str, float]:
    """Get all tracked metrics.

    Returns:
        Dict of all metric names to values.
    """
    return _metrics.copy()


def reset_metrics() -> None:
    """Reset all metrics to zero."""
    _metrics.clear()


# =============================================================================
# Error Tracking
# =============================================================================


def track_error(
    error: Exception,
    *,
    operation: str | None = None,
    context: dict[str, Any] | None = None,
) -> None:
    """Track an error event.

    Args:
        error: The exception that occurred
        operation: Name of the operation where error occurred
        context: Additional context about the error
    """
    increment_metric("errors.total")
    if operation:
        increment_metric(f"errors.{operation}")

    if not _llmobs_enabled:
        return

    try:
        from ddtrace.llmobs import LLMObs

        LLMObs.annotate(
            metadata={
                "error": True,
                "error.type": type(error).__name__,
                "error.message": str(error),
                "error.operation": operation,
                **(context or {}),
            }
        )
    except Exception:  # noqa: BLE001
        pass
```
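For illustration, callers consume the yielded context dict like this (a self-contained sketch using a simplified stand-in for track_tool's LLMObs-disabled path, not the module's actual code):

```python
import time
from contextlib import contextmanager
from typing import Any, Generator


@contextmanager
def track_tool_stub(name: str) -> Generator[dict[str, Any], None, None]:
    # Mirrors the disabled-LLMObs branch: hand the caller a mutable
    # context dict, then stamp the duration in finally.
    context: dict[str, Any] = {
        "start_time": time.time(),
        "metadata": {},
        "output_data": None,
    }
    try:
        yield context
    finally:
        duration = time.time() - context["start_time"]
        context["metadata"]["duration_seconds"] = round(duration, 3)


with track_tool_stub("get_transcript") as ctx:
    ctx["output_data"] = "transcript text"
```

After the with block, ctx["metadata"]["duration_seconds"] is populated even if the body raised, which is what lets the real implementation annotate spans on every exit path.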
There is significant code duplication between client/joinly_client/datadog.py and joinly/utils/datadog.py. The tracking context managers (track_workflow, track_agent, track_tool, track_llm) and metrics functions are duplicated across both files. Consider extracting this shared functionality into a common module to reduce maintenance burden and ensure consistency.
```python
def increment_metric(name: str, value: float = 1.0) -> None:
    """Increment a custom metric.

    Args:
        name: Name of the metric
        value: Value to increment by (default: 1.0)
    """
    _metrics[name] = _metrics.get(name, 0.0) + value
```
The increment_metric function performs a read-modify-write operation on the shared _metrics dictionary without synchronization. In an async environment with concurrent operations, this can lead to race conditions where metric increments are lost. Consider using threading.Lock for thread safety or asyncio.Lock for async safety, depending on the execution context.
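A hedged sketch of the lock-based fix (assuming metrics may be touched from worker threads; in purely single-loop asyncio code the plain dict is already safe between awaits):

```python
import threading

_metrics: dict[str, float] = {}
_metrics_lock = threading.Lock()


def increment_metric(name: str, value: float = 1.0) -> None:
    """Thread-safe increment: the read-modify-write happens under a lock."""
    with _metrics_lock:
        _metrics[name] = _metrics.get(name, 0.0) + value


def get_all_metrics() -> dict[str, float]:
    """Copy under the lock so callers never observe a half-updated dict."""
    with _metrics_lock:
        return dict(_metrics)
```

An asyncio.Lock variant would instead require making increment_metric a coroutine, which changes every call site, so a threading.Lock is the less invasive option here.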
```diff
 )
 from joinly.utils.clock import Clock
-from joinly.utils.datadog import create_span, set_span_tag
+from joinly.utils.datadog import track_task, track_tool
```
Import of 'track_tool' is not used.
Suggested change:

```diff
-from joinly.utils.datadog import track_task, track_tool
+from joinly.utils.datadog import track_task
```