diff --git a/agentops/instrumentation/openai/config.py b/agentops/instrumentation/openai/config.py new file mode 100644 index 000000000..85fe0d3ba --- /dev/null +++ b/agentops/instrumentation/openai/config.py @@ -0,0 +1,36 @@ +"""Configuration for OpenAI instrumentation. + +This module provides a global configuration object that can be used to customize +the behavior of OpenAI instrumentation across all components. +""" + +from typing import Callable, Optional, Dict +from typing_extensions import Protocol + + +class UploadImageCallable(Protocol): + """Protocol for the upload_base64_image function.""" + + async def __call__(self, trace_id: str, span_id: str, image_name: str, base64_string: str) -> str: + """Upload a base64 image and return the URL.""" + ... + + +class Config: + """Global configuration for OpenAI instrumentation. + + Attributes: + enrich_token_usage: Whether to calculate token usage for streaming responses + enrich_assistant: Whether to enrich assistant responses with additional data + exception_logger: Optional function to log exceptions + get_common_metrics_attributes: Function to get common attributes for metrics + upload_base64_image: Optional async function to upload base64 images + enable_trace_context_propagation: Whether to propagate trace context in headers + """ + + enrich_token_usage: bool = True + enrich_assistant: bool = True + exception_logger: Optional[Callable[[Exception], None]] = None + get_common_metrics_attributes: Callable[[], Dict[str, str]] = lambda: {} + upload_base64_image: Optional[UploadImageCallable] = None + enable_trace_context_propagation: bool = True diff --git a/agentops/instrumentation/openai/instrumentor.py b/agentops/instrumentation/openai/instrumentor.py index 3cf73e751..63c560d0c 100644 --- a/agentops/instrumentation/openai/instrumentor.py +++ b/agentops/instrumentation/openai/instrumentor.py @@ -1,97 +1,333 @@ """OpenAI API Instrumentation for AgentOps -This module provides instrumentation for the OpenAI API, extending the third-party -OpenTelemetry instrumentation to add support for OpenAI responses. - -We subclass the OpenAIV1Instrumentor from the third-party package and add our own -wrapper for the new `openai.responses` call pattern used in the Agents SDK. - -Notes on OpenAI Responses API structure: -- The module is located at openai.resources.responses -- The main class is Responses which inherits from SyncAPIResource -- The create() method generates model responses and returns a Response object -- Key parameters for create(): - - input: Union[str, ResponseInputParam] - Text or other input to the model - - model: Union[str, ChatModel] - The model to use - - tools: Iterable[ToolParam] - Tools for the model to use - - stream: Optional[Literal[False]] - Streaming is handled by a separate method -- The Response object contains response data including usage information - -When instrumenting, we need to: -1. Wrap the Responses.create method -2. Extract data from both the request parameters and response object -3. Create spans with appropriate attributes for observability +This module provides comprehensive instrumentation for the OpenAI API, including: +- Chat completions (streaming and non-streaming) +- Regular completions +- Embeddings +- Image generation +- Assistants API (create, runs, messages) +- Responses API (Agents SDK) + +The instrumentation supports both sync and async methods, metrics collection, +and distributed tracing. 
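+
+Example (illustrative sketch; the constructor options mirror the Config
+attributes defined in config.py, and the import path follows this module's
+location -- adjust both to your installation):
+
+    from opentelemetry import trace
+    from agentops.instrumentation.openai.instrumentor import OpenAIInstrumentor
+
+    # Wrap all supported OpenAI entry points with the active tracer provider.
+    OpenAIInstrumentor(
+        enrich_token_usage=True,  # also record token usage for streaming calls
+        enable_trace_context_propagation=True,
+    ).instrument(tracer_provider=trace.get_tracer_provider())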
""" -from typing import List +from typing import List, Collection from opentelemetry.trace import get_tracer -from opentelemetry.instrumentation.openai.v1 import OpenAIV1Instrumentor as ThirdPartyOpenAIV1Instrumentor +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from agentops.logging import logger -from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap +from agentops.instrumentation.common.wrappers import WrapConfig from agentops.instrumentation.openai import LIBRARY_NAME, LIBRARY_VERSION from agentops.instrumentation.openai.attributes.common import get_response_attributes +from agentops.instrumentation.openai.config import Config +from agentops.instrumentation.openai.utils import is_openai_v1 +from agentops.instrumentation.openai.wrappers import ( + handle_chat_attributes, + handle_completion_attributes, + handle_embeddings_attributes, + handle_image_gen_attributes, + handle_assistant_attributes, + handle_run_attributes, + handle_run_retrieve_attributes, + handle_run_stream_attributes, + handle_messages_attributes, +) +from agentops.instrumentation.openai.v0 import OpenAIV0Instrumentor +from agentops.semconv import Meters +_instruments = ("openai >= 0.27.0",) -# Methods to wrap beyond what the third-party instrumentation handles -WRAPPED_METHODS: List[WrapConfig] = [ - WrapConfig( - trace_name="openai.responses.create", - package="openai.resources.responses", - class_name="Responses", - method_name="create", - handler=get_response_attributes, - ), - WrapConfig( - trace_name="openai.responses.create", - package="openai.resources.responses", - class_name="AsyncResponses", - method_name="create", - handler=get_response_attributes, - is_async=True, - ), -] - - -class OpenAIInstrumentor(ThirdPartyOpenAIV1Instrumentor): - """An instrumentor for OpenAI API that extends the third-party implementation.""" - - # TODO we should only activate the `responses` feature if we are above a certain version, - # otherwise fallback to the third-party implementation - # def instrumentation_dependencies(self) -> Collection[str]: - # """Return packages required for instrumentation.""" - # return ["openai >= 1.0.0"] - def _instrument(self, **kwargs): - """Instrument the OpenAI API, extending the third-party instrumentation. +class OpenAIInstrumentor(BaseInstrumentor): + """An instrumentor for OpenAI's client library with comprehensive coverage.""" + + def __init__( + self, + enrich_assistant: bool = False, + enrich_token_usage: bool = False, + exception_logger=None, + get_common_metrics_attributes=None, + upload_base64_image=None, + enable_trace_context_propagation: bool = True, + ): + super().__init__() + # Configure the global config with provided options + Config.enrich_assistant = enrich_assistant + Config.enrich_token_usage = enrich_token_usage + Config.exception_logger = exception_logger + Config.get_common_metrics_attributes = get_common_metrics_attributes or (lambda: {}) + Config.upload_base64_image = upload_base64_image + Config.enable_trace_context_propagation = enable_trace_context_propagation - This implementation calls the parent _instrument method to handle - standard OpenAI API endpoints, then adds our own instrumentation for - the responses module. 
- """ - super()._instrument(**kwargs) + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + def _instrument(self, **kwargs): + """Instrument the OpenAI API.""" + if not is_openai_v1(): + # For v0, use the legacy instrumentor + OpenAIV0Instrumentor().instrument(**kwargs) + return + + # Get tracer and meter tracer_provider = kwargs.get("tracer_provider") tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) - for wrap_config in WRAPPED_METHODS: + # Define all wrapped methods + wrapped_methods = self._get_wrapped_methods() + + # Apply all wrappers using the common wrapper infrastructure + from agentops.instrumentation.common.wrappers import wrap + + for wrap_config in wrapped_methods: try: wrap(wrap_config, tracer) - logger.debug(f"Successfully wrapped {wrap_config}") - except (AttributeError, ModuleNotFoundError) as e: - logger.debug(f"Failed to wrap {wrap_config}: {e}") - - logger.debug("Successfully instrumented OpenAI API with Response extensions") + except (AttributeError, ModuleNotFoundError): + # Some methods may not be available in all versions + pass def _uninstrument(self, **kwargs): """Remove instrumentation from OpenAI API.""" - super()._uninstrument(**kwargs) + if not is_openai_v1(): + OpenAIV0Instrumentor().uninstrument(**kwargs) + return + + # Get all wrapped methods + wrapped_methods = self._get_wrapped_methods() - for wrap_config in WRAPPED_METHODS: + # Remove all wrappers using the common wrapper infrastructure + from agentops.instrumentation.common.wrappers import unwrap + + for wrap_config in wrapped_methods: try: unwrap(wrap_config) - logger.debug(f"Successfully unwrapped {wrap_config}") - except Exception as e: - logger.debug(f"Failed to unwrap {wrap_config}: {e}") + except Exception: + # Some methods may not be wrapped + pass + + def _init_metrics(self, meter): + """Initialize metrics for instrumentation.""" + return { + "tokens_histogram": meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used", + ), + "chat_choice_counter": meter.create_counter( + name=Meters.LLM_GENERATION_CHOICES, + unit="choice", + description="Number of choices returned by chat completions call", + ), + "duration_histogram": meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="GenAI operation duration", + ), + "chat_exception_counter": meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during chat completions", + ), + "streaming_time_to_first_token": meter.create_histogram( + name=Meters.LLM_STREAMING_TIME_TO_FIRST_TOKEN, + unit="s", + description="Time to first token in streaming chat completions", + ), + "streaming_time_to_generate": meter.create_histogram( + name=Meters.LLM_STREAMING_TIME_TO_GENERATE, + unit="s", + description="Time between first token and completion in streaming chat completions", + ), + "embeddings_vector_size_counter": meter.create_counter( + name=Meters.LLM_EMBEDDINGS_VECTOR_SIZE, + unit="element", + description="The size of returned vector", + ), + "embeddings_exception_counter": meter.create_counter( + name=Meters.LLM_EMBEDDINGS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during embeddings operation", + ), + "image_gen_exception_counter": meter.create_counter( + name=Meters.LLM_IMAGE_GENERATIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during image generations operation", + ), + } + + 
def _get_wrapped_methods(self) -> List[WrapConfig]: + """Get all methods that should be wrapped.""" + wrapped_methods = [] + + # Chat completions + wrapped_methods.extend( + [ + WrapConfig( + trace_name="openai.chat.completion", + package="openai.resources.chat.completions", + class_name="Completions", + method_name="create", + handler=handle_chat_attributes, + ), + WrapConfig( + trace_name="openai.chat.completion", + package="openai.resources.chat.completions", + class_name="AsyncCompletions", + method_name="create", + handler=handle_chat_attributes, + is_async=True, + ), + ] + ) + + # Regular completions + wrapped_methods.extend( + [ + WrapConfig( + trace_name="openai.completion", + package="openai.resources.completions", + class_name="Completions", + method_name="create", + handler=handle_completion_attributes, + ), + WrapConfig( + trace_name="openai.completion", + package="openai.resources.completions", + class_name="AsyncCompletions", + method_name="create", + handler=handle_completion_attributes, + is_async=True, + ), + ] + ) + + # Embeddings + wrapped_methods.extend( + [ + WrapConfig( + trace_name="openai.embeddings", + package="openai.resources.embeddings", + class_name="Embeddings", + method_name="create", + handler=handle_embeddings_attributes, + ), + WrapConfig( + trace_name="openai.embeddings", + package="openai.resources.embeddings", + class_name="AsyncEmbeddings", + method_name="create", + handler=handle_embeddings_attributes, + is_async=True, + ), + ] + ) + + # Image generation + wrapped_methods.append( + WrapConfig( + trace_name="openai.images.generate", + package="openai.resources.images", + class_name="Images", + method_name="generate", + handler=handle_image_gen_attributes, + ) + ) + + # Beta APIs - these may not be available in all versions + beta_methods = [] + + # Assistants + beta_methods.append( + WrapConfig( + trace_name="openai.assistants.create", + package="openai.resources.beta.assistants", + class_name="Assistants", + method_name="create", + handler=handle_assistant_attributes, + ) + ) + + # Chat parse methods + beta_methods.extend( + [ + WrapConfig( + trace_name="openai.chat.completion", + package="openai.resources.beta.chat.completions", + class_name="Completions", + method_name="parse", + handler=handle_chat_attributes, + ), + WrapConfig( + trace_name="openai.chat.completion", + package="openai.resources.beta.chat.completions", + class_name="AsyncCompletions", + method_name="parse", + handler=handle_chat_attributes, + is_async=True, + ), + ] + ) + + # Runs + beta_methods.extend( + [ + WrapConfig( + trace_name="openai.runs.create", + package="openai.resources.beta.threads.runs", + class_name="Runs", + method_name="create", + handler=handle_run_attributes, + ), + WrapConfig( + trace_name="openai.runs.retrieve", + package="openai.resources.beta.threads.runs", + class_name="Runs", + method_name="retrieve", + handler=handle_run_retrieve_attributes, + ), + WrapConfig( + trace_name="openai.runs.create_and_stream", + package="openai.resources.beta.threads.runs", + class_name="Runs", + method_name="create_and_stream", + handler=handle_run_stream_attributes, + ), + ] + ) + + # Messages + beta_methods.append( + WrapConfig( + trace_name="openai.messages.list", + package="openai.resources.beta.threads.messages", + class_name="Messages", + method_name="list", + handler=handle_messages_attributes, + ) + ) + + # Add beta methods to wrapped methods (they might fail) + wrapped_methods.extend(beta_methods) + + # Responses API (Agents SDK) - our custom addition + 
wrapped_methods.extend( + [ + WrapConfig( + trace_name="openai.responses.create", + package="openai.resources.responses", + class_name="Responses", + method_name="create", + handler=get_response_attributes, + ), + WrapConfig( + trace_name="openai.responses.create", + package="openai.resources.responses", + class_name="AsyncResponses", + method_name="create", + handler=get_response_attributes, + is_async=True, + ), + ] + ) - logger.debug("Successfully removed OpenAI API instrumentation with Response extensions") + return wrapped_methods diff --git a/agentops/instrumentation/openai/utils.py b/agentops/instrumentation/openai/utils.py new file mode 100644 index 000000000..3eb0e7fbd --- /dev/null +++ b/agentops/instrumentation/openai/utils.py @@ -0,0 +1,44 @@ +"""Utilities for OpenAI instrumentation. + +This module provides utility functions used across the OpenAI instrumentation +components. +""" + +import os +from importlib.metadata import version + +from agentops.instrumentation.openai.config import Config + +# Get OpenAI version +try: + _OPENAI_VERSION = version("openai") +except Exception: + _OPENAI_VERSION = "0.0.0" + + +def is_openai_v1() -> bool: + """Check if the installed OpenAI version is v1 or later.""" + return _OPENAI_VERSION >= "1.0.0" + + +def is_azure_openai(instance) -> bool: + """Check if the instance is using Azure OpenAI.""" + if not is_openai_v1(): + return False + + try: + import openai + + return isinstance(instance._client, (openai.AsyncAzureOpenAI, openai.AzureOpenAI)) + except Exception: + return False + + +def is_metrics_enabled() -> bool: + """Check if metrics collection is enabled.""" + return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true" + + +def should_record_stream_token_usage() -> bool: + """Check if stream token usage should be recorded.""" + return Config.enrich_token_usage diff --git a/third_party/opentelemetry/instrumentation/openai/v0/__init__.py b/agentops/instrumentation/openai/v0.py similarity index 76% rename from third_party/opentelemetry/instrumentation/openai/v0/__init__.py rename to agentops/instrumentation/openai/v0.py index e8dca2373..5762a11f8 100644 --- a/third_party/opentelemetry/instrumentation/openai/v0/__init__.py +++ b/agentops/instrumentation/openai/v0.py @@ -1,40 +1,47 @@ -from typing import Collection +"""OpenAI v0 API Instrumentation for AgentOps + +This module provides instrumentation for OpenAI API v0 (before v1.0.0). +It's kept for backward compatibility. 
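+
+This class is not normally instantiated directly; OpenAIInstrumentor dispatches
+to it when the installed openai package predates v1. Illustrative sketch of that
+gate, using the helpers added in this change:
+
+    from agentops.instrumentation.openai.utils import is_openai_v1
+    from agentops.instrumentation.openai.v0 import OpenAIV0Instrumentor
+
+    if not is_openai_v1():  # installed openai is < 1.0.0
+        OpenAIV0Instrumentor().instrument()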
+""" +from typing import Collection from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.trace import get_tracer from opentelemetry.metrics import get_meter from wrapt import wrap_function_wrapper -from opentelemetry.instrumentation.openai.shared.chat_wrappers import ( +from agentops.instrumentation.openai import LIBRARY_NAME, LIBRARY_VERSION +from agentops.instrumentation.openai.utils import is_metrics_enabled +from agentops.semconv import Meters + +# Import our wrappers +from agentops.instrumentation.openai.v0_wrappers import ( chat_wrapper, achat_wrapper, -) -from opentelemetry.instrumentation.openai.shared.completion_wrappers import ( completion_wrapper, acompletion_wrapper, -) -from opentelemetry.instrumentation.openai.shared.embeddings_wrappers import ( embeddings_wrapper, aembeddings_wrapper, ) -from opentelemetry.instrumentation.openai.utils import is_metrics_enabled -from opentelemetry.instrumentation.openai.version import __version__ -from agentops.semconv import Meters _instruments = ("openai >= 0.27.0", "openai < 1.0.0") class OpenAIV0Instrumentor(BaseInstrumentor): + """An instrumentor for OpenAI API v0.""" + def instrumentation_dependencies(self) -> Collection[str]: return _instruments def _instrument(self, **kwargs): + """Instrument the OpenAI API v0.""" tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer(__name__, __version__, tracer_provider) + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) meter_provider = kwargs.get("meter_provider") - meter = get_meter(__name__, __version__, meter_provider) + meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) + # Initialize metrics if enabled if is_metrics_enabled(): tokens_histogram = meter.create_histogram( name=Meters.LLM_TOKEN_USAGE, @@ -65,27 +72,19 @@ def _instrument(self, **kwargs): unit="s", description="Time to first token in streaming chat completions", ) + streaming_time_to_generate = meter.create_histogram( name=Meters.LLM_STREAMING_TIME_TO_GENERATE, unit="s", description="Time between first token and completion in streaming chat completions", ) - else: - ( - tokens_histogram, - chat_choice_counter, - duration_histogram, - chat_exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ) = (None, None, None, None, None, None) - if is_metrics_enabled(): embeddings_vector_size_counter = meter.create_counter( name=Meters.LLM_EMBEDDINGS_VECTOR_SIZE, unit="element", - description="he size of returned vector", + description="The size of returned vector", ) + embeddings_exception_counter = meter.create_counter( name=Meters.LLM_EMBEDDINGS_EXCEPTIONS, unit="time", @@ -94,12 +93,20 @@ def _instrument(self, **kwargs): else: ( tokens_histogram, + chat_choice_counter, + duration_histogram, + chat_exception_counter, + streaming_time_to_first_token, + streaming_time_to_generate, embeddings_vector_size_counter, embeddings_exception_counter, - ) = (None, None, None) + ) = (None, None, None, None, None, None, None, None) + # Wrap Completion methods wrap_function_wrapper("openai", "Completion.create", completion_wrapper(tracer)) wrap_function_wrapper("openai", "Completion.acreate", acompletion_wrapper(tracer)) + + # Wrap ChatCompletion methods wrap_function_wrapper( "openai", "ChatCompletion.create", @@ -126,6 +133,8 @@ def _instrument(self, **kwargs): streaming_time_to_generate, ), ) + + # Wrap Embedding methods wrap_function_wrapper( "openai", "Embedding.create", @@ -150,4 +159,18 @@ def _instrument(self, **kwargs): ) def 
_uninstrument(self, **kwargs): - pass + """Remove instrumentation from OpenAI API v0.""" + # Unwrap all the methods + from opentelemetry.instrumentation.utils import unwrap + + # Unwrap Completion methods + unwrap("openai.Completion", "create") + unwrap("openai.Completion", "acreate") + + # Unwrap ChatCompletion methods + unwrap("openai.ChatCompletion", "create") + unwrap("openai.ChatCompletion", "acreate") + + # Unwrap Embedding methods + unwrap("openai.Embedding", "create") + unwrap("openai.Embedding", "acreate") diff --git a/agentops/instrumentation/openai/v0_wrappers.py b/agentops/instrumentation/openai/v0_wrappers.py new file mode 100644 index 000000000..6c445c47b --- /dev/null +++ b/agentops/instrumentation/openai/v0_wrappers.py @@ -0,0 +1,483 @@ +"""Wrapper functions for OpenAI v0 API instrumentation. + +This module provides wrapper functions for instrumenting OpenAI v0 API calls +(before v1.0.0). These wrappers extract attributes, create spans, and handle +metrics for the legacy API format. +""" + +import json +import time +from typing import Any, Dict +from opentelemetry.trace import Tracer, Status, StatusCode +from opentelemetry import context as context_api +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY + +from agentops.instrumentation.openai.utils import is_metrics_enabled +from agentops.instrumentation.openai.wrappers.shared import should_send_prompts +from agentops.semconv import SpanAttributes + + +def _extract_chat_messages(kwargs: Dict[str, Any]) -> list: + """Extract messages from chat completion kwargs.""" + messages = kwargs.get("messages", []) + if should_send_prompts(): + return messages + return [] + + +def _extract_chat_attributes(kwargs: Dict[str, Any], response: Any = None) -> Dict[str, Any]: + """Extract attributes from chat completion calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + SpanAttributes.LLM_REQUEST_TYPE: "chat", + } + + # Request attributes + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + if "temperature" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"] + if "max_tokens" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"] + if "n" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MAX_NEW_TOKENS] = kwargs["n"] + + # Messages + messages = _extract_chat_messages(kwargs) + if messages: + attributes[SpanAttributes.LLM_PROMPTS] = json.dumps(messages) + + # Response attributes + if response: + if hasattr(response, "model"): + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response.model + if hasattr(response, "choices") and response.choices: + choice = response.choices[0] + if hasattr(choice, "message") and choice.message: + if should_send_prompts(): + attributes[SpanAttributes.LLM_COMPLETIONS] = json.dumps( + [ + { + "role": choice.message.get("role", "assistant"), + "content": choice.message.get("content", ""), + } + ] + ) + + # Usage + if hasattr(response, "usage") and response.usage: + usage = response.usage + if hasattr(usage, "prompt_tokens"): + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage.prompt_tokens + if hasattr(usage, "completion_tokens"): + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage.completion_tokens + if hasattr(usage, "total_tokens"): + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage.total_tokens + + return attributes + + +def chat_wrapper( + tracer: Tracer, + tokens_histogram=None, + chat_choice_counter=None, + duration_histogram=None, 
+ chat_exception_counter=None, + streaming_time_to_first_token=None, + streaming_time_to_generate=None, +): + """Create a wrapper for ChatCompletion.create.""" + + def wrapper(wrapped, instance, args, kwargs): + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + start_time = time.time() + span_name = "openai.ChatCompletion.create" + + with tracer.start_as_current_span(span_name) as span: + try: + # Add request attributes + attributes = _extract_chat_attributes(kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Call the wrapped function + response = wrapped(*args, **kwargs) + + # Add response attributes + response_attributes = _extract_chat_attributes(kwargs, response) + for key, value in response_attributes.items(): + span.set_attribute(key, value) + + # Handle metrics + if is_metrics_enabled(): + duration = time.time() - start_time + if duration_histogram: + duration_histogram.record(duration, attributes) + + if hasattr(response, "usage") and response.usage: + if tokens_histogram: + if hasattr(response.usage, "prompt_tokens"): + tokens_histogram.record( + response.usage.prompt_tokens, attributes={**attributes, "token.type": "input"} + ) + if hasattr(response.usage, "completion_tokens"): + tokens_histogram.record( + response.usage.completion_tokens, attributes={**attributes, "token.type": "output"} + ) + + if chat_choice_counter and hasattr(response, "choices"): + chat_choice_counter.add(len(response.choices), attributes) + + span.set_status(Status(StatusCode.OK)) + return response + + except Exception as e: + if chat_exception_counter and is_metrics_enabled(): + chat_exception_counter.add(1, attributes) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + + return wrapper + + +def achat_wrapper( + tracer: Tracer, + tokens_histogram=None, + chat_choice_counter=None, + duration_histogram=None, + chat_exception_counter=None, + streaming_time_to_first_token=None, + streaming_time_to_generate=None, +): + """Create a wrapper for ChatCompletion.acreate.""" + + async def wrapper(wrapped, instance, args, kwargs): + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await wrapped(*args, **kwargs) + + start_time = time.time() + span_name = "openai.ChatCompletion.acreate" + + with tracer.start_as_current_span(span_name) as span: + try: + # Add request attributes + attributes = _extract_chat_attributes(kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Call the wrapped function + response = await wrapped(*args, **kwargs) + + # Add response attributes + response_attributes = _extract_chat_attributes(kwargs, response) + for key, value in response_attributes.items(): + span.set_attribute(key, value) + + # Handle metrics (same as sync version) + if is_metrics_enabled(): + duration = time.time() - start_time + if duration_histogram: + duration_histogram.record(duration, attributes) + + if hasattr(response, "usage") and response.usage: + if tokens_histogram: + if hasattr(response.usage, "prompt_tokens"): + tokens_histogram.record( + response.usage.prompt_tokens, attributes={**attributes, "token.type": "input"} + ) + if hasattr(response.usage, "completion_tokens"): + tokens_histogram.record( + response.usage.completion_tokens, attributes={**attributes, "token.type": "output"} + ) + + if chat_choice_counter and hasattr(response, "choices"): + chat_choice_counter.add(len(response.choices), attributes) + + span.set_status(Status(StatusCode.OK)) + 
return response + + except Exception as e: + if chat_exception_counter and is_metrics_enabled(): + chat_exception_counter.add(1, attributes) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + + return wrapper + + +def _extract_completion_attributes(kwargs: Dict[str, Any], response: Any = None) -> Dict[str, Any]: + """Extract attributes from completion calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + SpanAttributes.LLM_REQUEST_TYPE: "completion", + } + + # Request attributes + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + if "temperature" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"] + if "max_tokens" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"] + + # Prompt + if "prompt" in kwargs and should_send_prompts(): + attributes[SpanAttributes.LLM_PROMPTS] = json.dumps([kwargs["prompt"]]) + + # Response attributes + if response: + if hasattr(response, "model"): + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response.model + if hasattr(response, "choices") and response.choices: + choice = response.choices[0] + if hasattr(choice, "text") and should_send_prompts(): + attributes[SpanAttributes.LLM_COMPLETIONS] = json.dumps([choice.text]) + + # Usage + if hasattr(response, "usage") and response.usage: + usage = response.usage + if hasattr(usage, "prompt_tokens"): + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage.prompt_tokens + if hasattr(usage, "completion_tokens"): + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage.completion_tokens + if hasattr(usage, "total_tokens"): + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage.total_tokens + + return attributes + + +def completion_wrapper(tracer: Tracer): + """Create a wrapper for Completion.create.""" + + def wrapper(wrapped, instance, args, kwargs): + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + span_name = "openai.Completion.create" + + with tracer.start_as_current_span(span_name) as span: + try: + # Add request attributes + attributes = _extract_completion_attributes(kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Call the wrapped function + response = wrapped(*args, **kwargs) + + # Add response attributes + response_attributes = _extract_completion_attributes(kwargs, response) + for key, value in response_attributes.items(): + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + return response + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + + return wrapper + + +def acompletion_wrapper(tracer: Tracer): + """Create a wrapper for Completion.acreate.""" + + async def wrapper(wrapped, instance, args, kwargs): + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await wrapped(*args, **kwargs) + + span_name = "openai.Completion.acreate" + + with tracer.start_as_current_span(span_name) as span: + try: + # Add request attributes + attributes = _extract_completion_attributes(kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Call the wrapped function + response = await wrapped(*args, **kwargs) + + # Add response attributes + response_attributes = _extract_completion_attributes(kwargs, response) + for key, value in response_attributes.items(): + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + 
return response + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + + return wrapper + + +def _extract_embeddings_attributes(kwargs: Dict[str, Any], response: Any = None) -> Dict[str, Any]: + """Extract attributes from embeddings calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + SpanAttributes.LLM_REQUEST_TYPE: "embedding", + } + + # Request attributes + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + + # Input + if "input" in kwargs and should_send_prompts(): + input_data = kwargs["input"] + if isinstance(input_data, list): + attributes[SpanAttributes.LLM_PROMPTS] = json.dumps(input_data) + else: + attributes[SpanAttributes.LLM_PROMPTS] = json.dumps([input_data]) + + # Response attributes + if response: + if hasattr(response, "model"): + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response.model + if hasattr(response, "data") and response.data: + attributes["llm.embeddings.count"] = len(response.data) + if response.data and hasattr(response.data[0], "embedding"): + attributes["llm.embeddings.vector_size"] = len(response.data[0].embedding) + + # Usage + if hasattr(response, "usage") and response.usage: + usage = response.usage + if hasattr(usage, "prompt_tokens"): + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage.prompt_tokens + if hasattr(usage, "total_tokens"): + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage.total_tokens + + return attributes + + +def embeddings_wrapper( + tracer: Tracer, + tokens_histogram=None, + embeddings_vector_size_counter=None, + duration_histogram=None, + embeddings_exception_counter=None, +): + """Create a wrapper for Embedding.create.""" + + def wrapper(wrapped, instance, args, kwargs): + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + start_time = time.time() + span_name = "openai.Embedding.create" + + with tracer.start_as_current_span(span_name) as span: + try: + # Add request attributes + attributes = _extract_embeddings_attributes(kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Call the wrapped function + response = wrapped(*args, **kwargs) + + # Add response attributes + response_attributes = _extract_embeddings_attributes(kwargs, response) + for key, value in response_attributes.items(): + span.set_attribute(key, value) + + # Handle metrics + if is_metrics_enabled(): + duration = time.time() - start_time + if duration_histogram: + duration_histogram.record(duration, attributes) + + if embeddings_vector_size_counter and hasattr(response, "data") and response.data: + if response.data and hasattr(response.data[0], "embedding"): + embeddings_vector_size_counter.add( + len(response.data[0].embedding) * len(response.data), attributes + ) + + if tokens_histogram and hasattr(response, "usage") and response.usage: + if hasattr(response.usage, "prompt_tokens"): + tokens_histogram.record( + response.usage.prompt_tokens, attributes={**attributes, "token.type": "input"} + ) + + span.set_status(Status(StatusCode.OK)) + return response + + except Exception as e: + if embeddings_exception_counter and is_metrics_enabled(): + embeddings_exception_counter.add(1, attributes) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + + return wrapper + + +def aembeddings_wrapper( + tracer: Tracer, + tokens_histogram=None, + embeddings_vector_size_counter=None, + duration_histogram=None, + 
embeddings_exception_counter=None, +): + """Create a wrapper for Embedding.acreate.""" + + async def wrapper(wrapped, instance, args, kwargs): + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await wrapped(*args, **kwargs) + + start_time = time.time() + span_name = "openai.Embedding.acreate" + + with tracer.start_as_current_span(span_name) as span: + try: + # Add request attributes + attributes = _extract_embeddings_attributes(kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Call the wrapped function + response = await wrapped(*args, **kwargs) + + # Add response attributes + response_attributes = _extract_embeddings_attributes(kwargs, response) + for key, value in response_attributes.items(): + span.set_attribute(key, value) + + # Handle metrics (same as sync version) + if is_metrics_enabled(): + duration = time.time() - start_time + if duration_histogram: + duration_histogram.record(duration, attributes) + + if embeddings_vector_size_counter and hasattr(response, "data") and response.data: + if response.data and hasattr(response.data[0], "embedding"): + embeddings_vector_size_counter.add( + len(response.data[0].embedding) * len(response.data), attributes + ) + + if tokens_histogram and hasattr(response, "usage") and response.usage: + if hasattr(response.usage, "prompt_tokens"): + tokens_histogram.record( + response.usage.prompt_tokens, attributes={**attributes, "token.type": "input"} + ) + + span.set_status(Status(StatusCode.OK)) + return response + + except Exception as e: + if embeddings_exception_counter and is_metrics_enabled(): + embeddings_exception_counter.add(1, attributes) + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + raise + + return wrapper diff --git a/agentops/instrumentation/openai/wrappers/__init__.py b/agentops/instrumentation/openai/wrappers/__init__.py new file mode 100644 index 000000000..ed9bd6a58 --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/__init__.py @@ -0,0 +1,28 @@ +"""OpenAI instrumentation wrappers. + +This package contains wrapper implementations for different OpenAI API endpoints. +""" + +from agentops.instrumentation.openai.wrappers.chat import handle_chat_attributes +from agentops.instrumentation.openai.wrappers.completion import handle_completion_attributes +from agentops.instrumentation.openai.wrappers.embeddings import handle_embeddings_attributes +from agentops.instrumentation.openai.wrappers.image_gen import handle_image_gen_attributes +from agentops.instrumentation.openai.wrappers.assistant import ( + handle_assistant_attributes, + handle_run_attributes, + handle_run_retrieve_attributes, + handle_run_stream_attributes, + handle_messages_attributes, +) + +__all__ = [ + "handle_chat_attributes", + "handle_completion_attributes", + "handle_embeddings_attributes", + "handle_image_gen_attributes", + "handle_assistant_attributes", + "handle_run_attributes", + "handle_run_retrieve_attributes", + "handle_run_stream_attributes", + "handle_messages_attributes", +] diff --git a/agentops/instrumentation/openai/wrappers/assistant.py b/agentops/instrumentation/openai/wrappers/assistant.py new file mode 100644 index 000000000..011f29a30 --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/assistant.py @@ -0,0 +1,277 @@ +"""Assistant API wrapper for OpenAI instrumentation. + +This module provides attribute extraction for OpenAI Assistant API endpoints. 
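+
+Each handler follows the common wrapper signature (args, kwargs, return_value)
+and returns a flat attribute map. Illustrative example (values are made up):
+
+    from agentops.instrumentation.openai.wrappers.assistant import (
+        handle_assistant_attributes,
+    )
+
+    attrs = handle_assistant_attributes(
+        kwargs={"model": "gpt-4o", "name": "helper"},
+        return_value=None,
+    )
+    # attrs now holds SpanAttributes.LLM_REQUEST_MODEL and
+    # "gen_ai.assistant.name" == "helper"; response fields are added
+    # only when a return value is supplied.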
+""" + +import json +import logging +from typing import Any, Dict, Optional, Tuple + +from agentops.instrumentation.openai.utils import is_openai_v1 +from agentops.instrumentation.openai.wrappers.shared import ( + model_as_dict, + should_send_prompts, +) +from agentops.instrumentation.openai.config import Config +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes + +logger = logging.getLogger(__name__) + + +def handle_assistant_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from assistant creation calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + "gen_ai.operation.name": "assistant.create", + } + + # Extract request attributes from kwargs + if kwargs: + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + if "name" in kwargs: + attributes["gen_ai.assistant.name"] = kwargs["name"] + if "description" in kwargs: + attributes["gen_ai.assistant.description"] = kwargs["description"] + if "instructions" in kwargs: + attributes["gen_ai.assistant.instructions"] = kwargs["instructions"] + + # Tools + tools = kwargs.get("tools", []) + for i, tool in enumerate(tools): + if isinstance(tool, dict): + attributes[f"gen_ai.assistant.tools.{i}.type"] = tool.get("type") + else: + attributes[f"gen_ai.assistant.tools.{i}.type"] = str(tool) + + # Extract response attributes + if return_value: + response_dict = {} + if hasattr(return_value, "__dict__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + if "id" in response_dict: + attributes["gen_ai.assistant.id"] = response_dict["id"] + if "model" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + if "created_at" in response_dict: + attributes["gen_ai.assistant.created_at"] = response_dict["created_at"] + + if Config.enrich_assistant: + if "object" in response_dict: + attributes["gen_ai.assistant.object"] = response_dict["object"] + if "file_ids" in response_dict: + attributes["gen_ai.assistant.file_ids"] = json.dumps(response_dict["file_ids"]) + if "metadata" in response_dict: + attributes["gen_ai.assistant.metadata"] = json.dumps(response_dict["metadata"]) + + return attributes + + +def handle_run_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from run creation calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + "gen_ai.operation.name": "run.create", + } + + # Extract request attributes from kwargs + if kwargs: + if "thread_id" in kwargs: + attributes["gen_ai.thread.id"] = kwargs["thread_id"] + if "assistant_id" in kwargs: + attributes["gen_ai.assistant.id"] = kwargs["assistant_id"] + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + if "instructions" in kwargs: + attributes["gen_ai.run.instructions"] = kwargs["instructions"] + + # Additional messages + additional_messages = kwargs.get("additional_messages", []) + if additional_messages and should_send_prompts(): + for i, msg in enumerate(additional_messages): + prefix = f"gen_ai.run.additional_messages.{i}" + if "role" in msg: + attributes[f"{prefix}.role"] = msg["role"] + if "content" in msg: + attributes[f"{prefix}.content"] = msg["content"] + + # Extract response attributes + if return_value: + response_dict 
= {} + if hasattr(return_value, "__dict__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + if "id" in response_dict: + attributes["gen_ai.run.id"] = response_dict["id"] + if "status" in response_dict: + attributes["gen_ai.run.status"] = response_dict["status"] + if "thread_id" in response_dict: + attributes["gen_ai.thread.id"] = response_dict["thread_id"] + if "assistant_id" in response_dict: + attributes["gen_ai.assistant.id"] = response_dict["assistant_id"] + if "model" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + + # Usage + usage = response_dict.get("usage", {}) + if usage: + if is_openai_v1() and hasattr(usage, "__dict__"): + usage = usage.__dict__ + if "prompt_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["prompt_tokens"] + if "completion_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage["completion_tokens"] + if "total_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_tokens"] + + if Config.enrich_assistant: + if "created_at" in response_dict: + attributes["gen_ai.run.created_at"] = response_dict["created_at"] + if "started_at" in response_dict: + attributes["gen_ai.run.started_at"] = response_dict["started_at"] + if "completed_at" in response_dict: + attributes["gen_ai.run.completed_at"] = response_dict["completed_at"] + if "failed_at" in response_dict: + attributes["gen_ai.run.failed_at"] = response_dict["failed_at"] + if "metadata" in response_dict: + attributes["gen_ai.run.metadata"] = json.dumps(response_dict["metadata"]) + + return attributes + + +def handle_run_retrieve_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from run retrieval calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + "gen_ai.operation.name": "run.retrieve", + } + + # Extract run_id from args or kwargs + run_id = None + if args and len(args) > 0: + run_id = args[0] + elif kwargs: + run_id = kwargs.get("run_id") + + if run_id: + attributes["gen_ai.run.id"] = run_id + + # Response attributes are same as run creation + if return_value: + response_attrs = handle_run_attributes(None, None, return_value) + # Update with response attributes but keep our operation name + response_attrs.pop("gen_ai.operation.name", None) + attributes.update(response_attrs) + + return attributes + + +def handle_run_stream_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from run create_and_stream calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + "gen_ai.operation.name": "run.create_and_stream", + SpanAttributes.LLM_REQUEST_STREAMING: True, + } + + # Request attributes are same as run creation + if kwargs: + request_attrs = handle_run_attributes(None, kwargs, None) + # Update with request attributes but keep our operation name + request_attrs.pop("gen_ai.operation.name", None) + attributes.update(request_attrs) + + # For streaming, we don't have immediate response attributes + + return attributes + + +def handle_messages_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from messages list calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + 
"gen_ai.operation.name": "messages.list", + } + + # Extract thread_id + thread_id = None + if args and len(args) > 0: + thread_id = args[0] + elif kwargs: + thread_id = kwargs.get("thread_id") + + if thread_id: + attributes["gen_ai.thread.id"] = thread_id + + # Extract response attributes + if return_value: + response_dict = {} + if hasattr(return_value, "__dict__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + # For list responses, note the count + data = response_dict.get("data", []) + attributes["gen_ai.messages.count"] = len(data) + + if Config.enrich_assistant and should_send_prompts(): + # Include details of first few messages + for i, msg in enumerate(data[:10]): # Limit to first 10 + if isinstance(msg, dict): + msg_dict = msg + else: + msg_dict = model_as_dict(msg) + + prefix = f"gen_ai.messages.{i}" + if "id" in msg_dict: + attributes[f"{prefix}.id"] = msg_dict["id"] + if "role" in msg_dict: + attributes[f"{prefix}.role"] = msg_dict["role"] + if "created_at" in msg_dict: + attributes[f"{prefix}.created_at"] = msg_dict["created_at"] + + # Handle content + content = msg_dict.get("content", []) + if content and isinstance(content, list): + for j, content_item in enumerate(content): + try: + if isinstance(content_item, dict) and content_item.get("type") == "text": + text_obj = content_item.get("text") + if text_obj and isinstance(text_obj, dict): + text_value = text_obj.get("value", "") + attributes[f"{prefix}.content.{j}"] = text_value + elif hasattr(content_item, "text") and hasattr(content_item.text, "value"): + # Handle object-style content + attributes[f"{prefix}.content.{j}"] = content_item.text.value + except Exception: + # Continue processing other content items + continue + + return attributes diff --git a/agentops/instrumentation/openai/wrappers/chat.py b/agentops/instrumentation/openai/wrappers/chat.py new file mode 100644 index 000000000..bc2be1b73 --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/chat.py @@ -0,0 +1,195 @@ +"""Chat completions wrapper for OpenAI instrumentation. + +This module provides attribute extraction for OpenAI chat completions API, +compatible with the common wrapper pattern. +""" + +import json +import logging +from typing import Any, Dict, Optional, Tuple + +from agentops.instrumentation.openai.utils import is_openai_v1 +from agentops.instrumentation.openai.wrappers.shared import ( + model_as_dict, + should_send_prompts, +) +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, LLMRequestTypeValues + +logger = logging.getLogger(__name__) + +LLM_REQUEST_TYPE = LLMRequestTypeValues.CHAT + + +def handle_chat_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from chat completion calls. + + This function is designed to work with the common wrapper pattern, + extracting attributes from the method arguments and return value. 
+ """ + attributes = { + SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value, + SpanAttributes.LLM_SYSTEM: "OpenAI", + } + + # Extract request attributes from kwargs + if kwargs: + # Model + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + + # Request parameters + if "max_tokens" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"] + if "temperature" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"] + if "top_p" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"] + if "frequency_penalty" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY] = kwargs["frequency_penalty"] + if "presence_penalty" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY] = kwargs["presence_penalty"] + if "user" in kwargs: + attributes[SpanAttributes.LLM_USER] = kwargs["user"] + + # Streaming + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs.get("stream", False) + + # Headers + headers = kwargs.get("extra_headers") or kwargs.get("headers") + if headers: + attributes[SpanAttributes.LLM_REQUEST_HEADERS] = str(headers) + + # Messages + if should_send_prompts() and "messages" in kwargs: + messages = kwargs["messages"] + for i, msg in enumerate(messages): + prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" + if "role" in msg: + attributes[f"{prefix}.role"] = msg["role"] + if "content" in msg: + content = msg["content"] + if isinstance(content, list): + # Handle multi-modal content + content = json.dumps(content) + attributes[f"{prefix}.content"] = content + if "tool_call_id" in msg: + attributes[f"{prefix}.tool_call_id"] = msg["tool_call_id"] + + # Tool calls + if "tool_calls" in msg: + tool_calls = msg["tool_calls"] + for j, tool_call in enumerate(tool_calls): + if is_openai_v1() and hasattr(tool_call, "__dict__"): + tool_call = model_as_dict(tool_call) + function = tool_call.get("function", {}) + attributes[f"{prefix}.tool_calls.{j}.id"] = tool_call.get("id") + attributes[f"{prefix}.tool_calls.{j}.name"] = function.get("name") + attributes[f"{prefix}.tool_calls.{j}.arguments"] = function.get("arguments") + + # Functions + if "functions" in kwargs: + functions = kwargs["functions"] + for i, function in enumerate(functions): + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + attributes[f"{prefix}.name"] = function.get("name") + attributes[f"{prefix}.description"] = function.get("description") + attributes[f"{prefix}.parameters"] = json.dumps(function.get("parameters")) + + # Tools + if "tools" in kwargs: + tools = kwargs["tools"] + for i, tool in enumerate(tools): + function = tool.get("function", {}) + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + attributes[f"{prefix}.name"] = function.get("name") + attributes[f"{prefix}.description"] = function.get("description") + attributes[f"{prefix}.parameters"] = json.dumps(function.get("parameters")) + + # Extract response attributes from return value + if return_value: + # Note: For streaming responses, return_value might be a generator/stream + # In that case, we won't have the full response data here + + # Convert to dict if needed + response_dict = {} + if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + # Basic response attributes + if "id" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_ID] = response_dict["id"] + if "model" 
in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + if "system_fingerprint" in response_dict: + attributes[SpanAttributes.LLM_OPENAI_RESPONSE_SYSTEM_FINGERPRINT] = response_dict["system_fingerprint"] + + # Usage + usage = response_dict.get("usage", {}) + if usage: + if is_openai_v1() and hasattr(usage, "__dict__"): + usage = usage.__dict__ + if "total_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_tokens"] + if "prompt_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["prompt_tokens"] + if "completion_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage["completion_tokens"] + + # Reasoning tokens + output_details = usage.get("output_tokens_details", {}) + if isinstance(output_details, dict) and "reasoning_tokens" in output_details: + attributes[SpanAttributes.LLM_USAGE_REASONING_TOKENS] = output_details["reasoning_tokens"] + + # Choices + if should_send_prompts() and "choices" in response_dict: + choices = response_dict["choices"] + for choice in choices: + index = choice.get("index", 0) + prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}" + + if "finish_reason" in choice: + attributes[f"{prefix}.finish_reason"] = choice["finish_reason"] + + # Content filter + if "content_filter_results" in choice: + attributes[f"{prefix}.content_filter_results"] = json.dumps(choice["content_filter_results"]) + + # Message + message = choice.get("message", {}) + if message: + if "role" in message: + attributes[f"{prefix}.role"] = message["role"] + if "content" in message: + attributes[f"{prefix}.content"] = message["content"] + if "refusal" in message: + attributes[f"{prefix}.refusal"] = message["refusal"] + + # Function call + if "function_call" in message: + function_call = message["function_call"] + attributes[f"{prefix}.tool_calls.0.name"] = function_call.get("name") + attributes[f"{prefix}.tool_calls.0.arguments"] = function_call.get("arguments") + + # Tool calls + if "tool_calls" in message: + tool_calls = message["tool_calls"] + for i, tool_call in enumerate(tool_calls): + function = tool_call.get("function", {}) + attributes[f"{prefix}.tool_calls.{i}.id"] = tool_call.get("id") + attributes[f"{prefix}.tool_calls.{i}.name"] = function.get("name") + attributes[f"{prefix}.tool_calls.{i}.arguments"] = function.get("arguments") + + # Prompt filter results + if "prompt_filter_results" in response_dict: + attributes[f"{SpanAttributes.LLM_PROMPTS}.prompt_filter_results"] = json.dumps( + response_dict["prompt_filter_results"] + ) + + return attributes diff --git a/agentops/instrumentation/openai/wrappers/completion.py b/agentops/instrumentation/openai/wrappers/completion.py new file mode 100644 index 000000000..0a1f0512b --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/completion.py @@ -0,0 +1,109 @@ +"""Completion wrapper for OpenAI instrumentation. + +This module provides attribute extraction for OpenAI text completions API. 
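+
+Illustrative example (the prompt text itself is only recorded when
+should_send_prompts() allows content capture):
+
+    from agentops.instrumentation.openai.wrappers.completion import (
+        handle_completion_attributes,
+    )
+
+    attrs = handle_completion_attributes(
+        kwargs={"model": "gpt-3.5-turbo-instruct", "prompt": "Say hi", "max_tokens": 16},
+    )
+    # attrs[SpanAttributes.LLM_REQUEST_MODEL] == "gpt-3.5-turbo-instruct"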
+""" + +import logging +from typing import Any, Dict, Optional, Tuple + +from agentops.instrumentation.openai.utils import is_openai_v1 +from agentops.instrumentation.openai.wrappers.shared import ( + model_as_dict, + should_send_prompts, +) +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, LLMRequestTypeValues + +logger = logging.getLogger(__name__) + +LLM_REQUEST_TYPE = LLMRequestTypeValues.COMPLETION + + +def handle_completion_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from completion calls.""" + attributes = { + SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value, + SpanAttributes.LLM_SYSTEM: "OpenAI", + } + + # Extract request attributes from kwargs + if kwargs: + # Model + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + + # Request parameters + if "max_tokens" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"] + if "temperature" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"] + if "top_p" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"] + if "frequency_penalty" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY] = kwargs["frequency_penalty"] + if "presence_penalty" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY] = kwargs["presence_penalty"] + if "user" in kwargs: + attributes[SpanAttributes.LLM_USER] = kwargs["user"] + + # Streaming + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs.get("stream", False) + + # Headers + headers = kwargs.get("extra_headers") or kwargs.get("headers") + if headers: + attributes[SpanAttributes.LLM_REQUEST_HEADERS] = str(headers) + + # Prompt + if should_send_prompts() and "prompt" in kwargs: + prompt = kwargs["prompt"] + if isinstance(prompt, list): + for i, p in enumerate(prompt): + attributes[f"{SpanAttributes.LLM_PROMPTS}.{i}.content"] = p + else: + attributes[f"{SpanAttributes.LLM_PROMPTS}.0.content"] = prompt + + # Extract response attributes from return value + if return_value: + # Convert to dict if needed + response_dict = {} + if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + # Basic response attributes + if "id" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_ID] = response_dict["id"] + if "model" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + + # Usage + usage = response_dict.get("usage", {}) + if usage: + if is_openai_v1() and hasattr(usage, "__dict__"): + usage = usage.__dict__ + if "total_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_tokens"] + if "prompt_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["prompt_tokens"] + if "completion_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage["completion_tokens"] + + # Choices + if should_send_prompts() and "choices" in response_dict: + choices = response_dict["choices"] + for choice in choices: + index = choice.get("index", 0) + prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}" + + if "finish_reason" in choice: + attributes[f"{prefix}.finish_reason"] = choice["finish_reason"] + if "text" in choice: + 
attributes[f"{prefix}.content"] = choice["text"] + + return attributes diff --git a/agentops/instrumentation/openai/wrappers/embeddings.py b/agentops/instrumentation/openai/wrappers/embeddings.py new file mode 100644 index 000000000..84546c8a9 --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/embeddings.py @@ -0,0 +1,89 @@ +"""Embeddings wrapper for OpenAI instrumentation. + +This module provides attribute extraction for OpenAI embeddings API. +""" + +import logging +from typing import Any, Dict, Optional, Tuple + +from agentops.instrumentation.openai.utils import is_openai_v1 +from agentops.instrumentation.openai.wrappers.shared import ( + model_as_dict, + should_send_prompts, +) +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, LLMRequestTypeValues + +logger = logging.getLogger(__name__) + +LLM_REQUEST_TYPE = LLMRequestTypeValues.EMBEDDING + + +def handle_embeddings_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from embeddings calls.""" + attributes = { + SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value, + SpanAttributes.LLM_SYSTEM: "OpenAI", + } + + # Extract request attributes from kwargs + if kwargs: + # Model + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + + # Headers + headers = kwargs.get("extra_headers") or kwargs.get("headers") + if headers: + attributes[SpanAttributes.LLM_REQUEST_HEADERS] = str(headers) + + # Input + if should_send_prompts() and "input" in kwargs: + input_param = kwargs["input"] + if isinstance(input_param, str): + attributes[f"{SpanAttributes.LLM_PROMPTS}.0.content"] = input_param + elif isinstance(input_param, list): + for i, inp in enumerate(input_param): + if isinstance(inp, str): + attributes[f"{SpanAttributes.LLM_PROMPTS}.{i}.content"] = inp + elif isinstance(inp, (int, list)): + # Token inputs - convert to string representation + attributes[f"{SpanAttributes.LLM_PROMPTS}.{i}.content"] = str(inp) + + # Extract response attributes from return value + if return_value: + # Convert to dict if needed + response_dict = {} + if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + # Basic response attributes + if "model" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + + # Usage + usage = response_dict.get("usage", {}) + if usage: + if is_openai_v1() and hasattr(usage, "__dict__"): + usage = usage.__dict__ + if "total_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_tokens"] + if "prompt_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["prompt_tokens"] + + # Embeddings data + if should_send_prompts() and "data" in response_dict: + data = response_dict["data"] + for i, item in enumerate(data): + embedding = item.get("embedding", []) + if embedding: + # We don't store the full embedding vector, just metadata + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{i}.embedding_length"] = len(embedding) + + return attributes diff --git a/agentops/instrumentation/openai/wrappers/image_gen.py b/agentops/instrumentation/openai/wrappers/image_gen.py new file mode 100644 index 000000000..4fd4aa211 --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/image_gen.py @@ -0,0 +1,75 @@ +"""Image 
generation wrapper for OpenAI instrumentation. + +This module provides attribute extraction for OpenAI image generation API. +""" + +import logging +from typing import Any, Dict, Optional, Tuple + +from agentops.instrumentation.openai.wrappers.shared import model_as_dict +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes + +logger = logging.getLogger(__name__) + + +def handle_image_gen_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from image generation calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + "gen_ai.operation.name": "image_generation", + } + + # Extract request attributes from kwargs + if kwargs: + # Model + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + + # Image parameters + if "prompt" in kwargs: + attributes["gen_ai.request.image_prompt"] = kwargs["prompt"] + if "size" in kwargs: + attributes["gen_ai.request.image_size"] = kwargs["size"] + if "quality" in kwargs: + attributes["gen_ai.request.image_quality"] = kwargs["quality"] + if "style" in kwargs: + attributes["gen_ai.request.image_style"] = kwargs["style"] + if "n" in kwargs: + attributes["gen_ai.request.image_count"] = kwargs["n"] + if "response_format" in kwargs: + attributes["gen_ai.request.image_response_format"] = kwargs["response_format"] + + # Headers + headers = kwargs.get("extra_headers") or kwargs.get("headers") + if headers: + attributes[SpanAttributes.LLM_REQUEST_HEADERS] = str(headers) + + # Extract response attributes from return value + if return_value: + # Convert to dict if needed + response_dict = {} + if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + + # Response data + if "created" in response_dict: + attributes["gen_ai.response.created"] = response_dict["created"] + + # Images data + if "data" in response_dict: + data = response_dict["data"] + attributes["gen_ai.response.image_count"] = len(data) + + # We don't typically store the full image data, but we can store metadata + for i, item in enumerate(data): + if "revised_prompt" in item: + attributes[f"gen_ai.response.images.{i}.revised_prompt"] = item["revised_prompt"] + + return attributes diff --git a/agentops/instrumentation/openai/wrappers/shared.py b/agentops/instrumentation/openai/wrappers/shared.py new file mode 100644 index 000000000..c969437f1 --- /dev/null +++ b/agentops/instrumentation/openai/wrappers/shared.py @@ -0,0 +1,81 @@ +"""Shared utilities for OpenAI instrumentation wrappers. + +This module contains common functions and utilities used across different +OpenAI API endpoint wrappers. 
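+
+Illustrative behaviour of the helpers below (a sketch; actual results depend on
+the installed openai, pydantic and tiktoken versions and on environment settings):
+
+    should_send_prompts()            # True unless content tracing is disabled via
+                                     # TRACELOOP_TRACE_CONTENT
+    model_as_dict({"id": "x"})       # dicts pass through unchanged
+    model_as_dict(completion)        # pydantic v2 models go through .model_dump()
+    get_token_count_from_string("hi", "gpt-4")
+                                     # token count via tiktoken, or None when stream
+                                     # token usage is disabled or tiktoken is missing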
+""" + +import os +import types +import logging +from typing import Any, Dict, Optional +from importlib.metadata import version + +import openai +from opentelemetry import context as context_api + +from agentops.instrumentation.openai.utils import is_openai_v1 + +logger = logging.getLogger(__name__) + +# Pydantic version for model serialization +_PYDANTIC_VERSION = version("pydantic") + +# Cache for tiktoken encodings +tiktoken_encodings = {} + + +def should_send_prompts() -> bool: + """Check if prompt content should be sent in traces.""" + return (os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower() == "true" or context_api.get_value( + "override_enable_content_tracing" + ) + + +def is_streaming_response(response: Any) -> bool: + """Check if a response is a streaming response.""" + if is_openai_v1(): + return isinstance(response, openai.Stream) or isinstance(response, openai.AsyncStream) + return isinstance(response, types.GeneratorType) or isinstance(response, types.AsyncGeneratorType) + + +def model_as_dict(model: Any) -> Dict[str, Any]: + """Convert a model object to a dictionary.""" + if model is None: + return {} + if isinstance(model, dict): + return model + if _PYDANTIC_VERSION < "2.0.0": + return model.dict() + if hasattr(model, "model_dump"): + return model.model_dump() + elif hasattr(model, "parse"): # Raw API response + return model_as_dict(model.parse()) + else: + return model if isinstance(model, dict) else {} + + +def get_token_count_from_string(string: str, model_name: str) -> Optional[int]: + """Get token count from a string using tiktoken.""" + from agentops.instrumentation.openai.utils import should_record_stream_token_usage + + if not should_record_stream_token_usage(): + return None + + try: + import tiktoken + except ImportError: + return None + + if tiktoken_encodings.get(model_name) is None: + try: + encoding = tiktoken.encoding_for_model(model_name) + except KeyError as ex: + logger.warning(f"Failed to get tiktoken encoding for model_name {model_name}, error: {str(ex)}") + return None + + tiktoken_encodings[model_name] = encoding + else: + encoding = tiktoken_encodings.get(model_name) + + token_count = len(encoding.encode(string)) + return token_count diff --git a/pyproject.toml b/pyproject.toml index b0833c9b3..937f7df01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -165,12 +165,6 @@ exclude = [ [tool.hatch.build.targets.wheel] packages = ["agentops"] -[tool.hatch.build.targets.wheel.force-include] -"third_party" = "." 
- -[tool.hatch.build.targets.wheel.sources] -"third_party" = "third_party" - [tool.hatch.build] exclude = [ "docs/*", diff --git a/tests/unit/instrumentation/openai_core/test_instrumentor.py b/tests/unit/instrumentation/openai_core/test_instrumentor.py index ce364ed91..161576cec 100644 --- a/tests/unit/instrumentation/openai_core/test_instrumentor.py +++ b/tests/unit/instrumentation/openai_core/test_instrumentor.py @@ -44,8 +44,8 @@ def instrumentor(self): # To avoid timing issues with the fixture, we need to ensure patch # objects are created before being used in the test - mock_wrap = patch("agentops.instrumentation.openai.instrumentor.wrap").start() - mock_unwrap = patch("agentops.instrumentation.openai.instrumentor.unwrap").start() + mock_wrap = patch("agentops.instrumentation.common.wrappers.wrap").start() + mock_unwrap = patch("agentops.instrumentation.common.wrappers.unwrap").start() mock_instrument = patch.object(instrumentor, "_instrument", wraps=instrumentor._instrument).start() mock_uninstrument = patch.object(instrumentor, "_uninstrument", wraps=instrumentor._uninstrument).start() @@ -72,33 +72,39 @@ def test_instrumentor_initialization(self): instrumentor = OpenAIInstrumentor() assert instrumentor.__class__.__name__ == "OpenAIInstrumentor" - # Verify it inherits from the third-party OpenAIV1Instrumentor - from opentelemetry.instrumentation.openai.v1 import OpenAIV1Instrumentor + # Verify it inherits from BaseInstrumentor + from opentelemetry.instrumentation.instrumentor import BaseInstrumentor - assert isinstance(instrumentor, OpenAIV1Instrumentor) + assert isinstance(instrumentor, BaseInstrumentor) def test_instrument_method_wraps_response_api(self, instrumentor): """Test the _instrument method wraps the Response API methods""" mock_wrap = instrumentor["mock_wrap"] - # Verify wrap was called for each method in WRAPPED_METHODS - assert mock_wrap.call_count == 2 - - # Check the first call arguments for Responses.create - first_call_args = mock_wrap.call_args_list[0][0] - assert isinstance(first_call_args[0], WrapConfig) - assert first_call_args[0].trace_name == "openai.responses.create" - assert first_call_args[0].package == "openai.resources.responses" - assert first_call_args[0].class_name == "Responses" - assert first_call_args[0].method_name == "create" - - # Check the second call arguments for AsyncResponses.create - second_call_args = mock_wrap.call_args_list[1][0] - assert isinstance(second_call_args[0], WrapConfig) - assert second_call_args[0].trace_name == "openai.responses.create" - assert second_call_args[0].package == "openai.resources.responses" - assert second_call_args[0].class_name == "AsyncResponses" - assert second_call_args[0].method_name == "create" + # Verify wrap was called multiple times (we wrap many methods) + assert mock_wrap.call_count > 0 + + # Find Response API calls in the wrapped methods + response_api_calls = [] + for call in mock_wrap.call_args_list: + wrap_config = call[0][0] + if isinstance(wrap_config, WrapConfig) and wrap_config.package == "openai.resources.responses": + response_api_calls.append(wrap_config) + + # Verify we have both sync and async Response API methods + assert len(response_api_calls) == 2 + + # Check sync Responses.create + sync_response = next((cfg for cfg in response_api_calls if cfg.class_name == "Responses"), None) + assert sync_response is not None + assert sync_response.trace_name == "openai.responses.create" + assert sync_response.method_name == "create" + + # Check async AsyncResponses.create + async_response = 
next((cfg for cfg in response_api_calls if cfg.class_name == "AsyncResponses"), None) + assert async_response is not None + assert async_response.trace_name == "openai.responses.create" + assert async_response.method_name == "create" def test_uninstrument_method_unwraps_response_api(self, instrumentor): """Test the _uninstrument method unwraps the Response API methods""" @@ -118,19 +124,19 @@ def test_uninstrument_method_unwraps_response_api(self, instrumentor): assert mock_unwrap.called, "unwrap was not called during _uninstrument" def test_calls_parent_instrument(self, instrumentor): - """Test that the instrumentor calls the parent class's _instrument method""" + """Test that the instrumentor properly instruments methods""" mock_instrument = instrumentor["mock_instrument"] - # Verify super()._instrument was called + # Verify _instrument was called assert mock_instrument.called - # Verify the tracer provider was passed to the parent method + # Verify the tracer provider was passed call_kwargs = mock_instrument.call_args[1] assert "tracer_provider" in call_kwargs assert call_kwargs["tracer_provider"] == instrumentor["tracer_provider"] def test_calls_parent_uninstrument(self, instrumentor): - """Test that the instrumentor calls the parent class's _uninstrument method""" + """Test that the instrumentor properly uninstruments methods""" instrumentor_obj = instrumentor["instrumentor"] mock_uninstrument = instrumentor["mock_uninstrument"] @@ -140,8 +146,8 @@ def test_calls_parent_uninstrument(self, instrumentor): # Directly call uninstrument instrumentor_obj._uninstrument() - # Now verify the method was called at least once - assert mock_uninstrument.called, "Parent _uninstrument was not called" + # Now verify the method was called + assert mock_uninstrument.called, "_uninstrument was not called" def test_wrapper_error_handling(self): """Test that the instrumentor handles errors when wrapping methods""" @@ -149,16 +155,15 @@ def test_wrapper_error_handling(self): instrumentor = OpenAIInstrumentor() # Mock wrap to raise an exception - with patch("agentops.instrumentation.openai.instrumentor.wrap") as mock_wrap: + with patch("agentops.instrumentation.common.wrappers.wrap") as mock_wrap: mock_wrap.side_effect = AttributeError("Module not found") - # Mock the parent class's _instrument method - with patch.object(instrumentor, "_instrument") as mock_instrument: - # Instrument should not raise exceptions even if wrapping fails + # Instrument should not raise exceptions even if wrapping fails + # The instrumentor should handle errors gracefully + try: instrumentor._instrument(tracer_provider=MagicMock()) - - # Verify the parent method was still called - assert mock_instrument.called + except Exception: + pytest.fail("Instrumentor should handle wrapping errors gracefully") def test_unwrapper_error_handling(self): """Test that the instrumentor handles errors when unwrapping methods""" @@ -166,16 +171,15 @@ def test_unwrapper_error_handling(self): instrumentor = OpenAIInstrumentor() # Mock unwrap to raise an exception - with patch("agentops.instrumentation.openai.instrumentor.unwrap") as mock_unwrap: + with patch("agentops.instrumentation.common.wrappers.unwrap") as mock_unwrap: mock_unwrap.side_effect = Exception("Failed to unwrap") - # Mock the parent class's _uninstrument method - with patch.object(instrumentor, "_uninstrument") as mock_uninstrument: - # Uninstrument should not raise exceptions even if unwrapping fails + # Uninstrument should not raise exceptions even if unwrapping fails + # The 
instrumentor should handle errors gracefully + try: instrumentor._uninstrument() - - # Verify the parent method was still called - assert mock_uninstrument.called + except Exception: + pytest.fail("Instrumentor should handle unwrapping errors gracefully") def test_instrumentation_with_tracer(self): """Test that the instrumentor gets a tracer with the correct name and version""" diff --git a/third_party/opentelemetry/instrumentation/openai/LICENSE b/third_party/opentelemetry/instrumentation/openai/LICENSE deleted file mode 100644 index 0f2a333f0..000000000 --- a/third_party/opentelemetry/instrumentation/openai/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. 
The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. 
- - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 2023 openllmetry - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/third_party/opentelemetry/instrumentation/openai/NOTICE.md b/third_party/opentelemetry/instrumentation/openai/NOTICE.md deleted file mode 100644 index ca711b794..000000000 --- a/third_party/opentelemetry/instrumentation/openai/NOTICE.md +++ /dev/null @@ -1,8 +0,0 @@ -This package contains code derived from the OpenLLMetry project, which is licensed under the Apache License, Version 2.0. - -Original repository: https://github.com/traceloop/openllmetry - -Copyright notice from the original project: -Copyright (c) Traceloop (https://traceloop.com) - -The Apache 2.0 license can be found in the LICENSE file in this directory. 
diff --git a/third_party/opentelemetry/instrumentation/openai/__init__.py b/third_party/opentelemetry/instrumentation/openai/__init__.py deleted file mode 100644 index 8a5db1bc1..000000000 --- a/third_party/opentelemetry/instrumentation/openai/__init__.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Callable, Collection, Optional -from typing_extensions import Coroutine - -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor - -from opentelemetry.instrumentation.openai.shared.config import Config -from opentelemetry.instrumentation.openai.utils import is_openai_v1 - -_instruments = ("openai >= 0.27.0",) - - -class OpenAIInstrumentor(BaseInstrumentor): - """An instrumentor for OpenAI's client library.""" - - def __init__( - self, - enrich_assistant: bool = False, - enrich_token_usage: bool = False, - exception_logger=None, - get_common_metrics_attributes: Callable[[], dict] = lambda: {}, - upload_base64_image: Optional[Callable[[str, str, str, str], Coroutine[None, None, str]]] = lambda *args: "", - enable_trace_context_propagation: bool = True, - ): - super().__init__() - Config.enrich_assistant = enrich_assistant - Config.enrich_token_usage = enrich_token_usage - Config.exception_logger = exception_logger - Config.get_common_metrics_attributes = get_common_metrics_attributes - Config.upload_base64_image = upload_base64_image - Config.enable_trace_context_propagation = enable_trace_context_propagation - - def instrumentation_dependencies(self) -> Collection[str]: - return _instruments - - def _instrument(self, **kwargs): - if is_openai_v1(): - from opentelemetry.instrumentation.openai.v1 import OpenAIV1Instrumentor - - OpenAIV1Instrumentor().instrument(**kwargs) - else: - from opentelemetry.instrumentation.openai.v0 import OpenAIV0Instrumentor - - OpenAIV0Instrumentor().instrument(**kwargs) - - def _uninstrument(self, **kwargs): - if is_openai_v1(): - from opentelemetry.instrumentation.openai.v1 import OpenAIV1Instrumentor - - OpenAIV1Instrumentor().uninstrument(**kwargs) - else: - from opentelemetry.instrumentation.openai.v0 import OpenAIV0Instrumentor - - OpenAIV0Instrumentor().uninstrument(**kwargs) diff --git a/third_party/opentelemetry/instrumentation/openai/shared/__init__.py b/third_party/opentelemetry/instrumentation/openai/shared/__init__.py deleted file mode 100644 index 7cc83cfd0..000000000 --- a/third_party/opentelemetry/instrumentation/openai/shared/__init__.py +++ /dev/null @@ -1,296 +0,0 @@ -import os -import openai -import json -import types -import logging - -from importlib.metadata import version - -from opentelemetry import context as context_api -from opentelemetry.trace.propagation import set_span_in_context -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - -from opentelemetry.instrumentation.openai.shared.config import Config -from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( - GEN_AI_RESPONSE_ID, -) -from agentops.semconv import SpanAttributes -from opentelemetry.instrumentation.openai.utils import ( - dont_throw, - is_openai_v1, - should_record_stream_token_usage, -) - -OPENAI_LLM_USAGE_TOKEN_TYPES = ["prompt_tokens", "completion_tokens"] -PROMPT_FILTER_KEY = "prompt_filter_results" -PROMPT_ERROR = "prompt_error" - -_PYDANTIC_VERSION = version("pydantic") - -# tiktoken encodings map for different model, key is model_name, value is tiktoken encoding -tiktoken_encodings = {} - -logger = logging.getLogger(__name__) - - -def should_send_prompts(): - return 
(os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower() == "true" or context_api.get_value( - "override_enable_content_tracing" - ) - - -def _set_span_attribute(span, name, value): - if value is None or value == "": - return - - if hasattr(openai, "NOT_GIVEN") and value == openai.NOT_GIVEN: - return - - span.set_attribute(name, value) - - -def _set_client_attributes(span, instance): - if not span.is_recording(): - return - - if not is_openai_v1(): - return - - client = instance._client # pylint: disable=protected-access - if isinstance(client, (openai.AsyncOpenAI, openai.OpenAI)): - _set_span_attribute(span, SpanAttributes.LLM_OPENAI_API_BASE, str(client.base_url)) - if isinstance(client, (openai.AsyncAzureOpenAI, openai.AzureOpenAI)): - _set_span_attribute(span, SpanAttributes.LLM_OPENAI_API_VERSION, client._api_version) # pylint: disable=protected-access - - -def _set_api_attributes(span): - if not span.is_recording(): - return - - if is_openai_v1(): - return - - base_url = openai.base_url if hasattr(openai, "base_url") else openai.api_base - - _set_span_attribute(span, SpanAttributes.LLM_OPENAI_API_BASE, base_url) - _set_span_attribute(span, SpanAttributes.LLM_OPENAI_API_TYPE, openai.api_type) - _set_span_attribute(span, SpanAttributes.LLM_OPENAI_API_VERSION, openai.api_version) - - return - - -def _set_functions_attributes(span, functions): - if not functions: - return - - for i, function in enumerate(functions): - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - _set_span_attribute(span, f"{prefix}.name", function.get("name")) - _set_span_attribute(span, f"{prefix}.description", function.get("description")) - _set_span_attribute(span, f"{prefix}.parameters", json.dumps(function.get("parameters"))) - - -def set_tools_attributes(span, tools): - if not tools: - return - - for i, tool in enumerate(tools): - function = tool.get("function") - if not function: - continue - - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - _set_span_attribute(span, f"{prefix}.name", function.get("name")) - _set_span_attribute(span, f"{prefix}.description", function.get("description")) - _set_span_attribute(span, f"{prefix}.parameters", json.dumps(function.get("parameters"))) - - -def _set_request_attributes(span, kwargs): - if not span.is_recording(): - return - - _set_api_attributes(span) - _set_span_attribute(span, SpanAttributes.LLM_SYSTEM, "OpenAI") - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, kwargs.get("model")) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_MAX_TOKENS, kwargs.get("max_tokens")) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_TEMPERATURE, kwargs.get("temperature")) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_TOP_P, kwargs.get("top_p")) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY, kwargs.get("frequency_penalty")) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY, kwargs.get("presence_penalty")) - _set_span_attribute(span, SpanAttributes.LLM_USER, kwargs.get("user")) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_HEADERS, str(kwargs.get("headers"))) - # The new OpenAI SDK removed the `headers` and create new field called `extra_headers` - if kwargs.get("extra_headers") is not None: - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_HEADERS, str(kwargs.get("extra_headers"))) - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_STREAMING, kwargs.get("stream") or False) - - -@dont_throw -def _set_response_attributes(span, response): - if not span.is_recording(): - 
return - - if "error" in response: - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.{PROMPT_ERROR}", - json.dumps(response.get("error")), - ) - return - - _set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.get("model")) - _set_span_attribute(span, GEN_AI_RESPONSE_ID, response.get("id")) - - _set_span_attribute( - span, - SpanAttributes.LLM_OPENAI_RESPONSE_SYSTEM_FINGERPRINT, - response.get("system_fingerprint"), - ) - _log_prompt_filter(span, response) - usage = response.get("usage") - if not usage: - return - - if is_openai_v1() and not isinstance(usage, dict): - usage = usage.__dict__ - - _set_span_attribute(span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage.get("total_tokens")) - _set_span_attribute( - span, - SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, - usage.get("completion_tokens"), - ) - _set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.get("prompt_tokens")) - - # Extract and set reasoning tokens if available - # Using the standardized SpanAttributes.LLM_USAGE_REASONING_TOKENS attribute - if ( - isinstance(usage, dict) - and "output_tokens_details" in usage - and "reasoning_tokens" in usage.get("output_tokens_details", {}) - ): - reasoning_tokens = usage.get("output_tokens_details", {}).get("reasoning_tokens") - _set_span_attribute(span, SpanAttributes.LLM_USAGE_REASONING_TOKENS, reasoning_tokens) - return - - -def _log_prompt_filter(span, response_dict): - if response_dict.get("prompt_filter_results"): - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.{PROMPT_FILTER_KEY}", - json.dumps(response_dict.get("prompt_filter_results")), - ) - - -@dont_throw -def _set_span_stream_usage(span, prompt_tokens, completion_tokens): - if not span.is_recording(): - return - - if isinstance(completion_tokens, int) and completion_tokens >= 0: - _set_span_attribute(span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens) - - if isinstance(prompt_tokens, int) and prompt_tokens >= 0: - _set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, prompt_tokens) - - if isinstance(prompt_tokens, int) and isinstance(completion_tokens, int) and completion_tokens + prompt_tokens >= 0: - _set_span_attribute( - span, - SpanAttributes.LLM_USAGE_TOTAL_TOKENS, - completion_tokens + prompt_tokens, - ) - - -def _get_openai_base_url(instance): - if hasattr(instance, "_client"): - client = instance._client # pylint: disable=protected-access - if isinstance(client, (openai.AsyncOpenAI, openai.OpenAI)): - return str(client.base_url) - - return "" - - -def is_streaming_response(response): - if is_openai_v1(): - return isinstance(response, openai.Stream) or isinstance(response, openai.AsyncStream) - - return isinstance(response, types.GeneratorType) or isinstance(response, types.AsyncGeneratorType) - - -def model_as_dict(model): - if isinstance(model, dict): - return model - if _PYDANTIC_VERSION < "2.0.0": - return model.dict() - if hasattr(model, "model_dump"): - return model.model_dump() - elif hasattr(model, "parse"): # Raw API response - return model_as_dict(model.parse()) - else: - return model - - -def get_token_count_from_string(string: str, model_name: str): - if not should_record_stream_token_usage(): - return None - - import tiktoken - - if tiktoken_encodings.get(model_name) is None: - try: - encoding = tiktoken.encoding_for_model(model_name) - except KeyError as ex: - # no such model_name in tiktoken - logger.warning(f"Failed to get tiktoken encoding for model_name {model_name}, error: {str(ex)}") - return None - - 
tiktoken_encodings[model_name] = encoding - else: - encoding = tiktoken_encodings.get(model_name) - - token_count = len(encoding.encode(string)) - return token_count - - -def _token_type(token_type: str): - # Map standardized token types to API-specific token types (target → source) - token_type_mapping = {"input": "prompt_tokens", "output": "completion_tokens"} - # TODO: This implementation is still incorrect and needs to be fixed properly. - # We're defining the dictionary using the proper target→source pattern, - # but the function is actually being used in the opposite direction (source→target). - # The correct fix would be to use get_value() from agentops.instrumentation.openai and - # modify the call sites (in _set_token_counter_metrics) to handle the reversed lookup properly. - # This would require changes to the chat_wrappers.py and completion_wrappers.py files. - - # Return the reverse mapping since we're converting from source to target - for target, source in token_type_mapping.items(): - if token_type == source: - return target - return None - - -def metric_shared_attributes(response_model: str, operation: str, server_address: str, is_streaming: bool = False): - attributes = Config.get_common_metrics_attributes() - - return { - **attributes, - SpanAttributes.LLM_SYSTEM: "openai", - SpanAttributes.LLM_RESPONSE_MODEL: response_model, - "gen_ai.operation.name": operation, - "server.address": server_address, - "stream": is_streaming, - } - - -def propagate_trace_context(span, kwargs): - if is_openai_v1(): - extra_headers = kwargs.get("extra_headers", {}) - ctx = set_span_in_context(span) - TraceContextTextMapPropagator().inject(extra_headers, context=ctx) - kwargs["extra_headers"] = extra_headers - else: - headers = kwargs.get("headers", {}) - ctx = set_span_in_context(span) - TraceContextTextMapPropagator().inject(headers, context=ctx) - kwargs["headers"] = headers diff --git a/third_party/opentelemetry/instrumentation/openai/shared/chat_wrappers.py b/third_party/opentelemetry/instrumentation/openai/shared/chat_wrappers.py deleted file mode 100644 index 137854d87..000000000 --- a/third_party/opentelemetry/instrumentation/openai/shared/chat_wrappers.py +++ /dev/null @@ -1,852 +0,0 @@ -import copy -import json -import logging -import time -from opentelemetry.instrumentation.openai.shared.config import Config -from wrapt import ObjectProxy - - -from opentelemetry import context as context_api -from opentelemetry.metrics import Counter, Histogram -from agentops.semconv import ( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, - SpanAttributes as BaseSpanAttributes, - LLMRequestTypeValues, -) - -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.instrumentation.openai.utils import ( - _with_chat_telemetry_wrapper, - dont_throw, - run_async, -) -from opentelemetry.instrumentation.openai.shared import ( - metric_shared_attributes, - _set_client_attributes, - _set_request_attributes, - _set_span_attribute, - _set_functions_attributes, - _token_type, - set_tools_attributes, - _set_response_attributes, - is_streaming_response, - should_send_prompts, - model_as_dict, - _get_openai_base_url, - OPENAI_LLM_USAGE_TOKEN_TYPES, - should_record_stream_token_usage, - get_token_count_from_string, - _set_span_stream_usage, - propagate_trace_context, -) -from opentelemetry.trace import SpanKind, Tracer -from opentelemetry.trace.status import Status, StatusCode - -from opentelemetry.instrumentation.openai.utils import is_openai_v1 - -SPAN_NAME = 
"openai.chat.completion" -PROMPT_FILTER_KEY = "prompt_filter_results" -CONTENT_FILTER_KEY = "content_filter_results" - -LLM_REQUEST_TYPE = LLMRequestTypeValues.CHAT - -logger = logging.getLogger(__name__) - - -@_with_chat_telemetry_wrapper -def chat_wrapper( - tracer: Tracer, - token_counter: Counter, - choice_counter: Counter, - duration_histogram: Histogram, - exception_counter: Counter, - streaming_time_to_first_token: Histogram, - streaming_time_to_generate: Histogram, - wrapped, - instance, - args, - kwargs, -): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return wrapped(*args, **kwargs) - # span needs to be opened and closed manually because the response is a generator - - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - attributes={BaseSpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value}, - ) - - run_async(_handle_request(span, kwargs, instance)) - - try: - start_time = time.time() - response = wrapped(*args, **kwargs) - end_time = time.time() - except Exception as e: # pylint: disable=broad-except - end_time = time.time() - duration = end_time - start_time if "start_time" in locals() else 0 - - attributes = { - "error.type": e.__class__.__name__, - } - - if duration > 0 and duration_histogram: - duration_histogram.record(duration, attributes=attributes) - if exception_counter: - exception_counter.add(1, attributes=attributes) - - span.set_status(Status(StatusCode.ERROR, str(e))) - span.end() - - raise e - - if is_streaming_response(response): - # span will be closed after the generator is done - if is_openai_v1(): - return ChatStream( - span, - response, - instance, - token_counter, - choice_counter, - duration_histogram, - streaming_time_to_first_token, - streaming_time_to_generate, - start_time, - kwargs, - ) - else: - return _build_from_streaming_response( - span, - response, - instance, - token_counter, - choice_counter, - duration_histogram, - streaming_time_to_first_token, - streaming_time_to_generate, - start_time, - kwargs, - ) - - duration = end_time - start_time - - _handle_response( - response, - span, - instance, - token_counter, - choice_counter, - duration_histogram, - duration, - ) - span.end() - - return response - - -@_with_chat_telemetry_wrapper -async def achat_wrapper( - tracer: Tracer, - token_counter: Counter, - choice_counter: Counter, - duration_histogram: Histogram, - exception_counter: Counter, - streaming_time_to_first_token: Histogram, - streaming_time_to_generate: Histogram, - wrapped, - instance, - args, - kwargs, -): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return await wrapped(*args, **kwargs) - - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - attributes={BaseSpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value}, - ) - await _handle_request(span, kwargs, instance) - - try: - start_time = time.time() - response = await wrapped(*args, **kwargs) - end_time = time.time() - except Exception as e: # pylint: disable=broad-except - end_time = time.time() - duration = end_time - start_time if "start_time" in locals() else 0 - - common_attributes = Config.get_common_metrics_attributes() - attributes = { - **common_attributes, - "error.type": e.__class__.__name__, - } - - if duration > 0 and duration_histogram: - duration_histogram.record(duration, attributes=attributes) - if exception_counter: - exception_counter.add(1, 
attributes=attributes) - - span.set_status(Status(StatusCode.ERROR, str(e))) - span.end() - - raise e - - if is_streaming_response(response): - # span will be closed after the generator is done - if is_openai_v1(): - return ChatStream( - span, - response, - instance, - token_counter, - choice_counter, - duration_histogram, - streaming_time_to_first_token, - streaming_time_to_generate, - start_time, - kwargs, - ) - else: - return _abuild_from_streaming_response( - span, - response, - instance, - token_counter, - choice_counter, - duration_histogram, - streaming_time_to_first_token, - streaming_time_to_generate, - start_time, - kwargs, - ) - - duration = end_time - start_time - - _handle_response( - response, - span, - instance, - token_counter, - choice_counter, - duration_histogram, - duration, - ) - span.end() - - return response - - -@dont_throw -async def _handle_request(span, kwargs, instance): - _set_request_attributes(span, kwargs) - _set_client_attributes(span, instance) - if should_send_prompts(): - await _set_prompts(span, kwargs.get("messages")) - if kwargs.get("functions"): - _set_functions_attributes(span, kwargs.get("functions")) - elif kwargs.get("tools"): - set_tools_attributes(span, kwargs.get("tools")) - if Config.enable_trace_context_propagation: - propagate_trace_context(span, kwargs) - - -@dont_throw -def _handle_response( - response, - span, - instance=None, - token_counter=None, - choice_counter=None, - duration_histogram=None, - duration=None, -): - if is_openai_v1(): - response_dict = model_as_dict(response) - else: - response_dict = response - - # metrics record - _set_chat_metrics( - instance, - token_counter, - choice_counter, - duration_histogram, - response_dict, - duration, - ) - - # span attributes - _set_response_attributes(span, response_dict) - - if should_send_prompts(): - _set_completions(span, response_dict.get("choices")) - - return response - - -def _set_chat_metrics(instance, token_counter, choice_counter, duration_histogram, response_dict, duration): - shared_attributes = metric_shared_attributes( - response_model=response_dict.get("model") or None, - operation="chat", - server_address=_get_openai_base_url(instance), - is_streaming=False, - ) - - # token metrics - usage = response_dict.get("usage") # type: dict - if usage and token_counter: - _set_token_counter_metrics(token_counter, usage, shared_attributes) - - # choices metrics - choices = response_dict.get("choices") - if choices and choice_counter: - _set_choice_counter_metrics(choice_counter, choices, shared_attributes) - - # duration metrics - if duration and isinstance(duration, (float, int)) and duration_histogram: - duration_histogram.record(duration, attributes=shared_attributes) - - -def _set_choice_counter_metrics(choice_counter, choices, shared_attributes): - choice_counter.add(len(choices), attributes=shared_attributes) - for choice in choices: - attributes_with_reason = {**shared_attributes} - if choice.get("finish_reason"): - attributes_with_reason[BaseSpanAttributes.LLM_RESPONSE_FINISH_REASON] = choice.get("finish_reason") - choice_counter.add(1, attributes=attributes_with_reason) - - -def _set_token_counter_metrics(token_counter, usage, shared_attributes): - for name, val in usage.items(): - if name in OPENAI_LLM_USAGE_TOKEN_TYPES: - attributes_with_token_type = { - **shared_attributes, - BaseSpanAttributes.LLM_TOKEN_TYPE: _token_type(name), - } - token_counter.record(val, attributes=attributes_with_token_type) - - -def _is_base64_image(item): - if not isinstance(item, dict): - 
return False - - if not isinstance(item.get("image_url"), dict): - return False - - if "data:image/" not in item.get("image_url", {}).get("url", ""): - return False - - return True - - -async def _process_image_item(item, trace_id, span_id, message_index, content_index): - if not Config.upload_base64_image: - return item - - image_format = item["image_url"]["url"].split(";")[0].split("/")[1] - image_name = f"message_{message_index}_content_{content_index}.{image_format}" - base64_string = item["image_url"]["url"].split(",")[1] - url = await Config.upload_base64_image(trace_id, span_id, image_name, base64_string) - - return {"type": "image_url", "image_url": {"url": url}} - - -@dont_throw -async def _set_prompts(span, messages): - if not span.is_recording() or messages is None: - return - - for i, msg in enumerate(messages): - prefix = f"{BaseSpanAttributes.LLM_PROMPTS}.{i}" - - _set_span_attribute(span, f"{prefix}.role", msg.get("role")) - if msg.get("content"): - content = copy.deepcopy(msg.get("content")) - if isinstance(content, list): - content = [ - ( - await _process_image_item(item, span.context.trace_id, span.context.span_id, i, j) - if _is_base64_image(item) - else item - ) - for j, item in enumerate(content) - ] - - content = json.dumps(content) - _set_span_attribute(span, f"{prefix}.content", content) - if msg.get("tool_call_id"): - _set_span_attribute(span, f"{prefix}.tool_call_id", msg.get("tool_call_id")) - tool_calls = msg.get("tool_calls") - if tool_calls: - for i, tool_call in enumerate(tool_calls): - if is_openai_v1(): - tool_call = model_as_dict(tool_call) - - function = tool_call.get("function") - _set_span_attribute( - span, - f"{prefix}.tool_calls.{i}.id", - tool_call.get("id"), - ) - _set_span_attribute( - span, - f"{prefix}.tool_calls.{i}.name", - function.get("name"), - ) - _set_span_attribute( - span, - f"{prefix}.tool_calls.{i}.arguments", - function.get("arguments"), - ) - - -def _set_completions(span, choices): - if choices is None: - return - - for choice in choices: - index = choice.get("index") - prefix = f"{BaseSpanAttributes.LLM_COMPLETIONS}.{index}" - _set_span_attribute(span, f"{prefix}.finish_reason", choice.get("finish_reason")) - - if choice.get("content_filter_results"): - _set_span_attribute( - span, - f"{prefix}.{CONTENT_FILTER_KEY}", - json.dumps(choice.get("content_filter_results")), - ) - - if choice.get("finish_reason") == "content_filter": - _set_span_attribute(span, f"{prefix}.role", "assistant") - _set_span_attribute(span, f"{prefix}.content", "FILTERED") - - return - - message = choice.get("message") - if not message: - return - - _set_span_attribute(span, f"{prefix}.role", message.get("role")) - - if message.get("refusal"): - _set_span_attribute(span, f"{prefix}.refusal", message.get("refusal")) - else: - _set_span_attribute(span, f"{prefix}.content", message.get("content")) - - function_call = message.get("function_call") - if function_call: - _set_span_attribute(span, f"{prefix}.tool_calls.0.name", function_call.get("name")) - _set_span_attribute( - span, - f"{prefix}.tool_calls.0.arguments", - function_call.get("arguments"), - ) - - tool_calls = message.get("tool_calls") - if tool_calls: - for i, tool_call in enumerate(tool_calls): - function = tool_call.get("function") - _set_span_attribute( - span, - f"{prefix}.tool_calls.{i}.id", - tool_call.get("id"), - ) - _set_span_attribute( - span, - f"{prefix}.tool_calls.{i}.name", - function.get("name"), - ) - _set_span_attribute( - span, - f"{prefix}.tool_calls.{i}.arguments", - 
function.get("arguments"), - ) - - -@dont_throw -def _set_streaming_token_metrics(request_kwargs, complete_response, span, token_counter, shared_attributes): - # use tiktoken calculate token usage - if not should_record_stream_token_usage(): - return - - # kwargs={'model': 'gpt-3.5', 'messages': [{'role': 'user', 'content': '...'}], 'stream': True} - prompt_usage = -1 - completion_usage = -1 - - # prompt_usage - if request_kwargs and request_kwargs.get("messages"): - prompt_content = "" - # setting the default model_name as gpt-4. As this uses the embedding "cl100k_base" that - # is used by most of the other model. - model_name = complete_response.get("model") or request_kwargs.get("model") or "gpt-4" - for msg in request_kwargs.get("messages"): - if msg.get("content"): - prompt_content += msg.get("content") - if model_name: - prompt_usage = get_token_count_from_string(prompt_content, model_name) - - # completion_usage - if complete_response.get("choices"): - completion_content = "" - # setting the default model_name as gpt-4. As this uses the embedding "cl100k_base" that - # is used by most of the other model. - model_name = complete_response.get("model") or "gpt-4" - - for choice in complete_response.get("choices"): - if choice.get("message") and choice.get("message").get("content"): - completion_content += choice["message"]["content"] - - if model_name: - completion_usage = get_token_count_from_string(completion_content, model_name) - - # span record - _set_span_stream_usage(span, prompt_usage, completion_usage) - - # metrics record - if token_counter: - if isinstance(prompt_usage, int) and prompt_usage >= 0: - attributes_with_token_type = { - **shared_attributes, - BaseSpanAttributes.LLM_TOKEN_TYPE: "input", - } - token_counter.record(prompt_usage, attributes=attributes_with_token_type) - - if isinstance(completion_usage, int) and completion_usage >= 0: - attributes_with_token_type = { - **shared_attributes, - BaseSpanAttributes.LLM_TOKEN_TYPE: "output", - } - token_counter.record(completion_usage, attributes=attributes_with_token_type) - - -class ChatStream(ObjectProxy): - _span = None - _instance = None - _token_counter = None - _choice_counter = None - _duration_histogram = None - _streaming_time_to_first_token = None - _streaming_time_to_generate = None - _start_time = None - _request_kwargs = None - - def __init__( - self, - span, - response, - instance=None, - token_counter=None, - choice_counter=None, - duration_histogram=None, - streaming_time_to_first_token=None, - streaming_time_to_generate=None, - start_time=None, - request_kwargs=None, - ): - super().__init__(response) - - self._span = span - self._instance = instance - self._token_counter = token_counter - self._choice_counter = choice_counter - self._duration_histogram = duration_histogram - self._streaming_time_to_first_token = streaming_time_to_first_token - self._streaming_time_to_generate = streaming_time_to_generate - self._start_time = start_time - self._request_kwargs = request_kwargs - - self._first_token = True - # will be updated when first token is received - self._time_of_first_token = self._start_time - self._complete_response = {"choices": [], "model": ""} - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.__wrapped__.__exit__(exc_type, exc_val, exc_tb) - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb) - - def __iter__(self): - return self - - def 
__aiter__(self): - return self - - def __next__(self): - try: - chunk = self.__wrapped__.__next__() - except Exception as e: - if isinstance(e, StopIteration): - self._close_span() - raise e - else: - self._process_item(chunk) - return chunk - - async def __anext__(self): - try: - chunk = await self.__wrapped__.__anext__() - except Exception as e: - if isinstance(e, StopAsyncIteration): - self._close_span() - raise e - else: - self._process_item(chunk) - return chunk - - def _process_item(self, item): - self._span.add_event(name=f"{BaseSpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}") - - if self._first_token and self._streaming_time_to_first_token: - self._time_of_first_token = time.time() - self._streaming_time_to_first_token.record( - self._time_of_first_token - self._start_time, - attributes=self._shared_attributes(), - ) - self._first_token = False - - _accumulate_stream_items(item, self._complete_response) - - def _shared_attributes(self): - return metric_shared_attributes( - response_model=self._complete_response.get("model") or self._request_kwargs.get("model") or None, - operation="chat", - server_address=_get_openai_base_url(self._instance), - is_streaming=True, - ) - - @dont_throw - def _close_span(self): - _set_streaming_token_metrics( - self._request_kwargs, - self._complete_response, - self._span, - self._token_counter, - self._shared_attributes(), - ) - - # choice metrics - if self._choice_counter and self._complete_response.get("choices"): - _set_choice_counter_metrics( - self._choice_counter, - self._complete_response.get("choices"), - self._shared_attributes(), - ) - - # duration metrics - if self._start_time and isinstance(self._start_time, (float, int)): - duration = time.time() - self._start_time - else: - duration = None - if duration and isinstance(duration, (float, int)) and self._duration_histogram: - self._duration_histogram.record(duration, attributes=self._shared_attributes()) - if self._streaming_time_to_generate and self._time_of_first_token: - self._streaming_time_to_generate.record( - time.time() - self._time_of_first_token, - attributes=self._shared_attributes(), - ) - - _set_response_attributes(self._span, self._complete_response) - - if should_send_prompts(): - _set_completions(self._span, self._complete_response.get("choices")) - - self._span.set_status(Status(StatusCode.OK)) - self._span.end() - - -# Backward compatibility with OpenAI v0 - - -@dont_throw -def _build_from_streaming_response( - span, - response, - instance=None, - token_counter=None, - choice_counter=None, - duration_histogram=None, - streaming_time_to_first_token=None, - streaming_time_to_generate=None, - start_time=None, - request_kwargs=None, -): - complete_response = {"choices": [], "model": "", "id": ""} - - first_token = True - time_of_first_token = start_time # will be updated when first token is received - - for item in response: - span.add_event(name=f"{BaseSpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}") - - item_to_yield = item - - if first_token and streaming_time_to_first_token: - time_of_first_token = time.time() - streaming_time_to_first_token.record(time_of_first_token - start_time) - first_token = False - - _accumulate_stream_items(item, complete_response) - - yield item_to_yield - - shared_attributes = { - BaseSpanAttributes.LLM_RESPONSE_MODEL: complete_response.get("model") or None, - "server.address": _get_openai_base_url(instance), - "stream": True, - } - - _set_streaming_token_metrics(request_kwargs, complete_response, span, token_counter, shared_attributes) - - # choice 
metrics - if choice_counter and complete_response.get("choices"): - _set_choice_counter_metrics(choice_counter, complete_response.get("choices"), shared_attributes) - - # duration metrics - if start_time and isinstance(start_time, (float, int)): - duration = time.time() - start_time - else: - duration = None - if duration and isinstance(duration, (float, int)) and duration_histogram: - duration_histogram.record(duration, attributes=shared_attributes) - if streaming_time_to_generate and time_of_first_token: - streaming_time_to_generate.record(time.time() - time_of_first_token) - - _set_response_attributes(span, complete_response) - - if should_send_prompts(): - _set_completions(span, complete_response.get("choices")) - - span.set_status(Status(StatusCode.OK)) - span.end() - - -@dont_throw -async def _abuild_from_streaming_response( - span, - response, - instance=None, - token_counter=None, - choice_counter=None, - duration_histogram=None, - streaming_time_to_first_token=None, - streaming_time_to_generate=None, - start_time=None, - request_kwargs=None, -): - complete_response = {"choices": [], "model": "", "id": ""} - - first_token = True - time_of_first_token = start_time # will be updated when first token is received - - async for item in response: - span.add_event(name=f"{BaseSpanAttributes.LLM_CONTENT_COMPLETION_CHUNK}") - - item_to_yield = item - - if first_token and streaming_time_to_first_token: - time_of_first_token = time.time() - streaming_time_to_first_token.record(time_of_first_token - start_time) - first_token = False - - _accumulate_stream_items(item, complete_response) - - yield item_to_yield - - shared_attributes = { - BaseSpanAttributes.LLM_RESPONSE_MODEL: complete_response.get("model") or None, - "server.address": _get_openai_base_url(instance), - "stream": True, - } - - _set_streaming_token_metrics(request_kwargs, complete_response, span, token_counter, shared_attributes) - - # choice metrics - if choice_counter and complete_response.get("choices"): - _set_choice_counter_metrics(choice_counter, complete_response.get("choices"), shared_attributes) - - # duration metrics - if start_time and isinstance(start_time, (float, int)): - duration = time.time() - start_time - else: - duration = None - if duration and isinstance(duration, (float, int)) and duration_histogram: - duration_histogram.record(duration, attributes=shared_attributes) - if streaming_time_to_generate and time_of_first_token: - streaming_time_to_generate.record(time.time() - time_of_first_token) - - _set_response_attributes(span, complete_response) - - if should_send_prompts(): - _set_completions(span, complete_response.get("choices")) - - span.set_status(Status(StatusCode.OK)) - span.end() - - -def _accumulate_stream_items(item, complete_response): - if is_openai_v1(): - item = model_as_dict(item) - - complete_response["model"] = item.get("model") - complete_response["id"] = item.get("id") - - # prompt filter results - if item.get("prompt_filter_results"): - complete_response["prompt_filter_results"] = item.get("prompt_filter_results") - - for choice in item.get("choices"): - index = choice.get("index") - if len(complete_response.get("choices")) <= index: - complete_response["choices"].append({"index": index, "message": {"content": "", "role": ""}}) - complete_choice = complete_response.get("choices")[index] - if choice.get("finish_reason"): - complete_choice["finish_reason"] = choice.get("finish_reason") - if choice.get("content_filter_results"): - complete_choice["content_filter_results"] = 
choice.get("content_filter_results") - - delta = choice.get("delta") - - if delta and delta.get("content"): - complete_choice["message"]["content"] += delta.get("content") - - if delta and delta.get("role"): - complete_choice["message"]["role"] = delta.get("role") - if delta and delta.get("tool_calls"): - tool_calls = delta.get("tool_calls") - if not isinstance(tool_calls, list) or len(tool_calls) == 0: - continue - - if not complete_choice["message"].get("tool_calls"): - complete_choice["message"]["tool_calls"] = [] - - for tool_call in tool_calls: - i = int(tool_call["index"]) - if len(complete_choice["message"]["tool_calls"]) <= i: - complete_choice["message"]["tool_calls"].append( - {"id": "", "function": {"name": "", "arguments": ""}} - ) - - span_tool_call = complete_choice["message"]["tool_calls"][i] - span_function = span_tool_call["function"] - tool_call_function = tool_call.get("function") - - if tool_call.get("id"): - span_tool_call["id"] = tool_call.get("id") - if tool_call_function and tool_call_function.get("name"): - span_function["name"] = tool_call_function.get("name") - if tool_call_function and tool_call_function.get("arguments"): - span_function["arguments"] += tool_call_function.get("arguments") diff --git a/third_party/opentelemetry/instrumentation/openai/shared/completion_wrappers.py b/third_party/opentelemetry/instrumentation/openai/shared/completion_wrappers.py deleted file mode 100644 index 3bc053d74..000000000 --- a/third_party/opentelemetry/instrumentation/openai/shared/completion_wrappers.py +++ /dev/null @@ -1,236 +0,0 @@ -import logging - -from opentelemetry import context as context_api - -from agentops.semconv import ( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, - SpanAttributes, - LLMRequestTypeValues, -) - -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.instrumentation.openai.utils import _with_tracer_wrapper, dont_throw -from opentelemetry.instrumentation.openai.shared import ( - _set_client_attributes, - _set_request_attributes, - _set_span_attribute, - _set_functions_attributes, - _set_response_attributes, - is_streaming_response, - should_send_prompts, - model_as_dict, - should_record_stream_token_usage, - get_token_count_from_string, - _set_span_stream_usage, - propagate_trace_context, -) - -from opentelemetry.instrumentation.openai.utils import is_openai_v1 - -from opentelemetry.trace import SpanKind -from opentelemetry.trace.status import Status, StatusCode - -from opentelemetry.instrumentation.openai.shared.config import Config - -SPAN_NAME = "openai.completion" -LLM_REQUEST_TYPE = LLMRequestTypeValues.COMPLETION - -logger = logging.getLogger(__name__) - - -@_with_tracer_wrapper -def completion_wrapper(tracer, wrapped, instance, args, kwargs): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return wrapped(*args, **kwargs) - - # span needs to be opened and closed manually because the response is a generator - span = tracer.start_span( - SPAN_NAME, - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value}, - ) - - _handle_request(span, kwargs, instance) - try: - response = wrapped(*args, **kwargs) - except Exception as e: - span.set_status(Status(StatusCode.ERROR, str(e))) - span.end() - raise e - - if is_streaming_response(response): - # span will be closed after the generator is done - return _build_from_streaming_response(span, kwargs, response) - else: - 
_handle_response(response, span) - - span.end() - return response - - -@_with_tracer_wrapper -async def acompletion_wrapper(tracer, wrapped, instance, args, kwargs): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return await wrapped(*args, **kwargs) - - span = tracer.start_span( - name=SPAN_NAME, - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value}, - ) - - _handle_request(span, kwargs, instance) - try: - response = await wrapped(*args, **kwargs) - except Exception as e: - span.set_status(Status(StatusCode.ERROR, str(e))) - span.end() - raise e - - if is_streaming_response(response): - # span will be closed after the generator is done - return _abuild_from_streaming_response(span, kwargs, response) - else: - _handle_response(response, span) - - span.end() - return response - - -@dont_throw -def _handle_request(span, kwargs, instance): - _set_request_attributes(span, kwargs) - if should_send_prompts(): - _set_prompts(span, kwargs.get("prompt")) - _set_functions_attributes(span, kwargs.get("functions")) - _set_client_attributes(span, instance) - if Config.enable_trace_context_propagation: - propagate_trace_context(span, kwargs) - - -@dont_throw -def _handle_response(response, span): - if is_openai_v1(): - response_dict = model_as_dict(response) - else: - response_dict = response - - _set_response_attributes(span, response_dict) - - if should_send_prompts(): - _set_completions(span, response_dict.get("choices")) - - -def _set_prompts(span, prompt): - if not span.is_recording() or not prompt: - return - - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.0.user", - prompt[0] if isinstance(prompt, list) else prompt, - ) - - -@dont_throw -def _set_completions(span, choices): - if not span.is_recording() or not choices: - return - - for choice in choices: - index = choice.get("index") - prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}" - _set_span_attribute(span, f"{prefix}.finish_reason", choice.get("finish_reason")) - _set_span_attribute(span, f"{prefix}.content", choice.get("text")) - - -@dont_throw -def _build_from_streaming_response(span, request_kwargs, response): - complete_response = {"choices": [], "model": "", "id": ""} - for item in response: - yield item - _accumulate_streaming_response(complete_response, item) - - _set_response_attributes(span, complete_response) - - _set_token_usage(span, request_kwargs, complete_response) - - if should_send_prompts(): - _set_completions(span, complete_response.get("choices")) - - span.set_status(Status(StatusCode.OK)) - span.end() - - -@dont_throw -async def _abuild_from_streaming_response(span, request_kwargs, response): - complete_response = {"choices": [], "model": "", "id": ""} - async for item in response: - yield item - _accumulate_streaming_response(complete_response, item) - - _set_response_attributes(span, complete_response) - - _set_token_usage(span, request_kwargs, complete_response) - - if should_send_prompts(): - _set_completions(span, complete_response.get("choices")) - - span.set_status(Status(StatusCode.OK)) - span.end() - - -@dont_throw -def _set_token_usage(span, request_kwargs, complete_response): - # use tiktoken calculate token usage - if should_record_stream_token_usage(): - prompt_usage = -1 - completion_usage = -1 - - # prompt_usage - if request_kwargs and request_kwargs.get("prompt"): - prompt_content = request_kwargs.get("prompt") - model_name = complete_response.get("model") 
or None - - if model_name: - prompt_usage = get_token_count_from_string(prompt_content, model_name) - - # completion_usage - if complete_response.get("choices"): - completion_content = "" - model_name = complete_response.get("model") or None - - for choice in complete_response.get("choices"): - if choice.get("text"): - completion_content += choice.get("text") - - if model_name: - completion_usage = get_token_count_from_string(completion_content, model_name) - - # span record - _set_span_stream_usage(span, prompt_usage, completion_usage) - - -@dont_throw -def _accumulate_streaming_response(complete_response, item): - if is_openai_v1(): - item = model_as_dict(item) - - complete_response["model"] = item.get("model") - complete_response["id"] = item.get("id") - for choice in item.get("choices"): - index = choice.get("index") - if len(complete_response.get("choices")) <= index: - complete_response["choices"].append({"index": index, "text": ""}) - complete_choice = complete_response.get("choices")[index] - if choice.get("finish_reason"): - complete_choice["finish_reason"] = choice.get("finish_reason") - - if choice.get("text"): - complete_choice["text"] += choice.get("text") - - return complete_response diff --git a/third_party/opentelemetry/instrumentation/openai/shared/config.py b/third_party/opentelemetry/instrumentation/openai/shared/config.py deleted file mode 100644 index 18f44690c..000000000 --- a/third_party/opentelemetry/instrumentation/openai/shared/config.py +++ /dev/null @@ -1,10 +0,0 @@ -from typing import Callable - - -class Config: - enrich_token_usage = False - enrich_assistant = False - exception_logger = None - get_common_metrics_attributes: Callable[[], dict] = lambda: {} - upload_base64_image: Callable[[str, str, str], str] = lambda trace_id, span_id, base64_image_url: str - enable_trace_context_propagation: bool = True diff --git a/third_party/opentelemetry/instrumentation/openai/shared/embeddings_wrappers.py b/third_party/opentelemetry/instrumentation/openai/shared/embeddings_wrappers.py deleted file mode 100644 index ee4972dfb..000000000 --- a/third_party/opentelemetry/instrumentation/openai/shared/embeddings_wrappers.py +++ /dev/null @@ -1,257 +0,0 @@ -import logging -import time - -from opentelemetry import context as context_api -from opentelemetry.metrics import Counter, Histogram -from agentops.semconv import ( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, - SpanAttributes, - LLMRequestTypeValues, -) - -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.instrumentation.openai.utils import ( - dont_throw, - start_as_current_span_async, - _with_embeddings_telemetry_wrapper, -) -from opentelemetry.instrumentation.openai.shared import ( - metric_shared_attributes, - _set_client_attributes, - _set_request_attributes, - _set_span_attribute, - _set_response_attributes, - _token_type, - should_send_prompts, - model_as_dict, - _get_openai_base_url, - OPENAI_LLM_USAGE_TOKEN_TYPES, - propagate_trace_context, -) - -from opentelemetry.instrumentation.openai.shared.config import Config - -from opentelemetry.instrumentation.openai.utils import is_openai_v1 - -from opentelemetry.trace import SpanKind -from opentelemetry.trace import Status, StatusCode - -SPAN_NAME = "openai.embeddings" -LLM_REQUEST_TYPE = LLMRequestTypeValues.EMBEDDING - -logger = logging.getLogger(__name__) - - -@_with_embeddings_telemetry_wrapper -def embeddings_wrapper( - tracer, - token_counter: Counter, - vector_size_counter: Counter, - duration_histogram: 
Histogram, - exception_counter: Counter, - wrapped, - instance, - args, - kwargs, -): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return wrapped(*args, **kwargs) - - with tracer.start_as_current_span( - name=SPAN_NAME, - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value}, - ) as span: - _handle_request(span, kwargs, instance) - - try: - # record time for duration - start_time = time.time() - response = wrapped(*args, **kwargs) - end_time = time.time() - except Exception as e: # pylint: disable=broad-except - end_time = time.time() - duration = end_time - start_time if "start_time" in locals() else 0 - attributes = { - "error.type": e.__class__.__name__, - } - - # if there are legal duration, record it - if duration > 0 and duration_histogram: - duration_histogram.record(duration, attributes=attributes) - if exception_counter: - exception_counter.add(1, attributes=attributes) - - span.set_status(Status(StatusCode.ERROR, str(e))) - span.end() - - raise e - - duration = end_time - start_time - - _handle_response( - response, - span, - instance, - token_counter, - vector_size_counter, - duration_histogram, - duration, - ) - - return response - - -@_with_embeddings_telemetry_wrapper -async def aembeddings_wrapper( - tracer, - token_counter: Counter, - vector_size_counter: Counter, - duration_histogram: Histogram, - exception_counter: Counter, - wrapped, - instance, - args, - kwargs, -): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return await wrapped(*args, **kwargs) - - async with start_as_current_span_async( - tracer=tracer, - name=SPAN_NAME, - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value}, - ) as span: - _handle_request(span, kwargs, instance) - try: - # record time for duration - start_time = time.time() - response = await wrapped(*args, **kwargs) - end_time = time.time() - except Exception as e: # pylint: disable=broad-except - end_time = time.time() - duration = end_time - start_time if "start_time" in locals() else 0 - attributes = { - "error.type": e.__class__.__name__, - } - - # if there are legal duration, record it - if duration > 0 and duration_histogram: - duration_histogram.record(duration, attributes=attributes) - if exception_counter: - exception_counter.add(1, attributes=attributes) - - span.set_status(Status(StatusCode.ERROR, str(e))) - span.end() - - raise e - - duration = end_time - start_time - _handle_response( - response, - span, - instance, - token_counter, - vector_size_counter, - duration_histogram, - duration, - ) - - return response - - -@dont_throw -def _handle_request(span, kwargs, instance): - _set_request_attributes(span, kwargs) - if should_send_prompts(): - _set_prompts(span, kwargs.get("input")) - _set_client_attributes(span, instance) - if Config.enable_trace_context_propagation: - propagate_trace_context(span, kwargs) - - -@dont_throw -def _handle_response( - response, - span, - instance=None, - token_counter=None, - vector_size_counter=None, - duration_histogram=None, - duration=None, -): - if is_openai_v1(): - response_dict = model_as_dict(response) - else: - response_dict = response - # metrics record - _set_embeddings_metrics( - instance, - token_counter, - vector_size_counter, - duration_histogram, - response_dict, - duration, - ) - # span attributes - _set_response_attributes(span, 
response_dict) - - -def _set_embeddings_metrics( - instance, - token_counter, - vector_size_counter, - duration_histogram, - response_dict, - duration, -): - shared_attributes = metric_shared_attributes( - response_model=response_dict.get("model") or None, - operation="embeddings", - server_address=_get_openai_base_url(instance), - ) - - # token count metrics - usage = response_dict.get("usage") - if usage and token_counter: - for name, val in usage.items(): - if name in OPENAI_LLM_USAGE_TOKEN_TYPES: - if val is None: - logging.error(f"Received None value for {name} in usage") - continue - attributes_with_token_type = { - **shared_attributes, - SpanAttributes.LLM_TOKEN_TYPE: _token_type(name), - } - token_counter.record(val, attributes=attributes_with_token_type) - - # vec size metrics - # should use counter for vector_size? - vec_embedding = (response_dict.get("data") or [{}])[0].get("embedding", []) - vec_size = len(vec_embedding) - if vector_size_counter: - vector_size_counter.add(vec_size, attributes=shared_attributes) - - # duration metrics - if duration and isinstance(duration, (float, int)) and duration_histogram: - duration_histogram.record(duration, attributes=shared_attributes) - - -def _set_prompts(span, prompt): - if not span.is_recording() or not prompt: - return - - if isinstance(prompt, list): - for i, p in enumerate(prompt): - _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.content", p) - else: - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.0.content", - prompt, - ) diff --git a/third_party/opentelemetry/instrumentation/openai/shared/image_gen_wrappers.py b/third_party/opentelemetry/instrumentation/openai/shared/image_gen_wrappers.py deleted file mode 100644 index a25d16861..000000000 --- a/third_party/opentelemetry/instrumentation/openai/shared/image_gen_wrappers.py +++ /dev/null @@ -1,68 +0,0 @@ -import time - -from opentelemetry import context as context_api -from opentelemetry.instrumentation.openai import is_openai_v1 -from opentelemetry.instrumentation.openai.shared import ( - _get_openai_base_url, - metric_shared_attributes, - model_as_dict, -) -from opentelemetry.instrumentation.openai.utils import ( - _with_image_gen_metric_wrapper, -) -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.metrics import Counter, Histogram -from agentops.semconv import SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - - -@_with_image_gen_metric_wrapper -def image_gen_metrics_wrapper( - duration_histogram: Histogram, - exception_counter: Counter, - wrapped, - instance, - args, - kwargs, -): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY - ): - return wrapped(*args, **kwargs) - - try: - # record time for duration - start_time = time.time() - response = wrapped(*args, **kwargs) - end_time = time.time() - except Exception as e: # pylint: disable=broad-except - end_time = time.time() - duration = end_time - start_time if "start_time" in locals() else 0 - - attributes = { - "error.type": e.__class__.__name__, - } - - if duration > 0 and duration_histogram: - duration_histogram.record(duration, attributes=attributes) - if exception_counter: - exception_counter.add(1, attributes=attributes) - - raise e - - if is_openai_v1(): - response_dict = model_as_dict(response) - else: - response_dict = response - - # not provide response.model in ImagesResponse response, use model in request kwargs - shared_attributes = metric_shared_attributes( - 
response_model=kwargs.get("model") or None, - operation="image_gen", - server_address=_get_openai_base_url(instance), - ) - - duration = end_time - start_time - if duration_histogram: - duration_histogram.record(duration, attributes=shared_attributes) - - return response diff --git a/third_party/opentelemetry/instrumentation/openai/utils.py b/third_party/opentelemetry/instrumentation/openai/utils.py deleted file mode 100644 index e9d0436f7..000000000 --- a/third_party/opentelemetry/instrumentation/openai/utils.py +++ /dev/null @@ -1,155 +0,0 @@ -import asyncio -from importlib.metadata import version -from contextlib import asynccontextmanager -import logging -import os -import threading -import traceback - -import openai -from opentelemetry.instrumentation.openai.shared.config import Config - -_OPENAI_VERSION = version("openai") - - -def is_openai_v1(): - return _OPENAI_VERSION >= "1.0.0" - - -def is_azure_openai(instance): - return is_openai_v1() and isinstance(instance._client, (openai.AsyncAzureOpenAI, openai.AzureOpenAI)) - - -def is_metrics_enabled() -> bool: - return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true" - - -def should_record_stream_token_usage(): - return Config.enrich_token_usage - - -def _with_image_gen_metric_wrapper(func): - def _with_metric(duration_histogram, exception_counter): - def wrapper(wrapped, instance, args, kwargs): - return func(duration_histogram, exception_counter, wrapped, instance, args, kwargs) - - return wrapper - - return _with_metric - - -def _with_embeddings_telemetry_wrapper(func): - def _with_embeddings_telemetry( - tracer, - token_counter, - vector_size_counter, - duration_histogram, - exception_counter, - ): - def wrapper(wrapped, instance, args, kwargs): - return func( - tracer, - token_counter, - vector_size_counter, - duration_histogram, - exception_counter, - wrapped, - instance, - args, - kwargs, - ) - - return wrapper - - return _with_embeddings_telemetry - - -def _with_chat_telemetry_wrapper(func): - def _with_chat_telemetry( - tracer, - token_counter, - choice_counter, - duration_histogram, - exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ): - def wrapper(wrapped, instance, args, kwargs): - return func( - tracer, - token_counter, - choice_counter, - duration_histogram, - exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - wrapped, - instance, - args, - kwargs, - ) - - return wrapper - - return _with_chat_telemetry - - -def _with_tracer_wrapper(func): - def _with_tracer(tracer): - def wrapper(wrapped, instance, args, kwargs): - return func(tracer, wrapped, instance, args, kwargs) - - return wrapper - - return _with_tracer - - -@asynccontextmanager -async def start_as_current_span_async(tracer, *args, **kwargs): - with tracer.start_as_current_span(*args, **kwargs) as span: - yield span - - -def dont_throw(func): - """ - A decorator that wraps the passed in function and logs exceptions instead of throwing them. - Works for both synchronous and asynchronous functions. 
- """ - logger = logging.getLogger(func.__module__) - - async def async_wrapper(*args, **kwargs): - try: - return await func(*args, **kwargs) - except Exception as e: - _handle_exception(e, func, logger) - - def sync_wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - _handle_exception(e, func, logger) - - def _handle_exception(e, func, logger): - logger.debug( - "OpenLLMetry failed to trace in %s, error: %s", - func.__name__, - traceback.format_exc(), - ) - if Config.exception_logger: - Config.exception_logger(e) - - return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper - - -def run_async(method): - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - - if loop and loop.is_running(): - thread = threading.Thread(target=lambda: asyncio.run(method)) - thread.start() - thread.join() - else: - asyncio.run(method) diff --git a/third_party/opentelemetry/instrumentation/openai/v1/__init__.py b/third_party/opentelemetry/instrumentation/openai/v1/__init__.py deleted file mode 100644 index cf38553d5..000000000 --- a/third_party/opentelemetry/instrumentation/openai/v1/__init__.py +++ /dev/null @@ -1,250 +0,0 @@ -from typing import Collection - -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.trace import get_tracer - -from opentelemetry.metrics import get_meter - -from wrapt import wrap_function_wrapper - -from opentelemetry.instrumentation.openai.shared.chat_wrappers import ( - chat_wrapper, - achat_wrapper, -) -from opentelemetry.instrumentation.openai.shared.completion_wrappers import ( - completion_wrapper, - acompletion_wrapper, -) -from opentelemetry.instrumentation.openai.shared.embeddings_wrappers import ( - embeddings_wrapper, - aembeddings_wrapper, -) -from opentelemetry.instrumentation.openai.shared.image_gen_wrappers import ( - image_gen_metrics_wrapper, -) -from opentelemetry.instrumentation.openai.v1.assistant_wrappers import ( - assistants_create_wrapper, - runs_create_wrapper, - runs_retrieve_wrapper, - runs_create_and_stream_wrapper, - messages_list_wrapper, -) - -from opentelemetry.instrumentation.openai.utils import is_metrics_enabled -from opentelemetry.instrumentation.openai.version import __version__ - -from agentops.semconv import Meters - -_instruments = ("openai >= 1.0.0",) - - -class OpenAIV1Instrumentor(BaseInstrumentor): - def instrumentation_dependencies(self) -> Collection[str]: - return _instruments - - def _instrument(self, **kwargs): - tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer(__name__, __version__, tracer_provider) - - # meter and counters are inited here - meter_provider = kwargs.get("meter_provider") - meter = get_meter(__name__, __version__, meter_provider) - - if is_metrics_enabled(): - tokens_histogram = meter.create_histogram( - name=Meters.LLM_TOKEN_USAGE, - unit="token", - description="Measures number of input and output tokens used", - ) - - chat_choice_counter = meter.create_counter( - name=Meters.LLM_GENERATION_CHOICES, - unit="choice", - description="Number of choices returned by chat completions call", - ) - - duration_histogram = meter.create_histogram( - name=Meters.LLM_OPERATION_DURATION, - unit="s", - description="GenAI operation duration", - ) - - chat_exception_counter = meter.create_counter( - name=Meters.LLM_COMPLETIONS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during chat completions", - ) - - streaming_time_to_first_token = meter.create_histogram( - 
name=Meters.LLM_STREAMING_TIME_TO_FIRST_TOKEN, - unit="s", - description="Time to first token in streaming chat completions", - ) - streaming_time_to_generate = meter.create_histogram( - name=Meters.LLM_STREAMING_TIME_TO_GENERATE, - unit="s", - description="Time between first token and completion in streaming chat completions", - ) - else: - ( - tokens_histogram, - chat_choice_counter, - duration_histogram, - chat_exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ) = (None, None, None, None, None, None) - - wrap_function_wrapper( - "openai.resources.chat.completions", - "Completions.create", - chat_wrapper( - tracer, - tokens_histogram, - chat_choice_counter, - duration_histogram, - chat_exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ), - ) - - wrap_function_wrapper( - "openai.resources.completions", - "Completions.create", - completion_wrapper(tracer), - ) - - if is_metrics_enabled(): - embeddings_vector_size_counter = meter.create_counter( - name=Meters.LLM_EMBEDDINGS_VECTOR_SIZE, - unit="element", - description="he size of returned vector", - ) - embeddings_exception_counter = meter.create_counter( - name=Meters.LLM_EMBEDDINGS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during embeddings operation", - ) - else: - ( - tokens_histogram, - embeddings_vector_size_counter, - embeddings_exception_counter, - ) = (None, None, None) - - wrap_function_wrapper( - "openai.resources.embeddings", - "Embeddings.create", - embeddings_wrapper( - tracer, - tokens_histogram, - embeddings_vector_size_counter, - duration_histogram, - embeddings_exception_counter, - ), - ) - - wrap_function_wrapper( - "openai.resources.chat.completions", - "AsyncCompletions.create", - achat_wrapper( - tracer, - tokens_histogram, - chat_choice_counter, - duration_histogram, - chat_exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ), - ) - wrap_function_wrapper( - "openai.resources.completions", - "AsyncCompletions.create", - acompletion_wrapper(tracer), - ) - wrap_function_wrapper( - "openai.resources.embeddings", - "AsyncEmbeddings.create", - aembeddings_wrapper( - tracer, - tokens_histogram, - embeddings_vector_size_counter, - duration_histogram, - embeddings_exception_counter, - ), - ) - - if is_metrics_enabled(): - image_gen_exception_counter = meter.create_counter( - name=Meters.LLM_IMAGE_GENERATIONS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during image generations operation", - ) - else: - image_gen_exception_counter = None - - wrap_function_wrapper( - "openai.resources.images", - "Images.generate", - image_gen_metrics_wrapper(duration_histogram, image_gen_exception_counter), - ) - - # Beta APIs may not be available consistently in all versions - try: - wrap_function_wrapper( - "openai.resources.beta.assistants", - "Assistants.create", - assistants_create_wrapper(tracer), - ) - wrap_function_wrapper( - "openai.resources.beta.chat.completions", - "Completions.parse", - chat_wrapper( - tracer, - tokens_histogram, - chat_choice_counter, - duration_histogram, - chat_exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ), - ) - wrap_function_wrapper( - "openai.resources.beta.chat.completions", - "AsyncCompletions.parse", - achat_wrapper( - tracer, - tokens_histogram, - chat_choice_counter, - duration_histogram, - chat_exception_counter, - streaming_time_to_first_token, - streaming_time_to_generate, - ), - ) - wrap_function_wrapper( - 
"openai.resources.beta.threads.runs", - "Runs.create", - runs_create_wrapper(tracer), - ) - wrap_function_wrapper( - "openai.resources.beta.threads.runs", - "Runs.retrieve", - runs_retrieve_wrapper(tracer), - ) - wrap_function_wrapper( - "openai.resources.beta.threads.runs", - "Runs.create_and_stream", - runs_create_and_stream_wrapper(tracer), - ) - wrap_function_wrapper( - "openai.resources.beta.threads.messages", - "Messages.list", - messages_list_wrapper(tracer), - ) - except (AttributeError, ModuleNotFoundError): - pass - - def _uninstrument(self, **kwargs): - pass diff --git a/third_party/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py b/third_party/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py deleted file mode 100644 index 84c07fdba..000000000 --- a/third_party/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py +++ /dev/null @@ -1,230 +0,0 @@ -import logging -import time -from opentelemetry import context as context_api -from opentelemetry.instrumentation.openai.shared import ( - _set_span_attribute, - model_as_dict, -) -from opentelemetry.trace import SpanKind -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY - -from agentops.semconv import SpanAttributes, LLMRequestTypeValues - -from opentelemetry.instrumentation.openai.utils import _with_tracer_wrapper, dont_throw -from opentelemetry.instrumentation.openai.shared.config import Config - -logger = logging.getLogger(__name__) # noqa - -try: - from openai._legacy_response import LegacyAPIResponse -except (ImportError, ModuleNotFoundError): - # This was removed from the `openai` package at some point - logger.debug("LegacyAPIResponse not found in openai package") - LegacyAPIResponse = None - -try: - from openai.types.beta.threads.run import Run -except (ImportError, ModuleNotFoundError): - logger.debug("Run not found in openai package") - Run = None - - -assistants = {} -runs = {} - - -@_with_tracer_wrapper -def assistants_create_wrapper(tracer, wrapped, instance, args, kwargs): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return wrapped(*args, **kwargs) - - response = wrapped(*args, **kwargs) - - assistants[response.id] = { - "model": kwargs.get("model"), - "instructions": kwargs.get("instructions"), - } - - return response - - -@_with_tracer_wrapper -def runs_create_wrapper(tracer, wrapped, instance, args, kwargs): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return wrapped(*args, **kwargs) - - thread_id = kwargs.get("thread_id") - instructions = kwargs.get("instructions") - - response = wrapped(*args, **kwargs) - response_dict = model_as_dict(response) - - runs[thread_id] = { - "start_time": time.time_ns(), - "assistant_id": kwargs.get("assistant_id"), - "instructions": instructions, - "run_id": response_dict.get("id"), - } - - return response - - -@_with_tracer_wrapper -def runs_retrieve_wrapper(tracer, wrapped, instance, args, kwargs): - @dont_throw - def process_response(response): - if type(response) is LegacyAPIResponse: - parsed_response = response.parse() - else: - parsed_response = response - assert type(parsed_response) is Run - - if parsed_response.thread_id in runs: - thread_id = parsed_response.thread_id - runs[thread_id]["end_time"] = time.time_ns() - if parsed_response.usage: - runs[thread_id]["usage"] = parsed_response.usage - - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return wrapped(*args, **kwargs) - - response = wrapped(*args, **kwargs) - process_response(response) - - return response - - 
-@_with_tracer_wrapper -def messages_list_wrapper(tracer, wrapped, instance, args, kwargs): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return wrapped(*args, **kwargs) - - id = kwargs.get("thread_id") - - response = wrapped(*args, **kwargs) - - response_dict = model_as_dict(response) - if id not in runs: - return response - - run = runs[id] - messages = sorted(response_dict["data"], key=lambda x: x["created_at"]) - - span = tracer.start_span( - "openai.assistant.run", - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, - start_time=run.get("start_time"), - ) - - i = 0 - if assistants.get(run["assistant_id"]) is not None or Config.enrich_assistant: - if Config.enrich_assistant: - assistant = model_as_dict(instance._client.beta.assistants.retrieve(run["assistant_id"])) - assistants[run["assistant_id"]] = assistant - else: - assistant = assistants[run["assistant_id"]] - - _set_span_attribute( - span, - SpanAttributes.LLM_SYSTEM, - "openai", - ) - _set_span_attribute( - span, - SpanAttributes.LLM_REQUEST_MODEL, - assistant["model"], - ) - _set_span_attribute( - span, - SpanAttributes.LLM_RESPONSE_MODEL, - assistant["model"], - ) - _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.role", "system") - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.{i}.content", - assistant["instructions"], - ) - i += 1 - _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.role", "system") - _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.content", run["instructions"]) - - for i, msg in enumerate(messages): - prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{i}" - content = msg.get("content") - - _set_span_attribute(span, f"{prefix}.role", msg.get("role")) - _set_span_attribute(span, f"{prefix}.content", content[0].get("text").get("value")) - _set_span_attribute(span, f"gen_ai.response.{i}.id", msg.get("id")) - - if run.get("usage"): - usage_dict = model_as_dict(run.get("usage")) - _set_span_attribute( - span, - SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, - usage_dict.get("completion_tokens"), - ) - _set_span_attribute( - span, - SpanAttributes.LLM_USAGE_PROMPT_TOKENS, - usage_dict.get("prompt_tokens"), - ) - - span.end(run.get("end_time")) - - return response - - -@_with_tracer_wrapper -def runs_create_and_stream_wrapper(tracer, wrapped, instance, args, kwargs): - if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): - return wrapped(*args, **kwargs) - - assistant_id = kwargs.get("assistant_id") - instructions = kwargs.get("instructions") - - span = tracer.start_span( - "openai.assistant.run_stream", - kind=SpanKind.CLIENT, - attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, - ) - - i = 0 - if assistants.get(assistant_id) is not None or Config.enrich_assistant: - if Config.enrich_assistant: - assistant = model_as_dict(instance._client.beta.assistants.retrieve(assistant_id)) - assistants[assistant_id] = assistant - else: - assistant = assistants[assistant_id] - - _set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, assistants[assistant_id]["model"]) - _set_span_attribute( - span, - SpanAttributes.LLM_SYSTEM, - "openai", - ) - _set_span_attribute( - span, - SpanAttributes.LLM_RESPONSE_MODEL, - assistants[assistant_id]["model"], - ) - _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.role", "system") - _set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.{i}.content", - assistants[assistant_id]["instructions"], - ) - i += 1 - 
_set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.role", "system") - _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.{i}.content", instructions) - - from opentelemetry.instrumentation.openai.v1.event_handler_wrapper import ( - EventHandleWrapper, - ) - - kwargs["event_handler"] = EventHandleWrapper(original_handler=kwargs["event_handler"], span=span) - - response = wrapped(*args, **kwargs) - - return response diff --git a/third_party/opentelemetry/instrumentation/openai/v1/event_handler_wrapper.py b/third_party/opentelemetry/instrumentation/openai/v1/event_handler_wrapper.py deleted file mode 100644 index 91c4bc438..000000000 --- a/third_party/opentelemetry/instrumentation/openai/v1/event_handler_wrapper.py +++ /dev/null @@ -1,115 +0,0 @@ -from opentelemetry.instrumentation.openai.shared import ( - _set_span_attribute, -) -from agentops.semconv import SpanAttributes -from openai import AssistantEventHandler -from typing_extensions import override - - -class EventHandleWrapper(AssistantEventHandler): - _current_text_index = 0 - _prompt_tokens = 0 - _completion_tokens = 0 - - def __init__(self, original_handler, span): - super().__init__() - self._original_handler = original_handler - self._span = span - - @override - def on_end(self): - _set_span_attribute( - self._span, - SpanAttributes.LLM_USAGE_PROMPT_TOKENS, - self._prompt_tokens, - ) - _set_span_attribute( - self._span, - SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, - self._completion_tokens, - ) - self._original_handler.on_end() - self._span.end() - - @override - def on_event(self, event): - self._original_handler.on_event(event) - - @override - def on_run_step_created(self, run_step): - self._original_handler.on_run_step_created(run_step) - - @override - def on_run_step_delta(self, delta, snapshot): - self._original_handler.on_run_step_delta(delta, snapshot) - - @override - def on_run_step_done(self, run_step): - if run_step.usage: - self._prompt_tokens += run_step.usage.prompt_tokens - self._completion_tokens += run_step.usage.completion_tokens - self._original_handler.on_run_step_done(run_step) - - @override - def on_tool_call_created(self, tool_call): - self._original_handler.on_tool_call_created(tool_call) - - @override - def on_tool_call_delta(self, delta, snapshot): - self._original_handler.on_tool_call_delta(delta, snapshot) - - @override - def on_tool_call_done(self, tool_call): - self._original_handler.on_tool_call_done(tool_call) - - @override - def on_exception(self, exception: Exception): - self._original_handler.on_exception(exception) - - @override - def on_timeout(self): - self._original_handler.on_timeout() - - @override - def on_message_created(self, message): - self._original_handler.on_message_created(message) - - @override - def on_message_delta(self, delta, snapshot): - self._original_handler.on_message_delta(delta, snapshot) - - @override - def on_message_done(self, message): - _set_span_attribute( - self._span, - f"gen_ai.response.{self._current_text_index}.id", - message.id, - ) - self._original_handler.on_message_done(message) - self._current_text_index += 1 - - @override - def on_text_created(self, text): - self._original_handler.on_text_created(text) - - @override - def on_text_delta(self, delta, snapshot): - self._original_handler.on_text_delta(delta, snapshot) - - @override - def on_text_done(self, text): - self._original_handler.on_text_done(text) - _set_span_attribute( - self._span, - f"{SpanAttributes.LLM_COMPLETIONS}.{self._current_text_index}.role", - "assistant", - ) - 
_set_span_attribute( - self._span, - f"{SpanAttributes.LLM_COMPLETIONS}.{self._current_text_index}.content", - text.value, - ) - - @override - def on_image_file_done(self, image_file): - self._original_handler.on_image_file_done(image_file) diff --git a/third_party/opentelemetry/instrumentation/openai/version.py b/third_party/opentelemetry/instrumentation/openai/version.py deleted file mode 100644 index b997ca922..000000000 --- a/third_party/opentelemetry/instrumentation/openai/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "0.38.5"
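Aside (not part of the diff): the deleted event_handler_wrapper.py above follows a delegation pattern -- subclass the SDK's AssistantEventHandler, forward every callback to the user's original handler, and fold run-step usage into the span before ending it in on_end. A minimal sketch under those assumptions; the class name and the literal attribute keys are illustrative stand-ins for the real base class and SpanAttributes constants.

class DelegatingHandler:
    """Stand-in for the removed EventHandleWrapper (which subclasses
    openai.AssistantEventHandler); only two of its callbacks are sketched."""

    def __init__(self, original_handler, span):
        self._original = original_handler
        self._span = span
        self._prompt_tokens = 0
        self._completion_tokens = 0

    def on_run_step_done(self, run_step):
        # Accumulate usage as run steps finish, then forward to the user's handler.
        if getattr(run_step, "usage", None):
            self._prompt_tokens += run_step.usage.prompt_tokens
            self._completion_tokens += run_step.usage.completion_tokens
        self._original.on_run_step_done(run_step)

    def on_end(self):
        # Record totals on the span, let the user's handler run, then end the span.
        self._span.set_attribute("gen_ai.usage.prompt_tokens", self._prompt_tokens)
        self._span.set_attribute("gen_ai.usage.completion_tokens", self._completion_tokens)
        self._original.on_end()
        self._span.end()

The same pattern generalizes to every other callback (on_message_done, on_text_done, and so on): observe, annotate the span, and always delegate so user-supplied handlers keep working unchanged.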