Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 68 additions & 67 deletions agentops/instrumentation/providers/openai/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
"""

from typing import Dict, Any
from wrapt import wrap_function_wrapper

from opentelemetry.metrics import Meter

from agentops.logging import logger
from agentops.instrumentation.common import (
CommonInstrumentor,
InstrumentorConfig,
Expand All @@ -22,11 +26,9 @@
MetricsRecorder,
)
from agentops.instrumentation.providers.openai import LIBRARY_NAME, LIBRARY_VERSION
from agentops.instrumentation.providers.openai.attributes.common import get_response_attributes
from agentops.instrumentation.providers.openai.config import Config
from agentops.instrumentation.providers.openai.utils import is_openai_v1
from agentops.instrumentation.providers.openai.wrappers import (
handle_chat_attributes,
handle_completion_attributes,
handle_embeddings_attributes,
handle_image_gen_attributes,
Expand All @@ -36,9 +38,14 @@
handle_run_stream_attributes,
handle_messages_attributes,
)
from agentops.instrumentation.providers.openai.stream_wrapper import (
chat_completion_stream_wrapper,
async_chat_completion_stream_wrapper,
responses_stream_wrapper,
async_responses_stream_wrapper,
)
from agentops.instrumentation.providers.openai.v0 import OpenAIV0Instrumentor
from agentops.semconv import Meters
from opentelemetry.metrics import Meter

_instruments = ("openai >= 0.27.0",)

Expand Down Expand Up @@ -82,6 +89,59 @@
# Skip normal instrumentation
self.config.wrapped_methods = []

def _custom_wrap(self, **kwargs):
"""Add custom wrappers for streaming functionality."""
if is_openai_v1() and self._tracer:
# from wrapt import wrap_function_wrapper
# # Add streaming wrappers for v1
try:
# Chat completion streaming wrappers

wrap_function_wrapper(
"openai.resources.chat.completions",
"Completions.create",
chat_completion_stream_wrapper(self._tracer),
)

wrap_function_wrapper(
"openai.resources.chat.completions",
"AsyncCompletions.create",
async_chat_completion_stream_wrapper(self._tracer),
)

# Beta chat completion streaming wrappers
wrap_function_wrapper(
"openai.resources.beta.chat.completions",
"Completions.parse",
chat_completion_stream_wrapper(self._tracer),
)

wrap_function_wrapper(
"openai.resources.beta.chat.completions",
"AsyncCompletions.parse",
async_chat_completion_stream_wrapper(self._tracer),
)

# Responses API streaming wrappers
wrap_function_wrapper(
"openai.resources.responses",
"Responses.create",
responses_stream_wrapper(self._tracer),
)

wrap_function_wrapper(
"openai.resources.responses",
"AsyncResponses.create",
async_responses_stream_wrapper(self._tracer),
)
except Exception as e:
logger.warning(f"[OPENAI INSTRUMENTOR] Error setting up OpenAI streaming wrappers: {e}")

Check warning on line 138 in agentops/instrumentation/providers/openai/instrumentor.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/providers/openai/instrumentor.py#L137-L138

Added lines #L137 - L138 were not covered by tests
else:
if not is_openai_v1():
logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - not using OpenAI v1")
if not self._tracer:
logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - no tracer available")

Check warning on line 143 in agentops/instrumentation/providers/openai/instrumentor.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/providers/openai/instrumentor.py#L140-L143

Added lines #L140 - L143 were not covered by tests

def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
"""Create metrics for OpenAI instrumentation."""
metrics = StandardMetrics.create_standard_metrics(meter)
Expand Down Expand Up @@ -130,29 +190,12 @@
OpenAIV0Instrumentor().uninstrument(**kwargs)

def _get_wrapped_methods(self) -> list[WrapConfig]:
"""Get all methods that should be wrapped."""
wrapped_methods = []
"""Get all methods that should be wrapped.

# Chat completions
wrapped_methods.extend(
[
WrapConfig(
trace_name="openai.chat.completion",
package="openai.resources.chat.completions",
class_name="Completions",
method_name="create",
handler=handle_chat_attributes,
),
WrapConfig(
trace_name="openai.chat.completion",
package="openai.resources.chat.completions",
class_name="AsyncCompletions",
method_name="create",
handler=handle_chat_attributes,
is_async=True,
),
]
)
Note: Chat completions and Responses API methods are NOT included here
as they are wrapped directly in _custom_wrap to support streaming.
"""
wrapped_methods = []

# Regular completions
wrapped_methods.extend(
Expand Down Expand Up @@ -221,27 +264,6 @@
)
)

# Chat parse methods
beta_methods.extend(
[
WrapConfig(
trace_name="openai.chat.completion",
package="openai.resources.beta.chat.completions",
class_name="Completions",
method_name="parse",
handler=handle_chat_attributes,
),
WrapConfig(
trace_name="openai.chat.completion",
package="openai.resources.beta.chat.completions",
class_name="AsyncCompletions",
method_name="parse",
handler=handle_chat_attributes,
is_async=True,
),
]
)

# Runs
beta_methods.extend(
[
Expand Down Expand Up @@ -283,27 +305,6 @@
# Add beta methods to wrapped methods (they might fail)
wrapped_methods.extend(beta_methods)

# Responses API (Agents SDK) - our custom addition
wrapped_methods.extend(
[
WrapConfig(
trace_name="openai.responses.create",
package="openai.resources.responses",
class_name="Responses",
method_name="create",
handler=get_response_attributes,
),
WrapConfig(
trace_name="openai.responses.create",
package="openai.resources.responses",
class_name="AsyncResponses",
method_name="create",
handler=get_response_attributes,
is_async=True,
),
]
)

return wrapped_methods

def get_metrics_recorder(self) -> MetricsRecorder:
Expand Down
Loading
Loading