diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 99b088d1f..b7916e62a 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -13,6 +13,11 @@ """ from typing import Optional, Set, TypedDict + +try: + from typing import NotRequired +except ImportError: + from typing_extensions import NotRequired from types import ModuleType from dataclasses import dataclass import importlib @@ -36,8 +41,11 @@ def _is_package_instrumented(package_name: str) -> bool: """Check if a package is already instrumented by looking at active instrumentors.""" + # Handle package.module names by converting dots to underscores for comparison + normalized_name = package_name.replace(".", "_").lower() return any( - instrumentor.__class__.__name__.lower().startswith(package_name.lower()) + instrumentor.__class__.__name__.lower().startswith(normalized_name) + or instrumentor.__class__.__name__.lower().startswith(package_name.split(".")[-1].lower()) for instrumentor in _active_instrumentors ) @@ -65,17 +73,14 @@ def _should_instrument_package(package_name: str) -> bool: if package_name in AGENTIC_LIBRARIES: _uninstrument_providers() _has_agentic_library = True - logger.debug(f"Uninstrumented all providers due to agentic library {package_name} detection") return True # Skip providers if an agentic library is already instrumented if package_name in PROVIDERS and _has_agentic_library: - logger.debug(f"Skipping provider {package_name} instrumentation as an agentic library is already instrumented") return False # Skip if already instrumented if _is_package_instrumented(package_name): - logger.debug(f"Package {package_name} is already instrumented") return False return True @@ -102,29 +107,54 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), Monitor imports and instrument packages as they are imported. This replaces the built-in import function to intercept package imports. """ - global _instrumenting_packages - root = name.split(".", 1)[0] + global _instrumenting_packages, _has_agentic_library - # Skip providers if an agentic library is already instrumented - if _has_agentic_library and root in PROVIDERS: + # If an agentic library is already instrumented, skip all further instrumentation + if _has_agentic_library: return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) - # Check if this is a package we should instrument - if ( - root in TARGET_PACKAGES - and root not in _instrumenting_packages - and not _is_package_instrumented(root) # Check if already instrumented before adding - ): - logger.debug(f"Detected import of {root}") - _instrumenting_packages.add(root) - try: - _perform_instrumentation(root) - except Exception as e: - logger.error(f"Error instrumenting {root}: {str(e)}") - finally: - _instrumenting_packages.discard(root) - - return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) + # First, do the actual import + module = _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) + + # Check for exact matches first (handles package.module like google.adk) + packages_to_check = set() + + # Check the imported module itself + if name in TARGET_PACKAGES: + packages_to_check.add(name) + else: + # Check if any target package is a prefix of the import name + for target in TARGET_PACKAGES: + if name.startswith(target + ".") or name == target: + packages_to_check.add(target) + + # For "from X import Y" style imports, also check submodules + if fromlist: + for item in fromlist: + full_name = f"{name}.{item}" + if full_name in TARGET_PACKAGES: + packages_to_check.add(full_name) + else: + # Check if any target package matches this submodule + for target in TARGET_PACKAGES: + if full_name == target or full_name.startswith(target + "."): + packages_to_check.add(target) + + # Instrument all matching packages + for package_to_check in packages_to_check: + if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check): + _instrumenting_packages.add(package_to_check) + try: + _perform_instrumentation(package_to_check) + # If we just instrumented an agentic library, stop + if _has_agentic_library: + break + except Exception as e: + logger.error(f"Error instrumenting {package_to_check}: {str(e)}") + finally: + _instrumenting_packages.discard(package_to_check) + + return module # Define the structure for instrumentor configurations @@ -132,6 +162,7 @@ class InstrumentorConfig(TypedDict): module_name: str class_name: str min_version: str + package_name: NotRequired[str] # Optional: actual pip package name if different from module # Configuration for supported LLM providers @@ -146,16 +177,17 @@ class InstrumentorConfig(TypedDict): "class_name": "AnthropicInstrumentor", "min_version": "0.32.0", }, - "google.genai": { - "module_name": "agentops.instrumentation.google_generativeai", - "class_name": "GoogleGenerativeAIInstrumentor", - "min_version": "0.1.0", - }, "ibm_watsonx_ai": { "module_name": "agentops.instrumentation.ibm_watsonx_ai", "class_name": "IBMWatsonXInstrumentor", "min_version": "0.1.0", }, + "google.genai": { + "module_name": "agentops.instrumentation.google_generativeai", + "class_name": "GoogleGenerativeAIInstrumentor", + "min_version": "0.1.0", + "package_name": "google-genai", # Actual pip package name + }, } # Configuration for supported agentic libraries @@ -171,6 +203,11 @@ class InstrumentorConfig(TypedDict): "class_name": "OpenAIAgentsInstrumentor", "min_version": "0.0.1", }, + "google.adk": { + "module_name": "agentops.instrumentation.google_adk", + "class_name": "GoogleADKInstrumentor", + "min_version": "0.1.0", + }, } # Combine all target packages for monitoring @@ -190,6 +227,7 @@ class InstrumentorLoader: module_name: str class_name: str min_version: str + package_name: Optional[str] = None # Optional: actual pip package name @property def module(self) -> ModuleType: @@ -200,7 +238,11 @@ def module(self) -> ModuleType: def should_activate(self) -> bool: """Check if the package is available and meets version requirements.""" try: - provider_name = self.module_name.split(".")[-1] + # Use explicit package_name if provided, otherwise derive from module_name + if self.package_name: + provider_name = self.package_name + else: + provider_name = self.module_name.split(".")[-1] module_version = version(provider_name) return module_version is not None and Version(module_version) >= parse(self.min_version) except ImportError: @@ -233,24 +275,44 @@ def instrument_all(): # Check if active_instrumentors is empty, as a proxy for not started. if not _active_instrumentors: builtins.__import__ = _import_monitor - global _instrumenting_packages + global _instrumenting_packages, _has_agentic_library + + # If an agentic library is already instrumented, don't instrument anything else + if _has_agentic_library: + return + for name in list(sys.modules.keys()): + # Stop if an agentic library gets instrumented during the loop + if _has_agentic_library: + break + module = sys.modules.get(name) if not isinstance(module, ModuleType): continue - root = name.split(".", 1)[0] - if _has_agentic_library and root in PROVIDERS: - continue - - if root in TARGET_PACKAGES and root not in _instrumenting_packages and not _is_package_instrumented(root): - _instrumenting_packages.add(root) + # Check for exact matches first (handles package.module like google.adk) + package_to_check = None + if name in TARGET_PACKAGES: + package_to_check = name + else: + # Check if any target package is a prefix of the module name + for target in TARGET_PACKAGES: + if name.startswith(target + ".") or name == target: + package_to_check = target + break + + if ( + package_to_check + and package_to_check not in _instrumenting_packages + and not _is_package_instrumented(package_to_check) + ): + _instrumenting_packages.add(package_to_check) try: - _perform_instrumentation(root) + _perform_instrumentation(package_to_check) except Exception as e: - logger.error(f"Error instrumenting {root}: {str(e)}") + logger.error(f"Error instrumenting {package_to_check}: {str(e)}") finally: - _instrumenting_packages.discard(root) + _instrumenting_packages.discard(package_to_check) def uninstrument_all(): @@ -269,8 +331,19 @@ def get_active_libraries() -> set[str]: Get all actively used libraries in the current execution context. Returns a set of package names that are currently imported and being monitored. """ - return { - name.split(".")[0] - for name, module in sys.modules.items() - if isinstance(module, ModuleType) and name.split(".")[0] in TARGET_PACKAGES - } + active_libs = set() + for name, module in sys.modules.items(): + if not isinstance(module, ModuleType): + continue + + # Check for exact matches first + if name in TARGET_PACKAGES: + active_libs.add(name) + else: + # Check if any target package is a prefix of the module name + for target in TARGET_PACKAGES: + if name.startswith(target + ".") or name == target: + active_libs.add(target) + break + + return active_libs diff --git a/agentops/instrumentation/google_adk/__init__.py b/agentops/instrumentation/google_adk/__init__.py new file mode 100644 index 000000000..ac8bcd215 --- /dev/null +++ b/agentops/instrumentation/google_adk/__init__.py @@ -0,0 +1,20 @@ +"""Google ADK Instrumentation for AgentOps + +This module provides instrumentation for Google's Agent Development Kit (ADK), +capturing agent execution, LLM calls, tool calls, and other ADK-specific events. +""" + +from importlib.metadata import version, PackageNotFoundError + +try: + __version__ = version("google-adk") +except PackageNotFoundError: + __version__ = "0.0.0" + +LIBRARY_NAME = "agentops.instrumentation.google_adk" +LIBRARY_VERSION = __version__ + +from agentops.instrumentation.google_adk.instrumentor import GoogleADKInstrumentor # noqa: E402 +from agentops.instrumentation.google_adk import patch # noqa: E402 + +__all__ = ["LIBRARY_NAME", "LIBRARY_VERSION", "GoogleADKInstrumentor", "patch"] diff --git a/agentops/instrumentation/google_adk/instrumentor.py b/agentops/instrumentation/google_adk/instrumentor.py new file mode 100644 index 000000000..000b58073 --- /dev/null +++ b/agentops/instrumentation/google_adk/instrumentor.py @@ -0,0 +1,78 @@ +"""Google ADK Instrumentation for AgentOps + +This module provides instrumentation for Google's Agent Development Kit (ADK). +It uses a patching approach to: +1. Disable ADK's built-in telemetry to prevent duplicate spans +2. Create AgentOps spans that mirror ADK's telemetry structure +3. Extract and properly index LLM messages and tool calls +""" + +from typing import Collection +from opentelemetry.trace import get_tracer +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.metrics import get_meter + +from agentops.logging import logger +from agentops.instrumentation.google_adk import LIBRARY_NAME, LIBRARY_VERSION +from agentops.instrumentation.google_adk.patch import patch_adk, unpatch_adk +from agentops.semconv import Meters + + +class GoogleADKInstrumentor(BaseInstrumentor): + """An instrumentor for Google Agent Development Kit (ADK). + + This instrumentor patches Google ADK to: + - Prevent ADK from creating its own telemetry spans + - Create AgentOps spans for agent runs, LLM calls, and tool calls + - Properly extract and index message content and tool interactions + """ + + def instrumentation_dependencies(self) -> Collection[str]: + """Return packages required for instrumentation.""" + return ["google-adk >= 0.1.0"] + + def _instrument(self, **kwargs): + """Instrument the Google ADK. + + This method: + 1. Disables ADK's built-in telemetry + 2. Patches key ADK methods to create AgentOps spans + 3. Sets up metrics for tracking token usage and operation duration + """ + # Set up tracer and meter + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) + + # Create metrics + meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used with Google ADK", + ) + + meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="Google ADK operation duration", + ) + + meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during Google ADK operations", + ) + + # Apply patches + patch_adk(tracer) + logger.info("Google ADK instrumentation enabled") + + def _uninstrument(self, **kwargs): + """Remove instrumentation from Google ADK. + + This method removes all patches and restores ADK's original behavior. + """ + unpatch_adk() + logger.info("Google ADK instrumentation disabled") diff --git a/agentops/instrumentation/google_adk/patch.py b/agentops/instrumentation/google_adk/patch.py new file mode 100644 index 000000000..a51a81c7b --- /dev/null +++ b/agentops/instrumentation/google_adk/patch.py @@ -0,0 +1,741 @@ +"""Patch functions for Google ADK instrumentation. + +This module patches key methods in Google ADK to: +1. Prevent ADK from creating its own spans +2. Create AgentOps spans that mirror ADK's telemetry +3. Extract and set proper attributes on spans +""" + +import json +import wrapt +from typing import Any +from opentelemetry import trace as opentelemetry_api_trace +from opentelemetry.trace import SpanKind as SpanKind + +from agentops.logging import logger +from agentops.semconv import SpanAttributes, ToolAttributes, MessageAttributes, AgentAttributes + + +_wrapped_methods = [] + + +class NoOpSpan: + """A no-op span that does nothing.""" + + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def set_attribute(self, *args, **kwargs): + pass + + def set_attributes(self, *args, **kwargs): + pass + + def add_event(self, *args, **kwargs): + pass + + def set_status(self, *args, **kwargs): + pass + + def update_name(self, *args, **kwargs): + pass + + def is_recording(self): + return False + + def end(self, *args, **kwargs): + pass + + def record_exception(self, *args, **kwargs): + pass + + +class NoOpTracer: + """A tracer that creates no-op spans to prevent ADK from creating real spans.""" + + def start_as_current_span(self, *args, **kwargs): + """Return a no-op context manager.""" + return NoOpSpan() + + def start_span(self, *args, **kwargs): + """Return a no-op span.""" + return NoOpSpan() + + def use_span(self, *args, **kwargs): + """Return a no-op context manager.""" + return NoOpSpan() + + +def _build_llm_request_for_trace(llm_request) -> dict: + """Build a dictionary representation of the LLM request for tracing.""" + from google.genai import types + + result = { + "model": llm_request.model, + "config": llm_request.config.model_dump(exclude_none=True, exclude="response_schema"), + "contents": [], + } + + for content in llm_request.contents: + parts = [part for part in content.parts if not hasattr(part, "inline_data") or not part.inline_data] + result["contents"].append(types.Content(role=content.role, parts=parts).model_dump(exclude_none=True)) + return result + + +def _extract_messages_from_contents(contents: list) -> dict: + """Extract messages from LLM contents for proper indexing.""" + attributes = {} + + for i, content in enumerate(contents): + # Get role and normalize it + raw_role = content.get("role", "user") + + # Hardcode role mapping for consistency + if raw_role == "model": + role = "assistant" + elif raw_role == "user": + role = "user" + elif raw_role == "system": + role = "system" + else: + role = raw_role # Keep original if not recognized + + parts = content.get("parts", []) + + # Set role + attributes[MessageAttributes.PROMPT_ROLE.format(i=i)] = role + + # Extract content from parts + text_parts = [] + for part in parts: + if "text" in part: + text_parts.append(part["text"]) + elif "function_call" in part: + # Function calls in prompts are typically from the model's previous responses + func_call = part["function_call"] + # Store as a generic attribute since MessageAttributes doesn't have prompt tool calls + attributes[f"gen_ai.prompt.{i}.function_call.name"] = func_call.get("name", "") + attributes[f"gen_ai.prompt.{i}.function_call.args"] = json.dumps(func_call.get("args", {})) + if "id" in func_call: + attributes[f"gen_ai.prompt.{i}.function_call.id"] = func_call["id"] + elif "function_response" in part: + # Function responses are typically user messages with tool results + func_resp = part["function_response"] + attributes[f"gen_ai.prompt.{i}.function_response.name"] = func_resp.get("name", "") + attributes[f"gen_ai.prompt.{i}.function_response.result"] = json.dumps(func_resp.get("response", {})) + if "id" in func_resp: + attributes[f"gen_ai.prompt.{i}.function_response.id"] = func_resp["id"] + + # Combine text parts + if text_parts: + attributes[MessageAttributes.PROMPT_CONTENT.format(i=i)] = "\n".join(text_parts) + + return attributes + + +def _extract_llm_attributes(llm_request_dict: dict, llm_response: Any) -> dict: + """Extract attributes from LLM request and response.""" + attributes = {} + + # Model + if "model" in llm_request_dict: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = llm_request_dict["model"] + + # Config + if "config" in llm_request_dict: + config = llm_request_dict["config"] + + # System instruction - commented out, now handled as a system role message + # if "system_instruction" in config: + # attributes[SpanAttributes.LLM_REQUEST_SYSTEM_INSTRUCTION] = config["system_instruction"] + + # Temperature + if "temperature" in config: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = config["temperature"] + + # Max output tokens + if "max_output_tokens" in config: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = config["max_output_tokens"] + + # Top P + if "top_p" in config: + attributes[SpanAttributes.LLM_REQUEST_TOP_P] = config["top_p"] + + # Top K + if "top_k" in config: + attributes[SpanAttributes.LLM_REQUEST_TOP_K] = config["top_k"] + + # Candidate count + if "candidate_count" in config: + attributes[SpanAttributes.LLM_REQUEST_CANDIDATE_COUNT] = config["candidate_count"] + + # Stop sequences + if "stop_sequences" in config: + attributes[SpanAttributes.LLM_REQUEST_STOP_SEQUENCES] = json.dumps(config["stop_sequences"]) + + # Response MIME type + if "response_mime_type" in config: + attributes["gen_ai.request.response_mime_type"] = config["response_mime_type"] + + # Tools/Functions + if "tools" in config: + # Extract tool definitions + for i, tool in enumerate(config["tools"]): + if "function_declarations" in tool: + for j, func in enumerate(tool["function_declarations"]): + attributes[f"gen_ai.request.tools.{j}.name"] = func.get("name", "") + attributes[f"gen_ai.request.tools.{j}.description"] = func.get("description", "") + + # Messages - handle system instruction and regular contents + message_index = 0 + + # First, add system instruction as a system role message if present + # TODO: This is not Chat Completions format but doing this for frontend rendering consistency + if "config" in llm_request_dict and "system_instruction" in llm_request_dict["config"]: + system_instruction = llm_request_dict["config"]["system_instruction"] + attributes[MessageAttributes.PROMPT_ROLE.format(i=message_index)] = "system" + attributes[MessageAttributes.PROMPT_CONTENT.format(i=message_index)] = system_instruction + message_index += 1 + + # Then add regular contents with proper indexing + if "contents" in llm_request_dict: + for content in llm_request_dict["contents"]: + # Get role and normalize it + raw_role = content.get("role", "user") + + # Hardcode role mapping for consistency + if raw_role == "model": + role = "assistant" + elif raw_role == "user": + role = "user" + elif raw_role == "system": + role = "system" + else: + role = raw_role # Keep original if not recognized + + parts = content.get("parts", []) + + # Set role + attributes[MessageAttributes.PROMPT_ROLE.format(i=message_index)] = role + + # Extract content from parts + text_parts = [] + for part in parts: + if "text" in part: + text_parts.append(part["text"]) + elif "function_call" in part: + # Function calls in prompts are typically from the model's previous responses + func_call = part["function_call"] + # Store as a generic attribute since MessageAttributes doesn't have prompt tool calls + attributes[f"gen_ai.prompt.{message_index}.function_call.name"] = func_call.get("name", "") + attributes[f"gen_ai.prompt.{message_index}.function_call.args"] = json.dumps( + func_call.get("args", {}) + ) + if "id" in func_call: + attributes[f"gen_ai.prompt.{message_index}.function_call.id"] = func_call["id"] + elif "function_response" in part: + # Function responses are typically user messages with tool results + func_resp = part["function_response"] + attributes[f"gen_ai.prompt.{message_index}.function_response.name"] = func_resp.get("name", "") + attributes[f"gen_ai.prompt.{message_index}.function_response.result"] = json.dumps( + func_resp.get("response", {}) + ) + if "id" in func_resp: + attributes[f"gen_ai.prompt.{message_index}.function_response.id"] = func_resp["id"] + + # Combine text parts + if text_parts: + attributes[MessageAttributes.PROMPT_CONTENT.format(i=message_index)] = "\n".join(text_parts) + + message_index += 1 + + # Response + if llm_response: + try: + response_dict = json.loads(llm_response) if isinstance(llm_response, str) else llm_response + + # Response model + if "model" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + + # Usage metadata + if "usage_metadata" in response_dict: + usage = response_dict["usage_metadata"] + if "prompt_token_count" in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["prompt_token_count"] + if "candidates_token_count" in usage: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage["candidates_token_count"] + if "total_token_count" in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_token_count"] + + # Additional token details if available + if "prompt_tokens_details" in usage: + for detail in usage["prompt_tokens_details"]: + if "modality" in detail and "token_count" in detail: + attributes[f'gen_ai.usage.prompt_tokens.{detail["modality"].lower()}'] = detail[ + "token_count" + ] + + if "candidates_tokens_details" in usage: + for detail in usage["candidates_tokens_details"]: + if "modality" in detail and "token_count" in detail: + attributes[f'gen_ai.usage.completion_tokens.{detail["modality"].lower()}'] = detail[ + "token_count" + ] + + # Response content + if "content" in response_dict and "parts" in response_dict["content"]: + parts = response_dict["content"]["parts"] + + # Set completion role and content - hardcode role as 'assistant' for consistency + attributes[MessageAttributes.COMPLETION_ROLE.format(i=0)] = "assistant" + + text_parts = [] + tool_call_index = 0 + for part in parts: + if "text" in part: + text_parts.append(part["text"]) + elif "function_call" in part: + # This is a function call in the response + func_call = part["function_call"] + attributes[ + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=tool_call_index) + ] = func_call.get("name", "") + attributes[ + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=0, j=tool_call_index) + ] = json.dumps(func_call.get("args", {})) + if "id" in func_call: + attributes[ + MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=0, j=tool_call_index) + ] = func_call["id"] + tool_call_index += 1 + + if text_parts: + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=0)] = "\n".join(text_parts) + + # Finish reason + if "finish_reason" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_FINISH_REASON] = response_dict["finish_reason"] + + # Response ID + if "id" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_ID] = response_dict["id"] + + except Exception as e: + logger.debug(f"Failed to extract response attributes: {e}") + + return attributes + + +# Wrapper for Runner.run_async - REMOVED per user request +# We just pass through without creating a span +def _runner_run_async_wrapper(agentops_tracer): + def actual_decorator(wrapped, instance, args, kwargs): + async def new_function(): + # Just pass through without creating a span + async_gen = wrapped(*args, **kwargs) + async for item in async_gen: + yield item + + return new_function() + + return actual_decorator + + +# Wrapper for BaseAgent.run_async +def _base_agent_run_async_wrapper(agentops_tracer): + def actual_decorator(wrapped, instance, args, kwargs): + async def new_function(): + agent_name = instance.name if hasattr(instance, "name") else "unknown" + span_name = f"adk.agent.{agent_name}" + + with agentops_tracer.start_as_current_span(span_name, kind=SpanKind.CLIENT) as span: + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "agent") + span.set_attribute(SpanAttributes.LLM_SYSTEM, "gcp.vertex.agent") + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "agent") + + # Use AgentAttributes from semconv + span.set_attribute(AgentAttributes.AGENT_NAME, agent_name) + if hasattr(instance, "description"): + span.set_attribute("agent.description", instance.description) + if hasattr(instance, "model"): + span.set_attribute("agent.model", instance.model) + + # Extract invocation context if available + if len(args) > 0 and hasattr(args[0], "invocation_id"): + span.set_attribute("adk.invocation_id", args[0].invocation_id) + + async_gen = wrapped(*args, **kwargs) + async for item in async_gen: + yield item + + return new_function() + + return actual_decorator + + +# Wrapper for BaseLlmFlow._call_llm_async +def _base_llm_flow_call_llm_async_wrapper(agentops_tracer): + def actual_decorator(wrapped, instance, args, kwargs): + async def new_function(): + # Extract model info and llm_request if available + model_name = "unknown" + llm_request = None + + if len(args) > 1: + llm_request = args[1] + if hasattr(llm_request, "model"): + model_name = llm_request.model + + span_name = f"adk.llm.{model_name}" + + with agentops_tracer.start_as_current_span(span_name, kind=SpanKind.CLIENT) as span: + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "request") + span.set_attribute(SpanAttributes.LLM_SYSTEM, "gcp.vertex.agent") + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "request") + + # Extract and set attributes from llm_request before the call + if llm_request: + llm_request_dict = _build_llm_request_for_trace(llm_request) + # Only extract request attributes here, response will be set later by _finalize_model_response_event + llm_attrs = _extract_llm_attributes(llm_request_dict, None) + for key, value in llm_attrs.items(): + span.set_attribute(key, value) + + # Note: The actual LLM response attributes will be set by + # _finalize_model_response_event_wrapper when ADK finalizes the response + + async_gen = wrapped(*args, **kwargs) + async for item in async_gen: + yield item + + return new_function() + + return actual_decorator + + +# Wrapper for ADK telemetry functions - these add attributes to current span +def _adk_trace_tool_call_wrapper(agentops_tracer): + @wrapt.decorator + def wrapper(wrapped, instance, args, kwargs): + # Call original to preserve ADK behavior + result = wrapped(*args, **kwargs) + + tool_args = args[0] if args else kwargs.get("args") + current_span = opentelemetry_api_trace.get_current_span() + if current_span.is_recording() and tool_args is not None: + current_span.set_attribute(SpanAttributes.LLM_SYSTEM, "gcp.vertex.agent") + current_span.set_attribute("gcp.vertex.agent.tool_call_args", json.dumps(tool_args)) + return result + + return wrapper + + +def _adk_trace_tool_response_wrapper(agentops_tracer): + @wrapt.decorator + def wrapper(wrapped, instance, args, kwargs): + # Call original to preserve ADK behavior + result = wrapped(*args, **kwargs) + + invocation_context = args[0] if len(args) > 0 else kwargs.get("invocation_context") + event_id = args[1] if len(args) > 1 else kwargs.get("event_id") + function_response_event = args[2] if len(args) > 2 else kwargs.get("function_response_event") + + current_span = opentelemetry_api_trace.get_current_span() + if current_span.is_recording(): + current_span.set_attribute(SpanAttributes.LLM_SYSTEM, "gcp.vertex.agent") + if invocation_context: + current_span.set_attribute("gcp.vertex.agent.invocation_id", invocation_context.invocation_id) + if event_id: + current_span.set_attribute("gcp.vertex.agent.event_id", event_id) + if function_response_event: + current_span.set_attribute( + "gcp.vertex.agent.tool_response", function_response_event.model_dump_json(exclude_none=True) + ) + current_span.set_attribute("gcp.vertex.agent.llm_request", "{}") + current_span.set_attribute("gcp.vertex.agent.llm_response", "{}") + return result + + return wrapper + + +def _adk_trace_call_llm_wrapper(agentops_tracer): + @wrapt.decorator + def wrapper(wrapped, instance, args, kwargs): + # Call the original first to ensure ADK's behavior is preserved + result = wrapped(*args, **kwargs) + + invocation_context = args[0] if len(args) > 0 else kwargs.get("invocation_context") + event_id = args[1] if len(args) > 1 else kwargs.get("event_id") + llm_request = args[2] if len(args) > 2 else kwargs.get("llm_request") + llm_response = args[3] if len(args) > 3 else kwargs.get("llm_response") + + current_span = opentelemetry_api_trace.get_current_span() + if current_span.is_recording(): + current_span.set_attribute(SpanAttributes.LLM_SYSTEM, "gcp.vertex.agent") + if llm_request: + current_span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, llm_request.model) + if invocation_context: + current_span.set_attribute("gcp.vertex.agent.invocation_id", invocation_context.invocation_id) + current_span.set_attribute("gcp.vertex.agent.session_id", invocation_context.session.id) + if event_id: + current_span.set_attribute("gcp.vertex.agent.event_id", event_id) + + if llm_request: + llm_request_dict = _build_llm_request_for_trace(llm_request) + current_span.set_attribute("gcp.vertex.agent.llm_request", json.dumps(llm_request_dict)) + + # Extract and set all attributes including usage + llm_response_json = None + if llm_response: + llm_response_json = llm_response.model_dump_json(exclude_none=True) + current_span.set_attribute("gcp.vertex.agent.llm_response", llm_response_json) + + llm_attrs = _extract_llm_attributes(llm_request_dict, llm_response_json) + for key, value in llm_attrs.items(): + current_span.set_attribute(key, value) + + return result + + return wrapper + + +def _adk_trace_send_data_wrapper(agentops_tracer): + @wrapt.decorator + def wrapper(wrapped, instance, args, kwargs): + # Call original to preserve ADK behavior + result = wrapped(*args, **kwargs) + + invocation_context = args[0] if len(args) > 0 else kwargs.get("invocation_context") + event_id = args[1] if len(args) > 1 else kwargs.get("event_id") + data = args[2] if len(args) > 2 else kwargs.get("data") + + current_span = opentelemetry_api_trace.get_current_span() + if current_span.is_recording(): + if invocation_context: + current_span.set_attribute("gcp.vertex.agent.invocation_id", invocation_context.invocation_id) + if event_id: + current_span.set_attribute("gcp.vertex.agent.event_id", event_id) + if data: + from google.genai import types + + current_span.set_attribute( + "gcp.vertex.agent.data", + json.dumps( + [ + types.Content(role=content.role, parts=content.parts).model_dump(exclude_none=True) + for content in data + ] + ), + ) + return result + + return wrapper + + +# Wrapper for _finalize_model_response_event to capture response attributes +def _finalize_model_response_event_wrapper(agentops_tracer): + def actual_decorator(wrapped, instance, args, kwargs): + # Call the original method + result = wrapped(*args, **kwargs) + + # Extract llm_request and llm_response from args + llm_request = args[0] if len(args) > 0 else kwargs.get("llm_request") + llm_response = args[1] if len(args) > 1 else kwargs.get("llm_response") + + # Get the current span and set response attributes + current_span = opentelemetry_api_trace.get_current_span() + if current_span.is_recording() and llm_request and llm_response: + span_name = getattr(current_span, "name", "") + if "adk.llm" in span_name: + # Build request dict + llm_request_dict = _build_llm_request_for_trace(llm_request) + + # Extract response attributes + llm_response_json = llm_response.model_dump_json(exclude_none=True) + llm_attrs = _extract_llm_attributes(llm_request_dict, llm_response_json) + + # Only set response-related attributes (request attrs already set) + for key, value in llm_attrs.items(): + if "usage" in key or "completion" in key or "response" in key: + current_span.set_attribute(key, value) + + return result + + return actual_decorator + + +# Wrapper for tool execution that creates a single merged span +def _call_tool_async_wrapper(agentops_tracer): + """Wrapper that creates a single span for tool call and response.""" + + def actual_decorator(wrapped, instance, args, kwargs): + async def new_function(): + # Extract tool info from args + tool = args[0] if args else kwargs.get("tool") + tool_args = args[1] if len(args) > 1 else kwargs.get("args", {}) + tool_context = args[2] if len(args) > 2 else kwargs.get("tool_context") + + tool_name = getattr(tool, "name", "unknown_tool") + span_name = f"adk.tool.{tool_name}" + + with agentops_tracer.start_as_current_span(span_name, kind=SpanKind.CLIENT) as span: + span.set_attribute(SpanAttributes.AGENTOPS_SPAN_KIND, "tool") + span.set_attribute(SpanAttributes.LLM_SYSTEM, "gcp.vertex.agent") + span.set_attribute(SpanAttributes.AGENTOPS_ENTITY_NAME, "tool") + + # Set tool call attributes + span.set_attribute(ToolAttributes.TOOL_NAME, tool_name) + span.set_attribute(ToolAttributes.TOOL_PARAMETERS, json.dumps(tool_args)) + + if tool_context and hasattr(tool_context, "function_call_id"): + span.set_attribute("tool.call_id", tool_context.function_call_id) + if tool_context and hasattr(tool_context, "invocation_context"): + span.set_attribute("adk.invocation_id", tool_context.invocation_context.invocation_id) + + # Execute the tool + result = await wrapped(*args, **kwargs) + + # Set tool response attributes + if result: + if isinstance(result, dict): + span.set_attribute(ToolAttributes.TOOL_RESULT, json.dumps(result)) + else: + span.set_attribute(ToolAttributes.TOOL_RESULT, str(result)) + + return result + + return new_function() + + return actual_decorator + + +def _patch(module_name: str, object_name: str, method_name: str, wrapper_function, agentops_tracer): + """Helper to apply a patch and keep track of it.""" + try: + module = __import__(module_name, fromlist=[object_name]) + obj = getattr(module, object_name) + wrapt.wrap_function_wrapper(obj, method_name, wrapper_function(agentops_tracer)) + _wrapped_methods.append((obj, method_name)) + logger.debug(f"Successfully wrapped {module_name}.{object_name}.{method_name}") + except Exception as e: + logger.warning(f"Could not wrap {module_name}.{object_name}.{method_name}: {e}") + + +def _patch_module_function(module_name: str, function_name: str, wrapper_function, agentops_tracer): + """Helper to patch module-level functions.""" + try: + module = __import__(module_name, fromlist=[function_name]) + wrapt.wrap_function_wrapper(module, function_name, wrapper_function(agentops_tracer)) + _wrapped_methods.append((module, function_name)) + logger.debug(f"Successfully wrapped {module_name}.{function_name}") + except Exception as e: + logger.warning(f"Could not wrap {module_name}.{function_name}: {e}") + + +def patch_adk(agentops_tracer): + """Apply all patches to Google ADK modules.""" + logger.debug("Applying Google ADK patches for AgentOps instrumentation") + + # First, disable ADK's own tracer by replacing it with our NoOpTracer + noop_tracer = NoOpTracer() + try: + import google.adk.telemetry as adk_telemetry + + # Replace the tracer with our no-op version + adk_telemetry.tracer = noop_tracer + logger.debug("Replaced ADK's tracer with NoOpTracer") + except Exception as e: + logger.warning(f"Failed to replace ADK tracer: {e}") + + # Also replace the tracer in all modules that have already imported it + modules_to_patch = [ + "google.adk.runners", + "google.adk.agents.base_agent", + "google.adk.flows.llm_flows.base_llm_flow", + "google.adk.flows.llm_flows.functions", + ] + + import sys + + for module_name in modules_to_patch: + if module_name in sys.modules: + try: + module = sys.modules[module_name] + if hasattr(module, "tracer"): + module.tracer = noop_tracer + logger.debug(f"Replaced tracer in {module_name}") + except Exception as e: + logger.warning(f"Failed to replace tracer in {module_name}: {e}") + + # Patch methods that create top-level AgentOps spans + # Skip runner patching - we don't want adk.runner spans + _patch("google.adk.agents.base_agent", "BaseAgent", "run_async", _base_agent_run_async_wrapper, agentops_tracer) + + # Patch ADK's telemetry functions to add attributes to AgentOps spans + _patch_module_function("google.adk.telemetry", "trace_tool_call", _adk_trace_tool_call_wrapper, agentops_tracer) + _patch_module_function( + "google.adk.telemetry", "trace_tool_response", _adk_trace_tool_response_wrapper, agentops_tracer + ) + _patch_module_function("google.adk.telemetry", "trace_call_llm", _adk_trace_call_llm_wrapper, agentops_tracer) + + _patch_module_function("google.adk.telemetry", "trace_send_data", _adk_trace_send_data_wrapper, agentops_tracer) + + # Patch method that creates nested spans + _patch( + "google.adk.flows.llm_flows.base_llm_flow", + "BaseLlmFlow", + "_call_llm_async", + _base_llm_flow_call_llm_async_wrapper, + agentops_tracer, + ) + + # Also patch _finalize_model_response_event to capture response attributes + _patch( + "google.adk.flows.llm_flows.base_llm_flow", + "BaseLlmFlow", + "_finalize_model_response_event", + _finalize_model_response_event_wrapper, + agentops_tracer, + ) + + # Patch tool execution to create merged tool spans + _patch_module_function( + "google.adk.flows.llm_flows.functions", "__call_tool_async", _call_tool_async_wrapper, agentops_tracer + ) + + logger.info("Google ADK patching complete") + + +def unpatch_adk(): + """Remove all patches from Google ADK modules.""" + logger.debug("Removing Google ADK patches") + + # Restore ADK's tracer + try: + import google.adk.telemetry as adk_telemetry + from opentelemetry import trace + + adk_telemetry.tracer = trace.get_tracer("gcp.vertex.agent") + logger.debug("Restored ADK's built-in tracer") + except Exception as e: + logger.warning(f"Failed to restore ADK tracer: {e}") + + # Unwrap all methods + for obj, method_name in _wrapped_methods: + try: + if hasattr(getattr(obj, method_name), "__wrapped__"): + original = getattr(obj, method_name).__wrapped__ + setattr(obj, method_name, original) + logger.debug(f"Successfully unwrapped {obj}.{method_name}") + except Exception as e: + logger.warning(f"Failed to unwrap {obj}.{method_name}: {e}") + + _wrapped_methods.clear() + logger.info("Google ADK unpatching complete")