diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f9c43a7..6194324 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,11 +1,11 @@ default_stages: [pre-commit] repos: - - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.11.6 - hooks: - # - id: ruff # Disabled - too strict for optional library handling - - id: ruff-format + # - repo: https://github.com/astral-sh/ruff-pre-commit + # rev: v0.11.6 + # hooks: + # # - id: ruff # Disabled - too strict for optional library handling + # - id: ruff-format - repo: https://github.com/astral-sh/uv-pre-commit rev: 0.6.14 hooks: diff --git a/client/joinly_client/agent.py b/client/joinly_client/agent.py index 72dc7ea..46f0c69 100644 --- a/client/joinly_client/agent.py +++ b/client/joinly_client/agent.py @@ -2,6 +2,7 @@ import contextlib import json import logging +import time from dataclasses import replace from typing import Any, Self @@ -21,6 +22,13 @@ from pydantic_ai.settings import ModelSettings, merge_model_settings from pydantic_ai.tools import ToolDefinition +from joinly_client.datadog import ( + increment_metric, + track_agent, + track_llm, + track_tool, + track_workflow, +) from joinly_client.types import ToolExecutor, TranscriptSegment, Usage from joinly_client.utils import get_prompt @@ -116,38 +124,95 @@ async def _run_loop(self, segments: list[TranscriptSegment]) -> None: segments (list[TranscriptSegment]): The segments of the transcript to process. """ - self._messages.append( - ModelRequest( - parts=[ - UserPromptPart( - f"{segment.speaker or 'Participant'}: {segment.text}" - ) - for segment in segments - ] - ) + # Build user input from segments + user_input = " | ".join( + f"{s.speaker or 'Participant'}: {s.text}" for s in segments ) - iteration: int = 0 - self._messages = self._truncate_tool_results( - self._messages, max_chars=self._max_tool_result_chars - ) - self._messages = self._omit_binary_tool_results(self._messages) - while self._max_agent_iter is None or iteration < self._max_agent_iter: - self._messages = self._limit_messages( - self._messages, max_messages=self._max_messages + # Track the entire agent conversation as a workflow + session_id = f"utterance-{time.time_ns()}" + with track_workflow( + "agent.conversation", + session_id=session_id, + metadata={ + "segment_count": len(segments), + "model": self._llm.model_name, + "provider": self._llm.system, + }, + ) as workflow_ctx: + workflow_ctx["input_data"] = user_input + + self._messages.append( + ModelRequest( + parts=[ + UserPromptPart( + f"{segment.speaker or 'Participant'}: {segment.text}" + ) + for segment in segments + ] + ) ) + + iteration: int = 0 + total_input_tokens = 0 + total_output_tokens = 0 + tools_called: list[str] = [] + self._messages = self._truncate_tool_results( - self._messages, max_chars=self._max_ephemeral_tool_result_chars + self._messages, max_chars=self._max_tool_result_chars ) + self._messages = self._omit_binary_tool_results(self._messages) + + while self._max_agent_iter is None or iteration < self._max_agent_iter: + self._messages = self._limit_messages( + self._messages, max_messages=self._max_messages + ) + self._messages = self._truncate_tool_results( + self._messages, max_chars=self._max_ephemeral_tool_result_chars + ) + + # Track each iteration as an agent span + with track_agent( + f"iteration.{iteration}", + session_id=session_id, + metadata={"iteration": iteration}, + ) as agent_ctx: + response = await self._call_llm(self._messages) + request = await self._call_tools(response) + + # Collect metrics + total_input_tokens += response.usage.request_tokens or 0 + total_output_tokens += response.usage.response_tokens or 0 + + tool_calls = [ + p for p in response.parts if isinstance(p, ToolCallPart) + ] + tools_called.extend(tc.tool_name for tc in tool_calls) + + agent_ctx["metadata"]["tool_calls"] = len(tool_calls) + agent_ctx["metadata"]["input_tokens"] = ( + response.usage.request_tokens or 0 + ) + agent_ctx["metadata"]["output_tokens"] = ( + response.usage.response_tokens or 0 + ) + + self._messages.append(response) + if request: + self._messages.append(request) + if self._check_end_turn(response, request): + break + iteration += 1 - response = await self._call_llm(self._messages) - request = await self._call_tools(response) - self._messages.append(response) - if request: - self._messages.append(request) - if self._check_end_turn(response, request): - break - iteration += 1 + # Update workflow metadata + workflow_ctx["metadata"]["iterations"] = iteration + 1 + workflow_ctx["metadata"]["total_input_tokens"] = total_input_tokens + workflow_ctx["metadata"]["total_output_tokens"] = total_output_tokens + workflow_ctx["metadata"]["tools_called"] = tools_called + workflow_ctx["metadata"]["unique_tools"] = list(set(tools_called)) + + # Increment conversation metric + increment_metric("agent.conversations") def _format_llmobs_input( self, messages: list[ModelMessage] @@ -166,58 +231,6 @@ def _format_llmobs_input( result.append({"role": "user", "content": content}) return result - def _annotate_llmobs( - self, - llmobs_span: Any, # noqa: ANN401 - ddtrace context manager type - input_messages: list[dict[str, str]], - response: ModelResponse | None = None, - error: Exception | None = None, - ) -> None: - """Annotate LLMObs span with input/output data.""" - try: - from ddtrace.llmobs import LLMObs - - if error: - LLMObs.annotate( - input_data=input_messages, - metadata={ - "error": True, - "error.type": type(error).__name__, - "error.message": str(error), - }, - ) - llmobs_span.__exit__(type(error), error, error.__traceback__) - elif response: - response_texts = [ - content - for p in response.parts - if isinstance(content := getattr(p, "content", None), str) - ] - input_tokens = response.usage.request_tokens or 0 - output_tokens = response.usage.response_tokens or 0 - LLMObs.annotate( - input_data=input_messages, - output_data=[ - {"role": "assistant", "content": t} for t in response_texts - ] - or None, - metrics={ - "input_tokens": input_tokens, - "output_tokens": output_tokens, - "total_tokens": input_tokens + output_tokens, - }, - metadata={ - "temperature": 0.2, - "model": self._llm.model_name, - "provider": self._llm.system, - }, - ) - llmobs_span.__exit__(None, None, None) - except Exception as e: # noqa: BLE001 - ddtrace can raise various errors - logger.debug("Failed to annotate LLMObs: %s", e) - with contextlib.suppress(Exception): - llmobs_span.__exit__(None, None, None) - async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse: """Call the LLM with the current messages. @@ -232,24 +245,13 @@ async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse: # Format input for LLM Observability llmobs_input = self._format_llmobs_input(messages) - # Try to use LLMObs for proper LLM Observability - llmobs_span = None - llmobs_span_exited = False - try: - from ddtrace.llmobs import LLMObs - - llmobs_span = LLMObs.llm( - model_name=self._llm.model_name, - model_provider=self._llm.system, - name="chat", - ) - llmobs_span.__enter__() - except ImportError: - pass - except Exception as e: # noqa: BLE001 - ddtrace is optional - logger.debug("LLMObs not available: %s", e) + with track_llm( + model_name=self._llm.model_name, + model_provider=self._llm.system, + metadata={"temperature": 0.2, "message_count": len(messages)}, + ) as llm_ctx: + llm_ctx["input_data"] = llmobs_input - try: response = await model_request( self._llm, [ModelRequest(parts=[SystemPromptPart(self._prompt)]), *messages], @@ -270,11 +272,29 @@ async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse: allow_text_output=self._llm.model_name.startswith("gpt-5"), ), ) - except Exception as e: - if llmobs_span: - self._annotate_llmobs(llmobs_span, llmobs_input, error=e) - llmobs_span_exited = True - raise + + # Extract response content + response_texts = [ + content + for p in response.parts + if isinstance(content := getattr(p, "content", None), str) + ] + + # Set LLMObs output and metrics + llm_ctx["output_data"] = [ + {"role": "assistant", "content": t} for t in response_texts + ] or None + llm_ctx["metrics"] = { + "input_tokens": response.usage.request_tokens or 0, + "output_tokens": response.usage.response_tokens or 0, + "total_tokens": (response.usage.request_tokens or 0) + + (response.usage.response_tokens or 0), + } + + # Log tool calls if any + tool_calls = [p for p in response.parts if isinstance(p, ToolCallPart)] + if tool_calls: + llm_ctx["metadata"]["tool_calls"] = [tc.tool_name for tc in tool_calls] logger.debug( "LLM response received with %d parts, %d input tokens and %d output tokens", @@ -283,9 +303,6 @@ async def _call_llm(self, messages: list[ModelMessage]) -> ModelResponse: response.usage.response_tokens or 0, ) - if llmobs_span and not llmobs_span_exited: - self._annotate_llmobs(llmobs_span, llmobs_input, response=response) - self._usage.add( "llm", usage={ @@ -341,22 +358,45 @@ async def _call_tool( None, ) + tool_args = tool_call.args_as_dict() logger.info( "%s: %s", tool_call.tool_name, ", ".join( f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}" - for k, v in tool_call.args_as_dict().items() + for k, v in tool_args.items() ), ) - try: - content = await self._tool_executor( - tool_call.tool_name, tool_call.args_as_dict() - ) - except Exception: - logger.exception("Error calling tool %s", tool_call.tool_name) - content = f"Error calling tool {tool_call.tool_name}" + # Track the tool execution + with track_tool( + tool_call.tool_name, + arguments=tool_args, + metadata={"tool_call_id": tool_call.tool_call_id}, + ) as tool_ctx: + try: + content = await self._tool_executor(tool_call.tool_name, tool_args) + tool_ctx["metadata"]["success"] = True + except Exception: + logger.exception("Error calling tool %s", tool_call.tool_name) + content = f"Error calling tool {tool_call.tool_name}" + tool_ctx["metadata"]["success"] = False + tool_ctx["metadata"]["error"] = True + + # Set output data for tracking + if isinstance(content, BinaryContent): + tool_ctx["output_data"] = ( + f"BinaryContent({content.media_type}, {len(content.data)} bytes)" + ) + tool_ctx["metadata"]["has_binary"] = True + elif isinstance(content, list): + tool_ctx["output_data"] = f"List with {len(content)} items" + else: + # Truncate long outputs for tracking + output_str = str(content) + tool_ctx["output_data"] = ( + output_str[:500] if len(output_str) > 500 else output_str + ) logger.info( "%s: %s", diff --git a/client/joinly_client/datadog.py b/client/joinly_client/datadog.py index 52c3985..685e8fb 100644 --- a/client/joinly_client/datadog.py +++ b/client/joinly_client/datadog.py @@ -1,11 +1,23 @@ """Datadog integration utilities for the client package.""" +from __future__ import annotations + +import asyncio import logging import os -from typing import Any +import time +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Generator logger = logging.getLogger(__name__) +# Global state for tracking +_llmobs_enabled = False +_metrics: dict[str, float] = {} + def initialize_datadog() -> None: """Initialize Datadog LLM Observability for the client. @@ -22,6 +34,8 @@ def initialize_datadog() -> None: DD_LLMOBS_ML_APP: ML application name (default: joinly-agent) DD_TRACE_ENABLED: Enable APM tracing - requires local agent (default: false) """ + global _llmobs_enabled + # Check for API key api_key = os.getenv("DD_API_KEY") if not api_key: @@ -37,9 +51,7 @@ def initialize_datadog() -> None: env = os.getenv("DD_ENV", "production") version = os.getenv("DD_VERSION", "0.1.18") - # ========================================================================= # Set environment variables BEFORE importing ddtrace - # ========================================================================= os.environ.setdefault("DD_SERVICE", service) os.environ.setdefault("DD_ENV", env) os.environ.setdefault("DD_VERSION", version) @@ -62,13 +74,11 @@ def initialize_datadog() -> None: llmobs_ml_app = os.getenv("DD_LLMOBS_ML_APP", "joinly-agent") if llmobs_enabled: - # Enable agentless mode for LLM Observability os.environ["DD_LLMOBS_ENABLED"] = "1" os.environ["DD_LLMOBS_ML_APP"] = llmobs_ml_app os.environ["DD_LLMOBS_AGENTLESS_ENABLED"] = "1" if not apm_enabled: - # Disable APM tracing when no local agent is available os.environ["DD_TRACE_ENABLED"] = "false" # NOW import ddtrace after environment variables are set @@ -94,6 +104,7 @@ def initialize_datadog() -> None: env=env, service=service, ) + _llmobs_enabled = True logger.info( "LLM Observability enabled: ml_app=%s, site=%s, agentless=True", llmobs_ml_app, @@ -104,7 +115,7 @@ def initialize_datadog() -> None: "ddtrace.llmobs not available. " "LLM Observability requires ddtrace >= 2.0.0" ) - except Exception as e: + except Exception as e: # noqa: BLE001 logger.warning("Failed to enable LLM Observability: %s", e) # Patch libraries for auto-instrumentation @@ -121,32 +132,404 @@ def initialize_datadog() -> None: ) -def _patch_libraries(patch: Any) -> None: +def _patch_libraries(patch: Any) -> None: # noqa: ANN401 """Patch common libraries for auto-instrumentation.""" - # OpenAI for LLM calls + for lib in ("openai", "anthropic", "httpx", "aiohttp"): + try: + patch(**{lib: True}) + logger.debug("Patched %s for Datadog", lib) + except Exception: # noqa: BLE001 + logger.debug("%s patching skipped", lib) + + +def is_llmobs_enabled() -> bool: + """Check if LLM Observability is enabled.""" + return _llmobs_enabled + + +# ============================================================================= +# LLM Observability Span Context Managers +# ============================================================================= + + +@contextmanager +def track_workflow( + name: str, + *, + session_id: str | None = None, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track a workflow span (e.g., conversation session). + + Args: + name: Name of the workflow + session_id: Optional session ID to link related spans + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None try: - patch(openai=True) - logger.debug("Patched OpenAI for Datadog") - except Exception: - logger.debug("OpenAI patching skipped") + from ddtrace.llmobs import LLMObs + + with LLMObs.workflow(name=name, session_id=session_id) as span: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + context["metadata"]["cancelled"] = True + cancelled_error = e + except Exception as e: + context["error"] = e + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + LLMObs.annotate( + span=span, + metadata=context["metadata"], + output_data=context.get("output_data"), + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error + - # Anthropic for LLM calls +@contextmanager +def track_agent( + name: str = "agent", + *, + session_id: str | None = None, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track an agent span (e.g., agent conversation run). + + Args: + name: Name of the agent operation + session_id: Optional session ID to link related spans + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "input_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None try: - patch(anthropic=True) - logger.debug("Patched Anthropic for Datadog") - except Exception: - logger.debug("Anthropic patching skipped") + from ddtrace.llmobs import LLMObs + + with LLMObs.agent(name=name, session_id=session_id) as span: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + context["metadata"]["cancelled"] = True + cancelled_error = e + except Exception as e: + context["error"] = e + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + LLMObs.annotate( + span=span, + input_data=context.get("input_data"), + output_data=context.get("output_data"), + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error - # httpx for async HTTP (used by Google/Anthropic providers in pydantic-ai) + +@contextmanager +def track_tool( + name: str, + *, + arguments: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track a tool execution span. + + Args: + name: Name of the tool being executed + arguments: Tool input arguments + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None try: - patch(httpx=True) - logger.debug("Patched httpx for Datadog") - except Exception: - logger.debug("httpx patching skipped") + from ddtrace.llmobs import LLMObs + + with LLMObs.tool(name=name) as span: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + context["metadata"]["cancelled"] = True + context["output_data"] = "Tool cancelled" + cancelled_error = e + except Exception as e: + context["error"] = e + context["metadata"]["error"] = True + context["metadata"]["error.type"] = type(e).__name__ + context["metadata"]["error.message"] = str(e) + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + # Increment tool counter + increment_metric("tools.executed") + + LLMObs.annotate( + span=span, + input_data=arguments, + output_data=context.get("output_data"), + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error + + +@contextmanager +def track_llm( + model_name: str, + model_provider: str, + *, + name: str = "chat", + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track an LLM call span. + + Args: + model_name: Name of the LLM model + model_provider: Provider of the model (openai, anthropic, google, etc.) + name: Name of the operation (default: "chat") + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with input/output messages and metrics. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "input_data": None, + "output_data": None, + "metrics": {}, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None + try: + from ddtrace.llmobs import LLMObs + + with LLMObs.llm( + model_name=model_name, + model_provider=model_provider, + name=name, + ) as span: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + context["metadata"]["cancelled"] = True + cancelled_error = e + except Exception as e: + context["error"] = e + context["metadata"]["error"] = True + context["metadata"]["error.type"] = type(e).__name__ + context["metadata"]["error.message"] = str(e) + increment_metric("errors.llm") + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + context["metadata"]["model"] = model_name + context["metadata"]["provider"] = model_provider + + # Track token metrics + metrics = context.get("metrics", {}) + if "input_tokens" in metrics: + increment_metric("llm.tokens.input", metrics["input_tokens"]) + if "output_tokens" in metrics: + increment_metric("llm.tokens.output", metrics["output_tokens"]) + + LLMObs.annotate( + span=span, + input_data=context.get("input_data"), + output_data=context.get("output_data"), + metrics=metrics or None, + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error + + +# ============================================================================= +# Metrics Tracking +# ============================================================================= + + +def increment_metric(name: str, value: float = 1.0) -> None: + """Increment a custom metric. + + Args: + name: Name of the metric + value: Value to increment by (default: 1.0) + """ + _metrics[name] = _metrics.get(name, 0.0) + value + + +def get_metric(name: str) -> float: + """Get the current value of a metric. + + Args: + name: Name of the metric + + Returns: + Current metric value (0.0 if not set) + """ + return _metrics.get(name, 0.0) + + +def get_all_metrics() -> dict[str, float]: + """Get all tracked metrics. + + Returns: + Dict of all metric names to values. + """ + return _metrics.copy() + + +def reset_metrics() -> None: + """Reset all metrics to zero.""" + _metrics.clear() + + +# ============================================================================= +# Error Tracking +# ============================================================================= + + +def track_error( + error: Exception, + *, + operation: str | None = None, + context: dict[str, Any] | None = None, +) -> None: + """Track an error event. + + Args: + error: The exception that occurred + operation: Name of the operation where error occurred + context: Additional context about the error + """ + increment_metric("errors.total") + if operation: + increment_metric(f"errors.{operation}") + + if not _llmobs_enabled: + return - # aiohttp for async HTTP try: - patch(aiohttp=True) - logger.debug("Patched aiohttp for Datadog") - except Exception: - logger.debug("aiohttp patching skipped") + from ddtrace.llmobs import LLMObs + + LLMObs.annotate( + metadata={ + "error": True, + "error.type": type(error).__name__, + "error.message": str(error), + "error.operation": operation, + **(context or {}), + } + ) + except Exception: # noqa: BLE001 + pass diff --git a/client/joinly_client/main.py b/client/joinly_client/main.py index 709bb60..9752554 100644 --- a/client/joinly_client/main.py +++ b/client/joinly_client/main.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import json import logging import warnings @@ -379,7 +380,27 @@ async def log_segments(segments: list[TranscriptSegment]) -> None: await stack.enter_async_context(additional_client) logger.debug("Connected to %s", client_name) - joinly_config = McpClientConfig(client=client.client, exclude=["join_meeting"]) + # Event to signal when the meeting has ended + meeting_ended = asyncio.Event() + + # Callback to detect when leave_meeting is called via tool + from mcp.types import CallToolResult + + async def on_tool_result( + tool_name: str, args: dict[str, Any], result: CallToolResult + ) -> CallToolResult: + """Handle tool results and detect meeting leave.""" + if tool_name == "leave_meeting" and not result.isError: + logger.info("Meeting left via tool, shutting down...") + client.joined = False + meeting_ended.set() + return result + + joinly_config = McpClientConfig( + client=client.client, + exclude=["join_meeting"], + post_callback=on_tool_result, + ) tools, tool_executor = await load_tools( joinly_config if not additional_clients @@ -402,10 +423,11 @@ async def log_segments(segments: list[TranscriptSegment]) -> None: ), ) client.add_utterance_callback(agent.on_utterance) + async with agent: await client.join_meeting(meeting_url) try: - await asyncio.Event().wait() + await meeting_ended.wait() finally: usage = agent.usage.merge(await client.get_usage()) if usage.root: diff --git a/joinly/server.py b/joinly/server.py index 1bbe64b..06b36a1 100644 --- a/joinly/server.py +++ b/joinly/server.py @@ -23,7 +23,13 @@ Transcript, Usage, ) -from joinly.utils.datadog import create_span, set_span_tag +from joinly.utils.datadog import ( + track_meeting_join, + track_meeting_leave, + track_speech, + track_tool, + track_error, +) from joinly.utils.usage import get_usage, reset_usage, set_usage logger = logging.getLogger(__name__) @@ -179,19 +185,20 @@ async def join_meeting( ] = None, ) -> str: """Join a meeting with the given URL and participant name.""" - with create_span( - "meeting.join", - resource="join_meeting", - tags={ - "meeting.has_url": meeting_url is not None, - "meeting.has_passcode": passcode is not None, - "participant.name": participant_name or "unknown", - }, - ): - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - await ms.join_meeting(meeting_url, participant_name, passcode) - set_span_tag("meeting.status", "joined") - return "Joined meeting." + with track_meeting_join( + meeting_url=meeting_url, + participant_name=participant_name, + ) as ctx_dd: + try: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + await ms.join_meeting(meeting_url, participant_name, passcode) + ctx_dd["output_data"] = "Joined meeting successfully" + ctx_dd["metadata"]["status"] = "joined" + ctx_dd["metadata"]["has_passcode"] = passcode is not None + return "Joined meeting." + except Exception as e: + track_error(e, operation="join_meeting") + raise @mcp.tool( @@ -202,9 +209,16 @@ async def leave_meeting( ctx: Context, ) -> str: """Leave the current meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - await ms.leave_meeting() - return "Left the meeting." + with track_meeting_leave() as ctx_dd: + try: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + await ms.leave_meeting() + ctx_dd["output_data"] = "Left meeting successfully" + ctx_dd["metadata"]["status"] = "left" + return "Left the meeting." + except Exception as e: + track_error(e, operation="leave_meeting") + raise @mcp.tool( @@ -216,23 +230,18 @@ async def speak_text( text: Annotated[str, Field(description="Text to be spoken")], ) -> str: """Speak the given text in the meeting using TTS.""" - with create_span( - "meeting.speak", - resource="speak_text", - tags={ - "text.length": len(text), - "text.preview": text[:100] if len(text) > 100 else text, - }, - ): + with track_speech(text) as ctx_dd: ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session try: await ms.speak_text(text) - set_span_tag("speech.status", "completed") + ctx_dd["output_data"] = "Finished speaking" + ctx_dd["metadata"]["status"] = "completed" return "Finished speaking." except SpeechInterruptedError as e: - set_span_tag("speech.status", "interrupted") - set_span_tag("error", True) - set_span_tag("error.type", "SpeechInterruptedError") + ctx_dd["output_data"] = str(e) + ctx_dd["metadata"]["status"] = "interrupted" + ctx_dd["metadata"]["interrupted"] = True + ctx_dd["metadata"]["spoken_text"] = e.spoken_text return str(e) @@ -245,9 +254,15 @@ async def send_chat_message( message: Annotated[str, Field(description="Message to be sent")], ) -> str: """Send a chat message in the meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - await ms.send_chat_message(message) - return "Sent message." + with track_tool( + "chat.send", + arguments={"message": message[:100] if len(message) > 100 else message}, + metadata={"message.length": len(message)}, + ) as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + await ms.send_chat_message(message) + ctx_dd["output_data"] = "Sent message" + return "Sent message." @mcp.tool( @@ -258,8 +273,12 @@ async def get_chat_history( ctx: Context, ) -> MeetingChatHistory: """Get the chat history from the meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - return await ms.get_chat_history() + with track_tool("chat.get_history") as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + history = await ms.get_chat_history() + ctx_dd["metadata"]["message_count"] = len(history.messages) + ctx_dd["output_data"] = f"Retrieved {len(history.messages)} messages" + return history @mcp.tool( @@ -290,12 +309,23 @@ async def get_transcript_tool( ] = 0, ) -> Transcript: """Get the transcript of the meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - if mode == "first": - return ms.transcript.before(minutes * 60).compact() - if mode == "latest": - return ms.transcript.after(ms.meeting_seconds - minutes * 60).compact() - return ms.transcript.compact() + with track_tool( + "transcript.get", + arguments={"mode": mode, "minutes": minutes}, + ) as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + if mode == "first": + transcript = ms.transcript.before(minutes * 60).compact() + elif mode == "latest": + transcript = ms.transcript.after( + ms.meeting_seconds - minutes * 60 + ).compact() + else: + transcript = ms.transcript.compact() + ctx_dd["metadata"]["segment_count"] = len(transcript.segments) + ctx_dd["metadata"]["speaker_count"] = len(transcript.speakers) + ctx_dd["output_data"] = f"Retrieved {len(transcript.segments)} segments" + return transcript @mcp.tool( @@ -306,8 +336,12 @@ async def get_participants( ctx: Context, ) -> MeetingParticipantList: """Get the list of participants in the meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - return MeetingParticipantList(await ms.get_participants()) + with track_tool("meeting.get_participants") as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + participants = await ms.get_participants() + ctx_dd["metadata"]["participant_count"] = len(participants) + ctx_dd["output_data"] = f"Retrieved {len(participants)} participants" + return MeetingParticipantList(participants) @mcp.tool( @@ -319,13 +353,19 @@ async def get_participants( ) async def get_video_snapshot(ctx: Context) -> ImageContent: """Get a snapshot of the current video feed.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - snapshot = await ms.get_video_snapshot() - return ImageContent( - type="image", - data=base64.b64encode(snapshot.data).decode(), - mimeType=snapshot.media_type, - ) + with track_tool("video.snapshot") as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + snapshot = await ms.get_video_snapshot() + ctx_dd["metadata"]["image.size_bytes"] = len(snapshot.data) + ctx_dd["metadata"]["image.media_type"] = snapshot.media_type + ctx_dd["output_data"] = ( + f"Captured {snapshot.media_type} ({len(snapshot.data)} bytes)" + ) + return ImageContent( + type="image", + data=base64.b64encode(snapshot.data).decode(), + mimeType=snapshot.media_type, + ) @mcp.tool( @@ -336,9 +376,11 @@ async def mute_yourself( ctx: Context, ) -> str: """Mute yourself in the meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - await ms.mute() - return "Muted yourself." + with track_tool("audio.mute") as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + await ms.mute() + ctx_dd["output_data"] = "Muted successfully" + return "Muted yourself." @mcp.tool( @@ -349,9 +391,11 @@ async def unmute_yourself( ctx: Context, ) -> str: """Unmute yourself in the meeting.""" - ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session - await ms.unmute() - return "Unmuted yourself." + with track_tool("audio.unmute") as ctx_dd: + ms: MeetingSession = ctx.request_context.lifespan_context.meeting_session + await ms.unmute() + ctx_dd["output_data"] = "Unmuted successfully" + return "Unmuted yourself." @mcp.custom_route("/health", methods=["GET"]) diff --git a/joinly/session.py b/joinly/session.py index 7970351..a648865 100644 --- a/joinly/session.py +++ b/joinly/session.py @@ -15,7 +15,7 @@ VideoSnapshot, ) from joinly.utils.clock import Clock -from joinly.utils.datadog import create_span, set_span_tag +from joinly.utils.datadog import track_task, track_tool from joinly.utils.events import EventBus, EventType logger = logging.getLogger(__name__) @@ -95,14 +95,13 @@ async def join_meeting( passcode (str | None): The password or passcode for the meeting (if required). """ - with create_span( + with track_task( "session.join_meeting", - resource="join_meeting", - tags={ + metadata={ "meeting.has_url": meeting_url is not None, "participant.name": participant_name or "unknown", }, - ): + ) as ctx: await self._meeting_provider.join(meeting_url, participant_name, passcode) self._clock = Clock() self._transcript = Transcript() @@ -124,7 +123,8 @@ async def unmute_on_start() -> None: await self._speech_controller.start( self._clock, self._transcript, self._event_bus ) - set_span_tag("session.status", "active") + ctx["metadata"]["status"] = "active" + ctx["output_data"] = "Session started successfully" async def leave_meeting(self) -> None: """Leave the current meeting.""" @@ -138,14 +138,13 @@ async def speak_text(self, text: str) -> None: Args: text (str): The text to be spoken. """ - with create_span( + with track_task( "session.speak_text", - resource="speak_text", - tags={ - "text.length": len(text), - }, - ): + input_data=text, + metadata={"text.length": len(text)}, + ) as ctx: await self._speech_controller.speak_text(text) + ctx["output_data"] = "Speech completed" async def send_chat_message(self, message: str) -> None: """Send a chat message in the meeting. diff --git a/joinly/utils/datadog.py b/joinly/utils/datadog.py index c691574..7a815bf 100644 --- a/joinly/utils/datadog.py +++ b/joinly/utils/datadog.py @@ -1,11 +1,23 @@ """Datadog integration utilities for monitoring and LLM observability.""" +from __future__ import annotations + +import asyncio import logging import os -from typing import Any +import time +from contextlib import contextmanager +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Generator logger = logging.getLogger(__name__) +# Global state for tracking +_llmobs_enabled = False +_metrics: dict[str, float] = {} + def initialize_datadog() -> None: """Initialize Datadog LLM Observability. @@ -13,9 +25,6 @@ def initialize_datadog() -> None: This uses the SDK-based approach for LLM Observability which supports agentless mode (sending data directly to Datadog without a local agent). - For full APM tracing, you need to run the Datadog Agent locally. - This implementation focuses on LLM Observability which works agentless. - Environment variables used: DD_SITE: Datadog site (default: datadoghq.com) DD_API_KEY: Datadog API key (required) @@ -26,6 +35,8 @@ def initialize_datadog() -> None: DD_LLMOBS_ML_APP: ML application name (default: joinly-agent) DD_TRACE_ENABLED: Enable APM tracing - requires local agent (default: false) """ + global _llmobs_enabled + # Check for API key api_key = os.getenv("DD_API_KEY") if not api_key: @@ -41,9 +52,7 @@ def initialize_datadog() -> None: env = os.getenv("DD_ENV", "production") version = os.getenv("DD_VERSION", "0.5.2") - # ========================================================================= # Set environment variables BEFORE importing ddtrace - # ========================================================================= os.environ.setdefault("DD_SERVICE", service) os.environ.setdefault("DD_ENV", env) os.environ.setdefault("DD_VERSION", version) @@ -66,13 +75,11 @@ def initialize_datadog() -> None: llmobs_ml_app = os.getenv("DD_LLMOBS_ML_APP", "joinly-agent") if llmobs_enabled: - # Enable agentless mode for LLM Observability os.environ["DD_LLMOBS_ENABLED"] = "1" os.environ["DD_LLMOBS_ML_APP"] = llmobs_ml_app os.environ["DD_LLMOBS_AGENTLESS_ENABLED"] = "1" if not apm_enabled: - # Disable APM tracing when no local agent is available os.environ["DD_TRACE_ENABLED"] = "false" logger.debug( "APM tracing disabled (no local agent). " @@ -102,6 +109,7 @@ def initialize_datadog() -> None: env=env, service=service, ) + _llmobs_enabled = True logger.info( "LLM Observability enabled: ml_app=%s, site=%s, agentless=True", llmobs_ml_app, @@ -112,46 +120,12 @@ def initialize_datadog() -> None: "ddtrace.llmobs not available. " "LLM Observability requires ddtrace >= 2.0.0" ) - except Exception as e: + except Exception as e: # noqa: BLE001 logger.warning("Failed to enable LLM Observability: %s", e) - # Patch libraries for auto-instrumentation (useful even without APM) + # Patch libraries for auto-instrumentation _patch_libraries(patch) - # Configure sampling rate if APM is enabled - if apm_enabled: - try: - from ddtrace import config, tracer - - sample_rate = float(os.getenv("DD_TRACE_SAMPLE_RATE", "1.0")) - config.trace_sample_rate = sample_rate - - # Set global tags on tracer - tags: dict[str, str] = { - "service": service, - "env": env, - "version": version, - } - - # Parse additional tags from DD_TAGS - dd_tags = os.getenv("DD_TAGS", "") - if dd_tags: - for tag_str in dd_tags.split(","): - tag_str = tag_str.strip() - if ":" in tag_str: - key, value = tag_str.split(":", 1) - tags[key.strip()] = value.strip() - - tracer.set_tags(tags) - logger.info( - "APM tracing enabled: service=%s, env=%s, sample_rate=%s", - service, - env, - sample_rate, - ) - except Exception as e: - logger.warning("Failed to configure APM tracing: %s", e) - logger.info( "Datadog initialized: service=%s, env=%s, site=%s, " "llm_observability=%s, apm_tracing=%s", @@ -163,38 +137,619 @@ def initialize_datadog() -> None: ) -def _patch_libraries(patch: Any) -> None: +def _patch_libraries(patch: Any) -> None: # noqa: ANN401 """Patch common libraries for auto-instrumentation.""" - # OpenAI for LLM calls + for lib in ("openai", "anthropic", "httpx", "aiohttp"): + try: + patch(**{lib: True}) + logger.debug("Patched %s for Datadog", lib) + except Exception: # noqa: BLE001 + logger.debug("%s patching skipped", lib) + + +def is_llmobs_enabled() -> bool: + """Check if LLM Observability is enabled.""" + return _llmobs_enabled + + +# ============================================================================= +# LLM Observability Span Context Managers +# ============================================================================= + + +@contextmanager +def track_workflow( + name: str, + *, + session_id: str | None = None, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track a workflow span (e.g., meeting session, conversation). + + Args: + name: Name of the workflow (e.g., "meeting.session") + session_id: Optional session ID to link related spans + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None try: - patch(openai=True) - logger.debug("Patched OpenAI for Datadog") - except Exception: - logger.debug("OpenAI patching skipped") + from ddtrace.llmobs import LLMObs + + with LLMObs.workflow(name=name, session_id=session_id) as span: + try: + yield context + except asyncio.CancelledError as e: + # CancelledError is expected - don't mark as error + context["cancelled"] = True + context["metadata"]["cancelled"] = True + cancelled_error = e + except Exception as e: + context["error"] = e + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + LLMObs.annotate( + span=span, + metadata=context["metadata"], + output_data=context.get("output_data"), + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error + + +@contextmanager +def track_agent( + name: str = "agent", + *, + session_id: str | None = None, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track an agent span (e.g., agent conversation run). + + Args: + name: Name of the agent operation + session_id: Optional session ID to link related spans + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "input_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None + try: + from ddtrace.llmobs import LLMObs + + with LLMObs.agent(name=name, session_id=session_id) as span: + try: + yield context + except asyncio.CancelledError as e: + # CancelledError is expected - don't mark as error + context["cancelled"] = True + context["metadata"]["cancelled"] = True + cancelled_error = e + except Exception as e: + context["error"] = e + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + LLMObs.annotate( + span=span, + input_data=context.get("input_data"), + output_data=context.get("output_data"), + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error + + +@contextmanager +def track_tool( + name: str, + *, + arguments: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track a tool execution span. + + Args: + name: Name of the tool being executed + arguments: Tool input arguments + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None + try: + from ddtrace.llmobs import LLMObs + + with LLMObs.tool(name=name) as span: + try: + yield context + except asyncio.CancelledError as e: + # CancelledError is expected behavior - capture but don't re-raise + # inside span to prevent Datadog from marking as error + context["cancelled"] = True + context["metadata"]["cancelled"] = True + context["output_data"] = "Tool cancelled" + cancelled_error = e + except Exception as e: + context["error"] = e + context["metadata"]["error"] = True + context["metadata"]["error.type"] = type(e).__name__ + context["metadata"]["error.message"] = str(e) + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + # Increment tool counter + increment_metric("tools.executed") + + LLMObs.annotate( + span=span, + input_data=arguments, + output_data=context.get("output_data"), + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + # Re-raise CancelledError outside the span context so it doesn't show as error + if cancelled_error is not None: + raise cancelled_error + + +@contextmanager +def track_task( + name: str, + *, + input_data: Any = None, # noqa: ANN401 + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track a task span (e.g., transcription, speech synthesis). + + Args: + name: Name of the task + input_data: Input to the task + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with output_data and additional metadata. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "output_data": None, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return - # Anthropic for LLM calls + cancelled_error: asyncio.CancelledError | None = None try: - patch(anthropic=True) - logger.debug("Patched Anthropic for Datadog") - except Exception: - logger.debug("Anthropic patching skipped") + from ddtrace.llmobs import LLMObs + + with LLMObs.task(name=name) as span: + try: + yield context + except asyncio.CancelledError as e: + # CancelledError is expected behavior (e.g., speech interrupted) + # Capture it but don't re-raise inside the span - this prevents + # Datadog from marking it as an error + context["cancelled"] = True + context["metadata"]["cancelled"] = True + context["output_data"] = "Task cancelled" + cancelled_error = e + except Exception as e: + context["error"] = e + context["metadata"]["error"] = True + context["metadata"]["error.type"] = type(e).__name__ + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + + LLMObs.annotate( + span=span, + input_data=input_data, + output_data=context.get("output_data"), + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + # Re-raise CancelledError outside the span context so it doesn't show as error + if cancelled_error is not None: + raise cancelled_error + + +@contextmanager +def track_llm( + model_name: str, + model_provider: str, + *, + name: str = "chat", + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track an LLM call span. - # httpx for async HTTP (used by Google/Anthropic providers in pydantic-ai) + Args: + model_name: Name of the LLM model + model_provider: Provider of the model (openai, anthropic, google, etc.) + name: Name of the operation (default: "chat") + metadata: Optional metadata to attach + + Yields: + A dict that can be updated with input/output messages and metrics. + """ + context: dict[str, Any] = { + "start_time": time.time(), + "metadata": metadata or {}, + "input_data": None, + "output_data": None, + "metrics": {}, + "error": None, + "cancelled": False, + } + + if not _llmobs_enabled: + try: + yield context + except asyncio.CancelledError: + context["cancelled"] = True + raise + return + + cancelled_error: asyncio.CancelledError | None = None try: - patch(httpx=True) - logger.debug("Patched httpx for Datadog") - except Exception: - logger.debug("httpx patching skipped") + from ddtrace.llmobs import LLMObs + + with LLMObs.llm( + model_name=model_name, + model_provider=model_provider, + name=name, + ) as span: + try: + yield context + except asyncio.CancelledError as e: + # CancelledError is expected - don't mark as error + context["cancelled"] = True + context["metadata"]["cancelled"] = True + cancelled_error = e + except Exception as e: + context["error"] = e + context["metadata"]["error"] = True + context["metadata"]["error.type"] = type(e).__name__ + context["metadata"]["error.message"] = str(e) + increment_metric("errors.llm") + raise + finally: + duration = time.time() - context["start_time"] + context["metadata"]["duration_seconds"] = round(duration, 3) + context["metadata"]["model"] = model_name + context["metadata"]["provider"] = model_provider + + # Track token metrics + metrics = context.get("metrics", {}) + if "input_tokens" in metrics: + increment_metric("llm.tokens.input", metrics["input_tokens"]) + if "output_tokens" in metrics: + increment_metric("llm.tokens.output", metrics["output_tokens"]) + + LLMObs.annotate( + span=span, + input_data=context.get("input_data"), + output_data=context.get("output_data"), + metrics=metrics or None, + metadata=context["metadata"], + ) + except ImportError: + try: + yield context + except asyncio.CancelledError as e: + context["cancelled"] = True + cancelled_error = e + + if cancelled_error is not None: + raise cancelled_error + + +# ============================================================================= +# Meeting-Specific Tracking +# ============================================================================= + + +@contextmanager +def track_meeting_session( + meeting_url: str | None = None, + participant_name: str | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track a complete meeting session as a workflow. + + Args: + meeting_url: URL of the meeting + participant_name: Name of the participant + + Yields: + A dict for tracking session data. + """ + session_id = f"meeting-{time.time_ns()}" + metadata = { + "meeting.url": meeting_url or "unknown", + "participant.name": participant_name or "unknown", + } + + with track_workflow( + "meeting.session", + session_id=session_id, + metadata=metadata, + ) as ctx: + ctx["session_id"] = session_id + increment_metric("meetings.joined") + yield ctx + + +@contextmanager +def track_meeting_join( + meeting_url: str | None = None, + participant_name: str | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track joining a meeting as a tool operation. + + Args: + meeting_url: URL of the meeting + participant_name: Name of the participant + + Yields: + A dict for tracking join operation. + """ + with track_tool( + "meeting.join", + arguments={ + "meeting_url": meeting_url, + "participant_name": participant_name, + }, + metadata={"operation": "join"}, + ) as ctx: + yield ctx + + +@contextmanager +def track_meeting_leave() -> Generator[dict[str, Any], None, None]: + """Track leaving a meeting as a tool operation.""" + with track_tool( + "meeting.leave", + metadata={"operation": "leave"}, + ) as ctx: + yield ctx + + +# ============================================================================= +# Speech Tracking +# ============================================================================= + + +@contextmanager +def track_speech( + text: str, + *, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track text-to-speech as a task. + + Args: + text: Text being spoken + metadata: Additional metadata + + Yields: + A dict for tracking speech operation. + """ + with track_task( + "speech.tts", + input_data=text, + metadata={ + "text.length": len(text), + "text.preview": text[:100] if len(text) > 100 else text, + **(metadata or {}), + }, + ) as ctx: + yield ctx + + +@contextmanager +def track_transcription( + *, + metadata: dict[str, Any] | None = None, +) -> Generator[dict[str, Any], None, None]: + """Track speech-to-text transcription as a task. + + Args: + metadata: Additional metadata + + Yields: + A dict for tracking transcription. + """ + with track_task( + "speech.transcription", + metadata=metadata or {}, + ) as ctx: + yield ctx + + +# ============================================================================= +# Metrics Tracking +# ============================================================================= + + +def increment_metric(name: str, value: float = 1.0) -> None: + """Increment a custom metric. + + Args: + name: Name of the metric + value: Value to increment by (default: 1.0) + """ + _metrics[name] = _metrics.get(name, 0.0) + value + + +def get_metric(name: str) -> float: + """Get the current value of a metric. + + Args: + name: Name of the metric + + Returns: + Current metric value (0.0 if not set) + """ + return _metrics.get(name, 0.0) + + +def get_all_metrics() -> dict[str, float]: + """Get all tracked metrics. + + Returns: + Dict of all metric names to values. + """ + return _metrics.copy() + + +def reset_metrics() -> None: + """Reset all metrics to zero.""" + _metrics.clear() + + +# ============================================================================= +# Error Tracking +# ============================================================================= + + +def track_error( + error: Exception, + *, + operation: str | None = None, + context: dict[str, Any] | None = None, +) -> None: + """Track an error event. + + Args: + error: The exception that occurred + operation: Name of the operation where error occurred + context: Additional context about the error + """ + increment_metric("errors.total") + if operation: + increment_metric(f"errors.{operation}") + + if not _llmobs_enabled: + return - # aiohttp for async HTTP try: - patch(aiohttp=True) - logger.debug("Patched aiohttp for Datadog") - except Exception: - logger.debug("aiohttp patching skipped") + from ddtrace.llmobs import LLMObs + + # Annotate current span with error info if there is one + LLMObs.annotate( + metadata={ + "error": True, + "error.type": type(error).__name__, + "error.message": str(error), + "error.operation": operation, + **(context or {}), + } + ) + except Exception: # noqa: BLE001 + pass # Don't fail if annotation fails + + +# ============================================================================= +# Legacy API (for backward compatibility) +# ============================================================================= -def get_tracer() -> Any: +def get_tracer() -> Any: # noqa: ANN401 """Get the Datadog tracer instance. Returns: @@ -213,8 +768,8 @@ def create_span( service: str | None = None, resource: str | None = None, tags: dict[str, Any] | None = None, -) -> Any: - """Create a Datadog span for custom instrumentation. +) -> Any: # noqa: ANN401 + """Create a Datadog span for custom instrumentation (legacy). Args: operation_name: Name of the operation being traced. @@ -223,8 +778,7 @@ def create_span( tags: Additional tags to add to the span. Returns: - A Datadog span context manager, or a no-op context manager if Datadog - is not available. + A Datadog span context manager, or a no-op context manager. """ tracer = get_tracer() if tracer is None: @@ -245,7 +799,7 @@ def create_span( return span -def set_span_tag(key: str, value: Any) -> None: +def set_span_tag(key: str, value: Any) -> None: # noqa: ANN401 """Set a tag on the current active span.""" tracer = get_tracer() if tracer is None: