Skip to content
147 changes: 145 additions & 2 deletions agentops/instrumentation/openai/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,124 @@
"""

from typing import List
from opentelemetry.trace import get_tracer
from opentelemetry.trace import get_tracer, SpanKind, Status, StatusCode
from opentelemetry.instrumentation.openai.v1 import OpenAIV1Instrumentor as ThirdPartyOpenAIV1Instrumentor

from agentops.logging import logger
from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap
from agentops.instrumentation.openai import LIBRARY_NAME, LIBRARY_VERSION
from agentops.instrumentation.openai.attributes.common import get_response_attributes
from opentelemetry import context as context_api


def responses_wrapper(tracer, wrapped, instance, args, kwargs):
    """Trace a synchronous OpenAI Responses API call.

    Opens an ``openai.responses.create`` client span around the wrapped call
    and, when trace context from the OpenAI Agents SDK is present in the
    current OpenTelemetry context, copies it onto the span as attributes.

    Attribute extraction is best-effort: a failure while collecting telemetry
    is logged at debug level and never prevents the wrapped call from running,
    never turns a successful call into an error, and never masks the exception
    raised by the wrapped call.

    Args:
        tracer: Tracer used to start the span.
        wrapped: The original callable being instrumented.
        instance: Object the wrapped callable is bound to (unused).
        args: Positional arguments forwarded to ``wrapped``.
        kwargs: Keyword arguments forwarded to ``wrapped``.

    Returns:
        The value returned by ``wrapped(*args, **kwargs)``.

    Raises:
        Exception: Whatever ``wrapped`` raises; it is recorded on the span and
            re-raised unchanged.
    """
    # Skip instrumentation if it's suppressed in the current context
    if context_api.get_value("suppress_instrumentation"):
        return wrapped(*args, **kwargs)

    def record_attributes(span, **sources):
        """Copy extracted attributes onto the span without ever raising."""
        try:
            for key, value in get_response_attributes(**sources).items():
                span.set_attribute(key, value)
        except Exception as attr_error:
            logger.debug(f"[OpenAI Instrumentor] Failed to record response attributes: {attr_error}")

    return_value = None

    # Check if we have trace context from OpenAI Agents SDK
    trace_id = context_api.get_value("openai_agents.trace_id", None)
    parent_id = context_api.get_value("openai_agents.parent_id", None)
    workflow_input = context_api.get_value("openai_agents.workflow_input", None)

    if trace_id:
        logger.debug(
            f"[OpenAI Instrumentor] Found OpenAI Agents trace context: trace_id={trace_id}, parent_id={parent_id}"
        )

    with tracer.start_as_current_span(
        "openai.responses.create",
        kind=SpanKind.CLIENT,
    ) as span:
        # Add the input attributes to the span before execution
        record_attributes(span, args=args, kwargs=kwargs)

        # If we have trace context from OpenAI Agents SDK, add it as attributes
        if trace_id:
            span.set_attribute("openai_agents.trace_id", trace_id)
        if parent_id:
            span.set_attribute("openai_agents.parent_id", parent_id)
        if workflow_input:
            span.set_attribute("workflow.input", workflow_input)

        # Only the wrapped call is guarded, so the error path below always
        # reflects a failure of the API call and not of the telemetry code.
        try:
            return_value = wrapped(*args, **kwargs)
        except Exception as e:
            # Add everything we have in the case of an error
            record_attributes(span, args=args, kwargs=kwargs, return_value=return_value)
            # NOTE(review): start_as_current_span may also record the exception
            # when it propagates out of the `with` block - confirm whether this
            # yields a duplicate exception event.
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise

        record_attributes(span, return_value=return_value)
        span.set_status(Status(StatusCode.OK))

    return return_value


async def async_responses_wrapper(tracer, wrapped, instance, args, kwargs):
    """Trace an asynchronous OpenAI Responses API call.

    Opens an ``openai.responses.create`` client span around the awaited call
    and, when trace context from the OpenAI Agents SDK is present in the
    current OpenTelemetry context, copies it onto the span as attributes.

    Attribute extraction is best-effort: a failure while collecting telemetry
    is logged at debug level and never prevents the wrapped call from running,
    never turns a successful call into an error, and never masks the exception
    raised by the wrapped call.

    Args:
        tracer: Tracer used to start the span.
        wrapped: The original coroutine function being instrumented.
        instance: Object the wrapped callable is bound to (unused).
        args: Positional arguments forwarded to ``wrapped``.
        kwargs: Keyword arguments forwarded to ``wrapped``.

    Returns:
        The value produced by ``await wrapped(*args, **kwargs)``.

    Raises:
        Exception: Whatever ``wrapped`` raises; it is recorded on the span and
            re-raised unchanged.
    """
    # Skip instrumentation if it's suppressed in the current context
    if context_api.get_value("suppress_instrumentation"):
        return await wrapped(*args, **kwargs)

    def record_attributes(span, **sources):
        """Copy extracted attributes onto the span without ever raising."""
        try:
            for key, value in get_response_attributes(**sources).items():
                span.set_attribute(key, value)
        except Exception as attr_error:
            logger.debug(f"[OpenAI Instrumentor] Failed to record response attributes: {attr_error}")

    return_value = None

    # Check if we have trace context from OpenAI Agents SDK
    trace_id = context_api.get_value("openai_agents.trace_id", None)
    parent_id = context_api.get_value("openai_agents.parent_id", None)
    workflow_input = context_api.get_value("openai_agents.workflow_input", None)

    if trace_id:
        logger.debug(
            f"[OpenAI Instrumentor] Found OpenAI Agents trace context in async wrapper: trace_id={trace_id}, parent_id={parent_id}"
        )

    with tracer.start_as_current_span(
        "openai.responses.create",
        kind=SpanKind.CLIENT,
    ) as span:
        # Add the input attributes to the span before execution
        record_attributes(span, args=args, kwargs=kwargs)

        # If we have trace context from OpenAI Agents SDK, add it as attributes
        if trace_id:
            span.set_attribute("openai_agents.trace_id", trace_id)
        if parent_id:
            span.set_attribute("openai_agents.parent_id", parent_id)
        if workflow_input:
            span.set_attribute("workflow.input", workflow_input)

        # Only the awaited call is guarded, so the error path below always
        # reflects a failure of the API call and not of the telemetry code.
        try:
            return_value = await wrapped(*args, **kwargs)
        except Exception as e:
            # Add everything we have in the case of an error
            record_attributes(span, args=args, kwargs=kwargs, return_value=return_value)
            # NOTE(review): start_as_current_span may also record the exception
            # when it propagates out of the `with` block - confirm whether this
            # yields a duplicate exception event.
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            raise

        record_attributes(span, return_value=return_value)
        span.set_status(Status(StatusCode.OK))

    return return_value


# Methods to wrap beyond what the third-party instrumentation handles
Expand Down Expand Up @@ -77,10 +188,31 @@
for wrap_config in WRAPPED_METHODS:
try:
wrap(wrap_config, tracer)
logger.debug(f"Successfully wrapped {wrap_config}")
logger.debug(f"Successfully wrapped {wrap_config} with standard wrapper")
except (AttributeError, ModuleNotFoundError) as e:
logger.debug(f"Failed to wrap {wrap_config}: {e}")

from wrapt import wrap_function_wrapper

try:
wrap_function_wrapper(
"openai.resources.responses",
"Responses.create",
lambda wrapped, instance, args, kwargs: responses_wrapper(tracer, wrapped, instance, args, kwargs),
)
logger.debug("Successfully wrapped Responses.create with custom wrapper")

wrap_function_wrapper(
"openai.resources.responses",
"AsyncResponses.create",
lambda wrapped, instance, args, kwargs: async_responses_wrapper(
tracer, wrapped, instance, args, kwargs
),
)
logger.debug("Successfully wrapped AsyncResponses.create with custom wrapper")
except (AttributeError, ModuleNotFoundError) as e:
logger.debug(f"Failed to wrap Responses API with custom wrapper: {e}")

Check warning on line 214 in agentops/instrumentation/openai/instrumentor.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai/instrumentor.py#L213-L214

Added lines #L213 - L214 were not covered by tests

logger.debug("Successfully instrumented OpenAI API with Response extensions")

def _uninstrument(self, **kwargs):
Expand All @@ -94,4 +226,15 @@
except Exception as e:
logger.debug(f"Failed to unwrap {wrap_config}: {e}")

from opentelemetry.instrumentation.utils import unwrap as _unwrap

try:
_unwrap("openai.resources.responses.Responses", "create")
logger.debug("Successfully unwrapped Responses.create custom wrapper")

_unwrap("openai.resources.responses.AsyncResponses", "create")
logger.debug("Successfully unwrapped AsyncResponses.create custom wrapper")
except Exception as e:
logger.debug(f"Failed to unwrap Responses API custom wrapper: {e}")

Check warning on line 238 in agentops/instrumentation/openai/instrumentor.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai/instrumentor.py#L237-L238

Added lines #L237 - L238 were not covered by tests

logger.debug("Successfully removed OpenAI API instrumentation with Response extensions")
25 changes: 25 additions & 0 deletions agentops/instrumentation/openai_agents/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,31 @@
trace_id = getattr(span, "trace_id", "unknown")
parent_id = getattr(span, "parent_id", None)

# Special handling for ResponseSpanData to avoid duplicate spans
# and ensure proper trace hierarchy
if span_type == "ResponseSpanData":
logger.debug(

Check warning on line 309 in agentops/instrumentation/openai_agents/exporter.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai_agents/exporter.py#L309

Added line #L309 was not covered by tests
"[agentops.instrumentation.openai_agents] Processing ResponseSpanData for trace context propagation"
)

# Store the trace context information in a global context that can be accessed
# by the OpenAI instrumentation when it creates spans
ctx = context_api.get_current()

Check warning on line 315 in agentops/instrumentation/openai_agents/exporter.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai_agents/exporter.py#L315

Added line #L315 was not covered by tests

# Store the OpenAI Agents trace context in the current context
ctx = context_api.set_value("openai_agents.trace_id", trace_id, ctx)
ctx = context_api.set_value("openai_agents.span_id", span_id, ctx)
ctx = context_api.set_value("openai_agents.parent_id", parent_id, ctx)

Check warning on line 320 in agentops/instrumentation/openai_agents/exporter.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai_agents/exporter.py#L318-L320

Added lines #L318 - L320 were not covered by tests

if hasattr(span_data, "input") and span_data.input:
ctx = context_api.set_value("openai_agents.workflow_input", str(span_data.input), ctx)
context_api.attach(ctx)

Check warning on line 324 in agentops/instrumentation/openai_agents/exporter.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai_agents/exporter.py#L322-L324

Added lines #L322 - L324 were not covered by tests

logger.debug(

Check warning on line 326 in agentops/instrumentation/openai_agents/exporter.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai_agents/exporter.py#L326

Added line #L326 was not covered by tests
f"[agentops.instrumentation.openai_agents] Propagated trace context: trace_id={trace_id}, parent_id={parent_id}"
)
return

Check warning on line 329 in agentops/instrumentation/openai_agents/exporter.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/openai_agents/exporter.py#L329

Added line #L329 was not covered by tests

# Check if this is a span end event
is_end_event = hasattr(span, "status") and span.status == StatusCode.OK.name

Expand Down
181 changes: 181 additions & 0 deletions tests/unit/instrumentation/openai_core/test_custom_wrappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
Tests for OpenAI API Custom Wrappers

This module contains tests for the custom wrappers used in the OpenAI instrumentor.
It verifies that our custom wrappers correctly handle context from OpenAI Agents SDK
and set the appropriate attributes on spans.
"""

import pytest
from unittest.mock import MagicMock, patch

from opentelemetry import context as context_api
from opentelemetry.trace import SpanKind, StatusCode

from agentops.instrumentation.openai.instrumentor import (
OpenAIInstrumentor,
responses_wrapper,
async_responses_wrapper,
)


class TestOpenAICustomWrappers:
    """Exercise the custom Responses API wrappers and their (un)registration."""

    @pytest.fixture
    def mock_tracer(self):
        """Provide a ``(tracer, span)`` pair where the tracer's context manager yields the span."""
        tracer = MagicMock()
        span = MagicMock()
        tracer.start_as_current_span.return_value.__enter__.return_value = span
        return tracer, span

    @pytest.fixture
    def mock_context(self):
        """Patch ``context_api.get_value`` so it reports OpenAI Agents SDK trace information."""
        canned_values = {
            "openai_agents.trace_id": "test-trace-id",
            "openai_agents.parent_id": "test-parent-id",
            "openai_agents.workflow_input": "test workflow input",
            "suppress_instrumentation": False,
        }

        def fake_get_value(key, default=None, context=None):
            # Unknown keys fall back to the supplied default.
            return canned_values.get(key, default)

        with patch.object(context_api, "get_value") as mock_get_value:
            mock_get_value.side_effect = fake_get_value
            yield mock_get_value

    def test_responses_wrapper_with_context(self, mock_tracer, mock_context):
        """The sync wrapper opens a client span, tags it with Agents context and returns the result."""
        tracer, span = mock_tracer
        expected = {"id": "test-response-id", "model": "test-model"}

        # Stand-in for the instrumented callable
        mock_wrapped = MagicMock(return_value={"id": "test-response-id", "model": "test-model"})

        # Attribute extraction is stubbed out so only the wrapper logic is exercised
        with patch("agentops.instrumentation.openai.instrumentor.get_response_attributes", return_value={}):
            result = responses_wrapper(tracer, mock_wrapped, None, [], {})

            # The underlying callable must have been invoked
            assert mock_wrapped.called

            # Span name and kind
            tracer.start_as_current_span.assert_called_once_with(
                "openai.responses.create",
                kind=SpanKind.CLIENT,
            )

            # Context values copied onto the span
            for attr_name, attr_value in (
                ("openai_agents.trace_id", "test-trace-id"),
                ("openai_agents.parent_id", "test-parent-id"),
                ("workflow.input", "test workflow input"),
            ):
                span.set_attribute.assert_any_call(attr_name, attr_value)

            # The most recent status set on the span must be OK
            assert span.set_status.called, "set_status was not called"
            status_arg = span.set_status.call_args[0][0]
            assert status_arg.status_code == StatusCode.OK, f"Expected status code OK, got {status_arg.status_code}"

            # The wrapped callable's result is passed through
            assert result == expected

    @pytest.mark.asyncio
    async def test_async_responses_wrapper_with_context(self, mock_tracer, mock_context):
        """The async wrapper opens a client span, tags it with Agents context and returns the result."""
        tracer, span = mock_tracer
        expected = {"id": "test-response-id", "model": "test-model"}

        # Stand-in coroutine function for the instrumented callable
        async def mock_async_func(*args, **kwargs):
            return {"id": "test-response-id", "model": "test-model"}

        mock_wrapped = MagicMock(side_effect=mock_async_func)

        # Attribute extraction is stubbed out so only the wrapper logic is exercised
        with patch("agentops.instrumentation.openai.instrumentor.get_response_attributes", return_value={}):
            result = await async_responses_wrapper(tracer, mock_wrapped, None, [], {})

            # The underlying callable must have been invoked
            assert mock_wrapped.called

            # Span name and kind
            tracer.start_as_current_span.assert_called_once_with(
                "openai.responses.create",
                kind=SpanKind.CLIENT,
            )

            # Context values copied onto the span
            for attr_name, attr_value in (
                ("openai_agents.trace_id", "test-trace-id"),
                ("openai_agents.parent_id", "test-parent-id"),
                ("workflow.input", "test workflow input"),
            ):
                span.set_attribute.assert_any_call(attr_name, attr_value)

            # The most recent status set on the span must be OK
            assert span.set_status.called, "set_status was not called"
            status_arg = span.set_status.call_args[0][0]
            assert status_arg.status_code == StatusCode.OK, f"Expected status code OK, got {status_arg.status_code}"

            # The awaited result is passed through
            assert result == expected

    def test_instrumentor_uses_custom_wrappers(self):
        """``_instrument`` registers both custom wrappers through ``wrap_function_wrapper``."""
        instrumentor = OpenAIInstrumentor()
        parent_cls = instrumentor.__class__.__bases__[0]

        # Patch order: wrapt entry point, the standard wrap helper, then the
        # parent class's _instrument so it does nothing.
        with patch("wrapt.wrap_function_wrapper") as mock_wrap_function_wrapper, patch(
            "agentops.instrumentation.openai.instrumentor.wrap"
        ), patch.object(parent_cls, "_instrument"):
            instrumentor._instrument(tracer_provider=MagicMock())

            # One registration per wrapped method
            assert mock_wrap_function_wrapper.call_count == 2

            sync_call, async_call = mock_wrap_function_wrapper.call_args_list

            # First registration targets Responses.create
            assert sync_call[0][0] == "openai.resources.responses"
            assert sync_call[0][1] == "Responses.create"

            # Second registration targets AsyncResponses.create
            assert async_call[0][0] == "openai.resources.responses"
            assert async_call[0][1] == "AsyncResponses.create"

    def test_instrumentor_unwraps_custom_wrappers(self):
        """``_uninstrument`` removes both custom wrappers through ``unwrap``."""
        instrumentor = OpenAIInstrumentor()
        parent_cls = instrumentor.__class__.__bases__[0]

        # Patch order: the OTel unwrap utility, the standard unwrap helper,
        # then the parent class's _uninstrument so it does nothing.
        with patch("opentelemetry.instrumentation.utils.unwrap") as mock_unwrap, patch(
            "agentops.instrumentation.openai.instrumentor.unwrap"
        ), patch.object(parent_cls, "_uninstrument"):
            instrumentor._uninstrument()

            # One removal per wrapped method
            assert mock_unwrap.call_count == 2

            sync_call, async_call = mock_unwrap.call_args_list

            # First removal targets Responses.create
            assert sync_call[0][0] == "openai.resources.responses.Responses"
            assert sync_call[0][1] == "create"

            # Second removal targets AsyncResponses.create
            assert async_call[0][0] == "openai.resources.responses.AsyncResponses"
            assert async_call[0][1] == "create"
Loading