diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py
index b9cdcb226..b3e4ccbfc 100644
--- a/agentops/instrumentation/__init__.py
+++ b/agentops/instrumentation/__init__.py
@@ -72,6 +72,11 @@ def get_instance(self) -> BaseInstrumentor:
         class_name="OpenAIAgentsInstrumentor",
         provider_import_name="agents",
     ),
+    InstrumentorLoader(
+        module_name="agentops.instrumentation.google_generativeai",
+        class_name="GoogleGenerativeAIInstrumentor",
+        provider_import_name="google.genai",
+    ),
 ]
diff --git a/agentops/instrumentation/google_generativeai/README.md b/agentops/instrumentation/google_generativeai/README.md
new file mode 100644
index 000000000..f03fe9119
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/README.md
@@ -0,0 +1,33 @@
+# Google Generative AI (Gemini) Instrumentation
+
+This module provides OpenTelemetry instrumentation for Google's Generative AI (Gemini) API. The instrumentation allows you to trace all API calls made using the `google-genai` Python SDK, capturing:
+
+- Model parameters (temperature, max_tokens, etc.)
+- Prompt content (with privacy controls)
+- Response text and token usage
+- Streaming metrics
+- Token counting
+- Performance and error data
+
+## Supported Features
+
+The instrumentation covers all major API methods, including:
+
+### Client-Based API
+- `client.models.generate_content`
+- `client.models.generate_content_stream`
+- `client.models.count_tokens`
+- `client.models.compute_tokens`
+- And their corresponding async variants
+
+## Metrics
+
+The instrumentation captures the following metrics:
+
+- Input tokens used
+- Output tokens generated
+- Total tokens consumed
+- Operation duration
+- Exception counts
+
+These metrics are available as OpenTelemetry span attributes and can be viewed in your observability platform of choice when properly configured.
\ No newline at end of file
diff --git a/agentops/instrumentation/google_generativeai/__init__.py b/agentops/instrumentation/google_generativeai/__init__.py
new file mode 100644
index 000000000..d4b9e4073
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/__init__.py
@@ -0,0 +1,38 @@
+"""Google Generative AI (Gemini) API instrumentation.
+
+This module provides instrumentation for the Google Generative AI (Gemini) API,
+including content generation, streaming, and chat functionality.
+"""
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def get_version() -> str:
+    """Get the version of the Google Generative AI SDK, or 'unknown' if not found.
+
+    Attempts to retrieve the installed version of the Google Generative AI SDK
+    using importlib.metadata. Falls back to 'unknown' if the version cannot be
+    determined.
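+
+    For example (illustrative), with google-genai 1.0.0 installed this returns
+    the string "1.0.0"; if the package cannot be found, it returns "unknown".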
+
+    Returns:
+        The version string of the Google Generative AI SDK or 'unknown'
+    """
+    try:
+        from importlib.metadata import version
+
+        return version("google-genai")
+    except ImportError:
+        logger.debug("Could not find Google Generative AI SDK version")
+        return "unknown"
+
+
+LIBRARY_NAME = "google-genai"
+LIBRARY_VERSION: str = get_version()
+
+# Import after defining constants to avoid circular imports
+from agentops.instrumentation.google_generativeai.instrumentor import GoogleGenerativeAIInstrumentor  # noqa: E402
+
+__all__ = [
+    "LIBRARY_NAME",
+    "LIBRARY_VERSION",
+    "GoogleGenerativeAIInstrumentor",
+]
\ No newline at end of file
diff --git a/agentops/instrumentation/google_generativeai/attributes/__init__.py b/agentops/instrumentation/google_generativeai/attributes/__init__.py
new file mode 100644
index 000000000..629b48a58
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/attributes/__init__.py
@@ -0,0 +1,25 @@
+"""Attribute extractors for Google Generative AI instrumentation."""
+
+from agentops.instrumentation.google_generativeai.attributes.common import (
+    get_common_instrumentation_attributes,
+    extract_request_attributes,
+)
+from agentops.instrumentation.google_generativeai.attributes.model import (
+    get_model_attributes,
+    get_generate_content_attributes,
+    get_stream_attributes,
+    get_token_counting_attributes,
+)
+from agentops.instrumentation.google_generativeai.attributes.chat import (
+    get_chat_attributes,
+)
+
+__all__ = [
+    "get_common_instrumentation_attributes",
+    "extract_request_attributes",
+    "get_model_attributes",
+    "get_generate_content_attributes",
+    "get_stream_attributes",
+    "get_chat_attributes",
+    "get_token_counting_attributes",
+]
\ No newline at end of file
diff --git a/agentops/instrumentation/google_generativeai/attributes/chat.py b/agentops/instrumentation/google_generativeai/attributes/chat.py
new file mode 100644
index 000000000..bb10b5619
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/attributes/chat.py
@@ -0,0 +1,125 @@
+"""Chat attribute extraction for Google Generative AI instrumentation."""
+
+from typing import Dict, Any, Optional, Tuple
+
+from agentops.logging import logger
+from agentops.semconv import SpanAttributes, LLMRequestTypeValues, MessageAttributes
+from agentops.instrumentation.common.attributes import AttributeMap
+from agentops.instrumentation.google_generativeai.attributes.common import (
+    extract_request_attributes,
+    get_common_instrumentation_attributes,
+)
+from agentops.instrumentation.google_generativeai.attributes.model import (
+    _extract_content_from_prompt,
+    _set_response_attributes,
+)
+
+
+def _extract_message_content(message: Any) -> str:
+    """Extract text content from a chat message.
+
+    Handles the various message formats in the Gemini chat API.
+
+    Args:
+        message: The message to extract content from
+
+    Returns:
+        Extracted text as a string
+    """
+    if isinstance(message, str):
+        return message
+
+    if isinstance(message, dict):
+        if "content" in message:
+            return _extract_content_from_prompt(message["content"])
+        if "text" in message:
+            return message["text"]
+
+    if hasattr(message, "content"):
+        return _extract_content_from_prompt(message.content)
+
+    if hasattr(message, "text"):
+        return message.text
+
+    return ""
+
+
+def _set_chat_history_attributes(attributes: AttributeMap, args: Tuple, kwargs: Dict[str, Any]) -> None:
+    """Extract and set chat history attributes from the request.
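+
+    Messages are resolved in precedence order: a single ``message`` keyword
+    argument, then the first positional argument, then a ``messages`` keyword
+    argument. Each extracted message is recorded as an indexed prompt attribute
+    along with its role (defaulting to "user").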
+
+    Args:
+        attributes: The attribute dictionary to update
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+    """
+    messages = []
+    if 'message' in kwargs:
+        messages = [kwargs['message']]
+    elif args and len(args) > 0:
+        messages = [args[0]]
+    elif 'messages' in kwargs:
+        messages = kwargs['messages']
+
+    if not messages:
+        return
+
+    for i, message in enumerate(messages):
+        try:
+            content = _extract_message_content(message)
+            if content:
+                role = "user"
+
+                if isinstance(message, dict) and "role" in message:
+                    role = message["role"]
+                elif hasattr(message, "role"):
+                    role = message.role
+
+                attributes[MessageAttributes.PROMPT_CONTENT.format(i=i)] = content
+                attributes[MessageAttributes.PROMPT_ROLE.format(i=i)] = role
+        except Exception as e:
+            logger.debug(f"Error extracting chat message at index {i}: {e}")
+
+
+def get_chat_attributes(
+    args: Optional[Tuple] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+    return_value: Optional[Any] = None,
+) -> AttributeMap:
+    """Extract attributes for chat session methods.
+
+    This function handles attribute extraction for chat session operations,
+    particularly the send_message method.
+
+    Args:
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+        return_value: Return value from the method
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = get_common_instrumentation_attributes()
+    attributes[SpanAttributes.LLM_SYSTEM] = "Gemini"
+    attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.CHAT.value
+
+    if kwargs:
+        kwargs_attributes = extract_request_attributes(kwargs)
+        attributes.update(kwargs_attributes)
+
+    chat_session = None
+    if args and len(args) >= 1:
+        chat_session = args[0]
+
+    if chat_session and hasattr(chat_session, "model"):
+        if isinstance(chat_session.model, str):
+            attributes[SpanAttributes.LLM_REQUEST_MODEL] = chat_session.model
+        elif hasattr(chat_session.model, "name"):
+            attributes[SpanAttributes.LLM_REQUEST_MODEL] = chat_session.model.name
+
+    if args or kwargs:
+        _set_chat_history_attributes(attributes, args or (), kwargs or {})
+
+    if return_value is not None:
+        _set_response_attributes(attributes, return_value)
+
+    return attributes
\ No newline at end of file
diff --git a/agentops/instrumentation/google_generativeai/attributes/common.py b/agentops/instrumentation/google_generativeai/attributes/common.py
new file mode 100644
index 000000000..8ae7284ac
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/attributes/common.py
@@ -0,0 +1,80 @@
+"""Common attribute extraction for Google Generative AI instrumentation."""
+
+from typing import Dict, Any
+
+from agentops.logging import logger
+from agentops.semconv import InstrumentationAttributes, SpanAttributes
+from agentops.instrumentation.common.attributes import AttributeMap, get_common_attributes, _extract_attributes_from_mapping
+from agentops.instrumentation.google_generativeai import LIBRARY_NAME, LIBRARY_VERSION
+
+# Common mapping for config parameters
+REQUEST_CONFIG_ATTRIBUTES: AttributeMap = {
+    SpanAttributes.LLM_REQUEST_TEMPERATURE: "temperature",
+    SpanAttributes.LLM_REQUEST_MAX_TOKENS: "max_output_tokens",
+    SpanAttributes.LLM_REQUEST_TOP_P: "top_p",
+    SpanAttributes.LLM_REQUEST_TOP_K: "top_k",
+    SpanAttributes.LLM_REQUEST_SEED: "seed",
+    SpanAttributes.LLM_REQUEST_SYSTEM_INSTRUCTION: "system_instruction",
+    SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY: "presence_penalty",
+    SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY: "frequency_penalty",
+    SpanAttributes.LLM_REQUEST_STOP_SEQUENCES: "stop_sequences",
+    SpanAttributes.LLM_REQUEST_CANDIDATE_COUNT: "candidate_count",
+}
+
+
+def get_common_instrumentation_attributes() -> AttributeMap:
+    """Get common instrumentation attributes for the Google Generative AI instrumentation.
+
+    This combines the generic AgentOps attributes with Google Generative AI
+    specific library attributes.
+
+    Returns:
+        Dictionary of common instrumentation attributes
+    """
+    attributes = get_common_attributes()
+    attributes.update({
+        InstrumentationAttributes.LIBRARY_NAME: LIBRARY_NAME,
+        InstrumentationAttributes.LIBRARY_VERSION: LIBRARY_VERSION,
+    })
+    return attributes
+
+
+def extract_request_attributes(kwargs: Dict[str, Any]) -> AttributeMap:
+    """Extract request attributes from the function arguments.
+
+    Extracts common request parameters that apply to both content generation
+    and chat completions, focusing on model parameters and generation settings.
+
+    Args:
+        kwargs: Request keyword arguments
+
+    Returns:
+        Dictionary of extracted request attributes
+    """
+    attributes = {}
+
+    if 'model' in kwargs:
+        model = kwargs["model"]
+
+        # Handle string model names
+        if isinstance(model, str):
+            attributes[SpanAttributes.LLM_REQUEST_MODEL] = model
+        # Handle model objects with _model_name or name attribute
+        elif hasattr(model, '_model_name'):
+            attributes[SpanAttributes.LLM_REQUEST_MODEL] = model._model_name
+        elif hasattr(model, 'name'):
+            attributes[SpanAttributes.LLM_REQUEST_MODEL] = model.name
+
+    config = kwargs.get('config')
+
+    if config:
+        try:
+            attributes.update(_extract_attributes_from_mapping(
+                config.__dict__ if hasattr(config, '__dict__') else config,
+                REQUEST_CONFIG_ATTRIBUTES
+            ))
+        except Exception as e:
+            logger.debug(f"Error extracting config parameters: {e}")
+
+    if 'stream' in kwargs:
+        attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs['stream']
+
+    return attributes
\ No newline at end of file
diff --git a/agentops/instrumentation/google_generativeai/attributes/model.py b/agentops/instrumentation/google_generativeai/attributes/model.py
new file mode 100644
index 000000000..6489eb601
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/attributes/model.py
@@ -0,0 +1,274 @@
+"""Model attribute extraction for Google Generative AI instrumentation."""
+
+from typing import Dict, Any, Optional, Tuple
+
+from agentops.logging import logger
+from agentops.semconv import SpanAttributes, LLMRequestTypeValues, MessageAttributes
+from agentops.instrumentation.common.attributes import AttributeMap
+from agentops.instrumentation.google_generativeai.attributes.common import (
+    extract_request_attributes,
+    get_common_instrumentation_attributes,
+)
+
+
+def _extract_content_from_prompt(content: Any) -> str:
+    """Extract prompt text from content.
+
+    Handles the various content formats that Google's Generative AI SDK accepts,
+    including strings, ContentDict, lists of parts, etc.
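+
+    For example (illustrative), the mixed list ``["Hello", {"text": "world"}]``
+    is flattened to the string "Hello\nworld\n", each part followed by a newline.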
+
+    Args:
+        content: The content object to extract text from
+
+    Returns:
+        Extracted text as a string
+    """
+    # Direct string case
+    if isinstance(content, str):
+        return content
+
+    # Lists of parts/content
+    if isinstance(content, list):
+        text = ""
+        for item in content:
+            if isinstance(item, str):
+                text += item + "\n"
+            elif isinstance(item, dict) and "text" in item:
+                text += item["text"] + "\n"
+            elif hasattr(item, "text"):
+                text += item.text + "\n"
+            # Handle content as a list with mixed types
+            elif hasattr(item, "parts"):
+                parts = item.parts
+                for part in parts:
+                    if isinstance(part, str):
+                        text += part + "\n"
+                    elif hasattr(part, "text"):
+                        text += part.text + "\n"
+        return text
+
+    # Dict with text key
+    if isinstance(content, dict) and "text" in content:
+        return content["text"]
+
+    # Content object with text attribute
+    if hasattr(content, "text"):
+        return content.text
+
+    # Content object with parts attribute
+    if hasattr(content, "parts"):
+        text = ""
+        for part in content.parts:
+            if isinstance(part, str):
+                text += part + "\n"
+            elif hasattr(part, "text"):
+                text += part.text + "\n"
+        return text
+
+    # Other object types - try to convert to string
+    try:
+        return str(content)
+    except Exception:
+        return ""
+
+
+def _set_prompt_attributes(attributes: AttributeMap, args: Tuple, kwargs: Dict[str, Any]) -> None:
+    """Extract and set prompt attributes from the request.
+
+    Respects privacy controls and handles the various ways prompts can be specified
+    in the Google Generative AI API.
+
+    Args:
+        attributes: The attribute dictionary to update
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+    """
+    content = None
+    if args and len(args) > 0:
+        content = args[0]
+    elif 'contents' in kwargs:
+        content = kwargs['contents']
+    elif 'content' in kwargs:
+        content = kwargs['content']
+
+    if content is None:
+        return
+
+    if isinstance(content, list):
+        for i, item in enumerate(content):
+            try:
+                extracted_text = _extract_content_from_prompt(item)
+                if extracted_text:
+                    attributes[MessageAttributes.PROMPT_CONTENT.format(i=i)] = extracted_text
+                    role = "user"
+                    if isinstance(item, dict) and "role" in item:
+                        role = item["role"]
+                    elif hasattr(item, "role"):
+                        role = item.role
+                    attributes[MessageAttributes.PROMPT_ROLE.format(i=i)] = role
+            except Exception as e:
+                logger.debug(f"Error extracting prompt content at index {i}: {e}")
+    else:
+        try:
+            extracted_text = _extract_content_from_prompt(content)
+            if extracted_text:
+                attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = extracted_text
+                attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user"
+        except Exception as e:
+            logger.debug(f"Error extracting prompt content: {e}")
+
+
+def _set_response_attributes(attributes: AttributeMap, response: Any) -> None:
+    """Extract and set response attributes from the completion response.
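+
+    Records the response model, token usage from ``usage_metadata``, and the
+    completion content, taken either from the aggregated ``text`` property or
+    from the individual response candidates.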
+
+    Args:
+        attributes: The attribute dictionary to update
+        response: The response from the API
+    """
+    if response is None:
+        return
+
+    if hasattr(response, "model"):
+        attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response.model
+
+    if hasattr(response, "usage_metadata"):
+        usage = response.usage_metadata
+        if hasattr(usage, "prompt_token_count"):
+            attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage.prompt_token_count
+        if hasattr(usage, "candidates_token_count"):
+            attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage.candidates_token_count
+        if hasattr(usage, "total_token_count"):
+            attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage.total_token_count
+
+    try:
+        if hasattr(response, "text"):
+            attributes[MessageAttributes.COMPLETION_CONTENT.format(i=0)] = response.text
+            attributes[MessageAttributes.COMPLETION_ROLE.format(i=0)] = "assistant"
+        elif hasattr(response, "candidates"):
+            # List of candidates
+            for i, candidate in enumerate(response.candidates):
+                if hasattr(candidate, "content") and hasattr(candidate.content, "parts"):
+                    parts = candidate.content.parts
+                    text = ""
+                    for part in parts:
+                        if isinstance(part, str):
+                            text += part
+                        elif hasattr(part, "text"):
+                            text += part.text
+
+                    attributes[MessageAttributes.COMPLETION_CONTENT.format(i=i)] = text
+                    attributes[MessageAttributes.COMPLETION_ROLE.format(i=i)] = "assistant"
+
+                if hasattr(candidate, "finish_reason"):
+                    attributes[MessageAttributes.COMPLETION_FINISH_REASON.format(i=i)] = candidate.finish_reason
+    except Exception as e:
+        logger.debug(f"Error extracting completion content: {e}")
+
+
+def get_model_attributes(
+    args: Optional[Tuple] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+    return_value: Optional[Any] = None,
+) -> AttributeMap:
+    """Extract attributes for GenerativeModel methods.
+
+    This function handles attribute extraction for the general model operations,
+    focusing on the common parameters and pattern shared by multiple methods.
+
+    Args:
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+        return_value: Return value from the method
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = get_common_instrumentation_attributes()
+    attributes[SpanAttributes.LLM_SYSTEM] = "Gemini"
+    attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.CHAT.value
+
+    if kwargs:
+        kwargs_attributes = extract_request_attributes(kwargs)
+        attributes.update(kwargs_attributes)
+
+    if args or kwargs:
+        _set_prompt_attributes(attributes, args or (), kwargs or {})
+
+    if return_value is not None:
+        _set_response_attributes(attributes, return_value)
+
+    return attributes
+
+
+def get_generate_content_attributes(
+    args: Optional[Tuple] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+    return_value: Optional[Any] = None,
+) -> AttributeMap:
+    """Extract attributes for the generate_content method.
+
+    This specialized extractor handles the generate_content method,
+    which is the primary way to interact with Gemini models.
+
+    Args:
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+        return_value: Return value from the method
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    return get_model_attributes(args, kwargs, return_value)
+
+
+def get_token_counting_attributes(
+    args: Optional[Tuple] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+    return_value: Optional[Any] = None,
+) -> AttributeMap:
+    """Extract attributes for token counting operations.
+
+    This specialized extractor handles both the count_tokens and compute_tokens
+    operations, reading the total count from either ``total_tokens`` or
+    ``total_token_count`` on the return value.
+
+    Args:
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+        return_value: Return value from the method
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = get_common_instrumentation_attributes()
+    attributes[SpanAttributes.LLM_SYSTEM] = "Gemini"
+    attributes[SpanAttributes.LLM_REQUEST_TYPE] = "token_count"
+
+    # Process kwargs if available
+    if kwargs:
+        kwargs_attributes = extract_request_attributes(kwargs)
+        attributes.update(kwargs_attributes)
+
+    # Set token count from response
+    if return_value is not None:
+        if hasattr(return_value, "total_tokens"):
+            attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = return_value.total_tokens
+        elif hasattr(return_value, "total_token_count"):
+            attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = return_value.total_token_count
+
+    return attributes
+
+
+def get_stream_attributes(stream: Any) -> AttributeMap:
+    """Extract attributes from a stream object.
+
+    Args:
+        stream: The stream object to extract attributes from
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = {}
+
+    if hasattr(stream, "model"):
+        attributes[SpanAttributes.LLM_RESPONSE_MODEL] = stream.model
+
+    return attributes
\ No newline at end of file
diff --git a/agentops/instrumentation/google_generativeai/instrumentor.py b/agentops/instrumentation/google_generativeai/instrumentor.py
new file mode 100644
index 000000000..ea29dba69
--- /dev/null
+++ b/agentops/instrumentation/google_generativeai/instrumentor.py
@@ -0,0 +1,195 @@
+"""Google Generative AI Instrumentation for AgentOps
+
+This module provides instrumentation for the Google Generative AI API, implementing
+OpenTelemetry instrumentation for Gemini model requests and responses.
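+
+A minimal sketch of applying the instrumentor directly (AgentOps normally does this
+automatically once the ``google.genai`` package is detected):
+
+    from agentops.instrumentation.google_generativeai import GoogleGenerativeAIInstrumentor
+
+    GoogleGenerativeAIInstrumentor().instrument()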
+
+We focus on instrumenting the following key endpoints:
+- Models.generate_content and AsyncModels.generate_content - content generation
+- Models.count_tokens and Models.compute_tokens (plus async variants) - token counting
+- Models.generate_content_stream (plus async variant) - streaming responses, which
+  receive special handling
+"""
+from typing import List, Collection
+from opentelemetry.trace import get_tracer
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.metrics import get_meter
+from wrapt import wrap_function_wrapper
+
+from agentops.logging import logger
+from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap
+from agentops.instrumentation.google_generativeai import LIBRARY_NAME, LIBRARY_VERSION
+from agentops.instrumentation.google_generativeai.attributes.model import (
+    get_generate_content_attributes,
+    get_token_counting_attributes,
+)
+from agentops.instrumentation.google_generativeai.stream_wrapper import (
+    generate_content_stream_wrapper,
+    generate_content_stream_async_wrapper,
+)
+from agentops.semconv import Meters
+
+# Methods to wrap for instrumentation
+WRAPPED_METHODS: List[WrapConfig] = [
+    # Client-based API methods
+    WrapConfig(
+        trace_name="gemini.generate_content",
+        package="google.genai.models",
+        class_name="Models",
+        method_name="generate_content",
+        handler=get_generate_content_attributes,
+    ),
+    WrapConfig(
+        trace_name="gemini.count_tokens",
+        package="google.genai.models",
+        class_name="Models",
+        method_name="count_tokens",
+        handler=get_token_counting_attributes,
+    ),
+    WrapConfig(
+        trace_name="gemini.compute_tokens",
+        package="google.genai.models",
+        class_name="Models",
+        method_name="compute_tokens",
+        handler=get_token_counting_attributes,
+    ),
+
+    # Async client-based API methods
+    WrapConfig(
+        trace_name="gemini.generate_content",
+        package="google.genai.models",
+        class_name="AsyncModels",
+        method_name="generate_content",
+        handler=get_generate_content_attributes,
+        is_async=True,
+    ),
+    WrapConfig(
+        trace_name="gemini.count_tokens",
+        package="google.genai.models",
+        class_name="AsyncModels",
+        method_name="count_tokens",
+        handler=get_token_counting_attributes,
+        is_async=True,
+    ),
+    WrapConfig(
+        trace_name="gemini.compute_tokens",
+        package="google.genai.models",
+        class_name="AsyncModels",
+        method_name="compute_tokens",
+        handler=get_token_counting_attributes,
+        is_async=True,
+    ),
+]
+
+# Streaming methods that need special handling
+STREAMING_METHODS = [
+    # Client API
+    {
+        "module": "google.genai.models",
+        "class_method": "Models.generate_content_stream",
+        "wrapper": generate_content_stream_wrapper,
+        "is_async": False,
+    },
+    {
+        "module": "google.genai.models",
+        "class_method": "AsyncModels.generate_content_stream",
+        "wrapper": generate_content_stream_async_wrapper,
+        "is_async": True,
+    },
+]
+
+
+class GoogleGenerativeAIInstrumentor(BaseInstrumentor):
+    """An instrumentor for the Google Generative AI (Gemini) API.
+
+    This class provides instrumentation for Google's Generative AI API by wrapping key methods
+    in the client library and capturing telemetry data. It supports both synchronous and
+    asynchronous API calls, including streaming responses.
+
+    It captures metrics including token usage, operation duration, and exceptions.
+    """
+
+    def instrumentation_dependencies(self) -> Collection[str]:
+        """Return packages required for instrumentation.
+
+        Returns:
+            A collection of package specifications required for this instrumentation.
+ """ + return ["google-genai >= 0.1.0"] + + def _instrument(self, **kwargs): + """Instrument the Google Generative AI API. + + This method wraps the key methods in the Google Generative AI client to capture + telemetry data for API calls. It sets up tracers, meters, and wraps the appropriate + methods for instrumentation. + + Args: + **kwargs: Configuration options for instrumentation. + """ + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) + + tokens_histogram = meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used with Google Generative AI models", + ) + + duration_histogram = meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="Google Generative AI operation duration", + ) + + exception_counter = meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during Google Generative AI completions", + ) + + # Standard method wrapping approach for regular methods + for wrap_config in WRAPPED_METHODS: + try: + wrap(wrap_config, tracer) + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Could not wrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}: {e}") + + # Special handling for streaming responses + for stream_method in STREAMING_METHODS: + try: + wrap_function_wrapper( + stream_method["module"], + stream_method["class_method"], + stream_method["wrapper"](tracer), + ) + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Failed to wrap {stream_method['module']}.{stream_method['class_method']}: {e}") + + def _uninstrument(self, **kwargs): + """Remove instrumentation from Google Generative AI API. + + This method unwraps all methods that were wrapped during instrumentation, + restoring the original behavior of the Google Generative AI API. + + Args: + **kwargs: Configuration options for uninstrumentation. + """ + # Unwrap standard methods + for wrap_config in WRAPPED_METHODS: + try: + unwrap(wrap_config) + except Exception as e: + logger.debug(f"Failed to unwrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}: {e}") + + # Unwrap streaming methods + from opentelemetry.instrumentation.utils import unwrap as otel_unwrap + for stream_method in STREAMING_METHODS: + try: + otel_unwrap(stream_method["module"], stream_method["class_method"]) + logger.debug(f"Unwrapped streaming method {stream_method['module']}.{stream_method['class_method']}") + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Failed to unwrap {stream_method['module']}.{stream_method['class_method']}: {e}") \ No newline at end of file diff --git a/agentops/instrumentation/google_generativeai/stream_wrapper.py b/agentops/instrumentation/google_generativeai/stream_wrapper.py new file mode 100644 index 000000000..b92a67bc5 --- /dev/null +++ b/agentops/instrumentation/google_generativeai/stream_wrapper.py @@ -0,0 +1,234 @@ +"""Google Generative AI stream wrapper implementation. + +This module provides wrappers for Google Generative AI's streaming functionality, +focusing on the generate_content_stream method for both sync and async operations. +It instruments streams to collect telemetry data for monitoring and analysis. 
+""" + +import logging +from typing import TypeVar, Any, Awaitable, Generator, AsyncGenerator + +from opentelemetry import context as context_api +from opentelemetry.trace import SpanKind, Status, StatusCode +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY + +from agentops.semconv import SpanAttributes, LLMRequestTypeValues, CoreAttributes, MessageAttributes +from agentops.instrumentation.common.wrappers import _with_tracer_wrapper +from agentops.instrumentation.google_generativeai.attributes.model import ( + get_generate_content_attributes, + get_stream_attributes, +) +from agentops.instrumentation.google_generativeai.attributes.common import ( + extract_request_attributes, +) + +logger = logging.getLogger(__name__) + +T = TypeVar('T') + + +@_with_tracer_wrapper +def generate_content_stream_wrapper(tracer, wrapped, instance, args, kwargs): + """Wrapper for the GenerativeModel.generate_content_stream method. + + This wrapper creates spans for tracking stream performance and processes + the streaming responses to collect telemetry data. + + Args: + tracer: The OpenTelemetry tracer to use + wrapped: The original stream method + instance: The instance the method is bound to + args: Positional arguments to the method + kwargs: Keyword arguments to the method + + Returns: + A wrapped generator that captures telemetry data + """ + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + span = tracer.start_span( + "gemini.generate_content_stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + # Extract request parameters and custom config + request_attributes = get_generate_content_attributes(args=args, kwargs=kwargs) + for key, value in request_attributes.items(): + span.set_attribute(key, value) + + # Mark as streaming request + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + # Extract custom parameters from config (if present) + if 'config' in kwargs: + config_attributes = extract_request_attributes({'config': kwargs['config']}) + for key, value in config_attributes.items(): + span.set_attribute(key, value) + + try: + stream = wrapped(*args, **kwargs) + + # Extract model information if available + stream_attributes = get_stream_attributes(stream) + for key, value in stream_attributes.items(): + span.set_attribute(key, value) + + def instrumented_stream(): + """Generator that wraps the original stream with instrumentation. 
+
+            Yields:
+                Items from the original stream with added instrumentation
+            """
+            full_text = ""
+            last_chunk_with_metadata = None
+
+            try:
+                for chunk in stream:
+                    # Keep track of the last chunk that might have metadata
+                    if hasattr(chunk, "usage_metadata") and chunk.usage_metadata:
+                        last_chunk_with_metadata = chunk
+
+                    # Accumulate streamed text so the full completion can be recorded
+                    if hasattr(chunk, "text"):
+                        full_text += chunk.text
+
+                    yield chunk
+
+                # Set final content when complete
+                if full_text:
+                    span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_text)
+                    span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant")
+
+                # Get token usage from the last chunk if available
+                if last_chunk_with_metadata and hasattr(last_chunk_with_metadata, "usage_metadata"):
+                    metadata = last_chunk_with_metadata.usage_metadata
+                    if hasattr(metadata, "prompt_token_count"):
+                        span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, metadata.prompt_token_count)
+                    if hasattr(metadata, "candidates_token_count"):
+                        span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, metadata.candidates_token_count)
+                    if hasattr(metadata, "total_token_count"):
+                        span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metadata.total_token_count)
+
+                span.set_status(Status(StatusCode.OK))
+            except Exception as e:
+                span.record_exception(e)
+                span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e))
+                span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__)
+                span.set_status(Status(StatusCode.ERROR, str(e)))
+                raise
+            finally:
+                span.end()
+
+        return instrumented_stream()
+    except Exception as e:
+        span.record_exception(e)
+        span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e))
+        span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__)
+        span.set_status(Status(StatusCode.ERROR, str(e)))
+        span.end()
+        raise
+
+
+@_with_tracer_wrapper
+async def generate_content_stream_async_wrapper(tracer, wrapped, instance, args, kwargs):
+    """Wrapper for the AsyncModels.generate_content_stream method.
+
+    This wrapper creates spans for tracking async stream performance and processes
+    the streaming responses to collect telemetry data.
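+
+    The wrapped call is awaited to obtain the underlying stream, which is then
+    re-exposed as an instrumented async generator with the same aggregation
+    behavior as the synchronous wrapper.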
+
+    Args:
+        tracer: The OpenTelemetry tracer to use
+        wrapped: The original async stream method
+        instance: The instance the method is bound to
+        args: Positional arguments to the method
+        kwargs: Keyword arguments to the method
+
+    Returns:
+        A wrapped async generator that captures telemetry data
+    """
+    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
+        return await wrapped(*args, **kwargs)
+
+    span = tracer.start_span(
+        "gemini.generate_content_stream_async",
+        kind=SpanKind.CLIENT,
+        attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value},
+    )
+
+    # Extract request parameters and custom config
+    request_attributes = get_generate_content_attributes(args=args, kwargs=kwargs)
+    for key, value in request_attributes.items():
+        span.set_attribute(key, value)
+
+    # Mark as streaming request
+    span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True)
+
+    # Extract custom parameters from config (if present)
+    if 'config' in kwargs:
+        config_attributes = extract_request_attributes({'config': kwargs['config']})
+        for key, value in config_attributes.items():
+            span.set_attribute(key, value)
+
+    try:
+        stream = await wrapped(*args, **kwargs)
+
+        # Extract model information if available
+        stream_attributes = get_stream_attributes(stream)
+        for key, value in stream_attributes.items():
+            span.set_attribute(key, value)
+
+        async def instrumented_stream():
+            """Async generator that wraps the original stream with instrumentation.
+
+            Yields:
+                Items from the original stream with added instrumentation
+            """
+            full_text = ""
+            last_chunk_with_metadata = None
+
+            try:
+                async for chunk in stream:
+                    # Keep track of the last chunk that might have metadata
+                    if hasattr(chunk, "usage_metadata") and chunk.usage_metadata:
+                        last_chunk_with_metadata = chunk
+
+                    if hasattr(chunk, "text"):
+                        full_text += chunk.text
+
+                    yield chunk
+
+                # Set final content when complete
+                if full_text:
+                    span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_text)
+                    span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant")
+
+                # Get token usage from the last chunk if available
+                if last_chunk_with_metadata and hasattr(last_chunk_with_metadata, "usage_metadata"):
+                    metadata = last_chunk_with_metadata.usage_metadata
+                    if hasattr(metadata, "prompt_token_count"):
+                        span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, metadata.prompt_token_count)
+                    if hasattr(metadata, "candidates_token_count"):
+                        span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, metadata.candidates_token_count)
+                    if hasattr(metadata, "total_token_count"):
+                        span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metadata.total_token_count)
+
+                span.set_status(Status(StatusCode.OK))
+            except Exception as e:
+                span.record_exception(e)
+                span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e))
+                span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__)
+                span.set_status(Status(StatusCode.ERROR, str(e)))
+                raise
+            finally:
+                span.end()
+
+        return instrumented_stream()
+    except Exception as e:
+        span.record_exception(e)
+        span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e))
+        span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__)
+        span.set_status(Status(StatusCode.ERROR, str(e)))
+        span.end()
+        raise
\ No newline at end of file
diff --git a/agentops/semconv/span_attributes.py b/agentops/semconv/span_attributes.py
index 432f1843e..86101e1ff 100644
--- a/agentops/semconv/span_attributes.py
+++ b/agentops/semconv/span_attributes.py
@@ -31,6 +31,11 @@ class SpanAttributes:
     LLM_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens"
     LLM_REQUEST_TEMPERATURE = "gen_ai.request.temperature"
     LLM_REQUEST_TOP_P = "gen_ai.request.top_p"
+    LLM_REQUEST_TOP_K = "gen_ai.request.top_k"
+    LLM_REQUEST_SEED = "gen_ai.request.seed"
+    LLM_REQUEST_SYSTEM_INSTRUCTION = "gen_ai.request.system_instruction"
+    LLM_REQUEST_CANDIDATE_COUNT = "gen_ai.request.candidate_count"
+    LLM_REQUEST_STOP_SEQUENCES = "gen_ai.request.stop_sequences"
     LLM_REQUEST_TYPE = "gen_ai.request.type"
     LLM_REQUEST_STREAMING = "gen_ai.request.streaming"
     LLM_REQUEST_FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
diff --git a/examples/gemini_examples/README.md b/examples/gemini_examples/README.md
index fb0af7624..a0a81bf4a 100644
--- a/examples/gemini_examples/README.md
+++ b/examples/gemini_examples/README.md
@@ -6,7 +6,7 @@ This directory contains examples demonstrating how to use AgentOps with Google's
 
 - Python 3.7+
 - `agentops` package installed (`pip install -U agentops`)
-- `google-generativeai` package installed (`pip install -U google-generativeai>=0.1.0`)
+- `google-genai` package installed (`pip install -U google-genai>=0.1.0`)
 - A Gemini API key (get one at [Google AI Studio](https://ai.google.dev/tutorials/setup))
 - An AgentOps API key (get one at [AgentOps Dashboard](https://app.agentops.ai/settings/projects))
 
@@ -14,7 +14,7 @@ This directory contains examples demonstrating how to use AgentOps with Google's
 
 1. Install required packages:
 ```bash
-pip install -U agentops google-generativeai
+pip install -U agentops google-genai
 ```
 
 2. Set your API keys as environment variables:
@@ -27,37 +27,45 @@ export AGENTOPS_API_KEY='your-agentops-api-key'
 
 ### Synchronous and Streaming Example
 
-The [gemini_example_sync.ipynb](./gemini_example_sync.ipynb) notebook demonstrates:
+The [gemini_example.ipynb](./gemini_example.ipynb) notebook demonstrates:
 - Basic synchronous text generation
 - Streaming text generation with chunk handling
+- Token counting operations
 - Automatic event tracking and token usage monitoring
 - Session management and statistics
 
 ```python
-import google.generativeai as genai
+from google import genai
 import agentops
 
-# Configure API keys
-genai.configure(api_key=GEMINI_API_KEY)
-
 # Initialize AgentOps (provider detection is automatic)
 agentops.init()
 
-# Create Gemini model
-model = genai.GenerativeModel("gemini-1.5-flash")
+# Create Gemini client
+client = genai.Client(api_key='YOUR_GEMINI_API_KEY')
 
 # Generate text (synchronous)
-response = model.generate_content("What are the three laws of robotics?")
+response = client.models.generate_content(
+    model="gemini-1.5-flash",
+    contents="What are the three laws of robotics?"
+)
 print(response.text)
 
 # Generate text (streaming)
-response = model.generate_content(
-    "Explain machine learning in simple terms.",
-    stream=True
+response_stream = client.models.generate_content_stream(
+    model="gemini-1.5-flash",
+    contents="Explain machine learning in simple terms."
 )
-for chunk in response:
+for chunk in response_stream:
     print(chunk.text, end="")
 
+# Token counting
+token_response = client.models.count_tokens(
+    model="gemini-1.5-flash",
+    contents="This is a test sentence to count tokens."
+)
+print(f"Token count: {token_response.total_tokens}")
+
 # End session and view stats
 agentops.end_session(
     end_state="Success",
@@ -67,7 +75,7 @@ agentops.end_session(
 To run the example:
 
 1. Make sure you have set up your environment variables
-2. Open and run the notebook: `jupyter notebook gemini_example_sync.ipynb`
+2. Open and run the notebook: `jupyter notebook gemini_example.ipynb`
 3. View your session in the AgentOps dashboard using the URL printed at the end
 
 ## Features
diff --git a/examples/gemini_examples/gemini_example.ipynb b/examples/gemini_examples/gemini_example.ipynb
index 3e85414ee..31b0009ed 100644
--- a/examples/gemini_examples/gemini_example.ipynb
+++ b/examples/gemini_examples/gemini_example.ipynb
@@ -12,12 +12,12 @@
 },
 {
  "cell_type": "code",
- "execution_count": null,
+ "execution_count": 1,
  "id": "d731924a",
  "metadata": {},
  "outputs": [],
  "source": [
-  "import google.generativeai as genai\n",
+  "from google import genai\n",
   "import agentops\n",
   "from dotenv import load_dotenv\n",
   "import os"
  ]
 },
@@ -33,9 +33,7 @@
   "load_dotenv()\n",
   "\n",
   "GEMINI_API_KEY = os.getenv(\"GEMINI_API_KEY\") or \"your gemini api key\"\n",
-  "AGENTOPS_API_KEY = os.getenv(\"AGENTOPS_API_KEY\") or \"your agentops api key\"\n",
-  "\n",
-  "genai.configure(api_key=GEMINI_API_KEY)"
+  "AGENTOPS_API_KEY = os.getenv(\"AGENTOPS_API_KEY\") or \"your agentops api key\""
  ]
 },
@@ -45,9 +43,9 @@
  "metadata": {},
  "outputs": [],
  "source": [
-  "# Initialize AgentOps and Gemini model\n",
+  "# Initialize AgentOps and Gemini client\n",
   "agentops.init()\n",
-  "model = genai.GenerativeModel(\"gemini-1.5-flash\")"
+  "client = genai.Client(api_key=GEMINI_API_KEY)"
  ]
 },
@@ -59,9 +57,9 @@
  "source": [
   "# Test synchronous generation\n",
   "print(\"Testing synchronous generation:\")\n",
-  "response = model.generate_content(\n",
-  "    \"What are the three laws of robotics?\",\n",
-  "    # session=ao_client\n",
+  "response = client.models.generate_content(\n",
+  "    model=\"gemini-1.5-flash\",\n",
+  "    contents=\"What are the three laws of robotics?\"\n",
   ")\n",
   "print(response.text)"
 ]
@@ -75,21 +73,20 @@
  "source": [
   "# Test streaming generation\n",
   "print(\"\\nTesting streaming generation:\")\n",
-  "response = model.generate_content(\n",
-  "    \"Explain the concept of machine learning in simple terms.\",\n",
-  "    stream=True,\n",
-  "    # session=ao_client\n",
+  "response_stream = client.models.generate_content_stream(\n",
+  "    model=\"gemini-1.5-flash\",\n",
+  "    contents=\"Explain the concept of machine learning in simple terms.\"\n",
   ")\n",
   "\n",
-  "for chunk in response:\n",
+  "for chunk in response_stream:\n",
   "    print(chunk.text, end=\"\")\n",
   "print() # Add newline after streaming output\n",
   "\n",
   "# Test another synchronous generation\n",
   "print(\"\\nTesting another synchronous generation:\")\n",
-  "response = model.generate_content(\n",
-  "    \"What is the difference between supervised and unsupervised learning?\",\n",
-  "    # session=ao_client\n",
+  "response = client.models.generate_content(\n",
+  "    model=\"gemini-1.5-flash\",\n",
+  "    contents=\"What is the difference between supervised and unsupervised learning?\"\n",
   ")\n",
   "print(response.text)"
  ]
@@ -97,18 +94,23 @@
 {
  "cell_type": "code",
  "execution_count": null,
- "id": "c6a674c0",
+ "id": "fbb2a59c",
  "metadata": {},
  "outputs": [],
  "source": [
-  "# End session and check stats\n",
-  "agentops.end_session(end_state=\"Success\")"
+  "# Example of token counting\n",
+  "print(\"\\nTesting token counting:\")\n",
+  "token_response = client.models.count_tokens(\n",
+  "    model=\"gemini-1.5-flash\",\n",
+  "    contents=\"This is a test sentence to count tokens.\"\n",
+  ")\n",
+  "print(f\"Token count: {token_response.total_tokens}\")"
  ]
 }
],
"metadata": {
 "kernelspec": {
- "display_name": "ops",
+ "display_name": ".venv",
  "language": "python",
  "name": "python3"
 },
@@ -122,7 +124,7 @@
  "name": "python",
"nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.16" + "version": "3.13.3" } }, "nbformat": 4,