Skip to content

Commit 38aa72c

Browse files
authored
feat: Add OpenAI streaming support with telemetry instrumentation (#1091)
1 parent 3145b2c commit 38aa72c

File tree

7 files changed

+924
-138
lines changed

7 files changed

+924
-138
lines changed

agentops/instrumentation/providers/openai/instrumentor.py

Lines changed: 68 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
"""
1414

1515
from typing import Dict, Any
16+
from wrapt import wrap_function_wrapper
1617

18+
from opentelemetry.metrics import Meter
19+
20+
from agentops.logging import logger
1721
from agentops.instrumentation.common import (
1822
CommonInstrumentor,
1923
InstrumentorConfig,
@@ -22,11 +26,9 @@
2226
MetricsRecorder,
2327
)
2428
from agentops.instrumentation.providers.openai import LIBRARY_NAME, LIBRARY_VERSION
25-
from agentops.instrumentation.providers.openai.attributes.common import get_response_attributes
2629
from agentops.instrumentation.providers.openai.config import Config
2730
from agentops.instrumentation.providers.openai.utils import is_openai_v1
2831
from agentops.instrumentation.providers.openai.wrappers import (
29-
handle_chat_attributes,
3032
handle_completion_attributes,
3133
handle_embeddings_attributes,
3234
handle_image_gen_attributes,
@@ -36,9 +38,14 @@
3638
handle_run_stream_attributes,
3739
handle_messages_attributes,
3840
)
41+
from agentops.instrumentation.providers.openai.stream_wrapper import (
42+
chat_completion_stream_wrapper,
43+
async_chat_completion_stream_wrapper,
44+
responses_stream_wrapper,
45+
async_responses_stream_wrapper,
46+
)
3947
from agentops.instrumentation.providers.openai.v0 import OpenAIV0Instrumentor
4048
from agentops.semconv import Meters
41-
from opentelemetry.metrics import Meter
4249

4350
_instruments = ("openai >= 0.27.0",)
4451

@@ -82,6 +89,59 @@ def _initialize(self, **kwargs):
8289
# Skip normal instrumentation
8390
self.config.wrapped_methods = []
8491

92+
def _custom_wrap(self, **kwargs):
93+
"""Add custom wrappers for streaming functionality."""
94+
if is_openai_v1() and self._tracer:
95+
# from wrapt import wrap_function_wrapper
96+
# # Add streaming wrappers for v1
97+
try:
98+
# Chat completion streaming wrappers
99+
100+
wrap_function_wrapper(
101+
"openai.resources.chat.completions",
102+
"Completions.create",
103+
chat_completion_stream_wrapper(self._tracer),
104+
)
105+
106+
wrap_function_wrapper(
107+
"openai.resources.chat.completions",
108+
"AsyncCompletions.create",
109+
async_chat_completion_stream_wrapper(self._tracer),
110+
)
111+
112+
# Beta chat completion streaming wrappers
113+
wrap_function_wrapper(
114+
"openai.resources.beta.chat.completions",
115+
"Completions.parse",
116+
chat_completion_stream_wrapper(self._tracer),
117+
)
118+
119+
wrap_function_wrapper(
120+
"openai.resources.beta.chat.completions",
121+
"AsyncCompletions.parse",
122+
async_chat_completion_stream_wrapper(self._tracer),
123+
)
124+
125+
# Responses API streaming wrappers
126+
wrap_function_wrapper(
127+
"openai.resources.responses",
128+
"Responses.create",
129+
responses_stream_wrapper(self._tracer),
130+
)
131+
132+
wrap_function_wrapper(
133+
"openai.resources.responses",
134+
"AsyncResponses.create",
135+
async_responses_stream_wrapper(self._tracer),
136+
)
137+
except Exception as e:
138+
logger.warning(f"[OPENAI INSTRUMENTOR] Error setting up OpenAI streaming wrappers: {e}")
139+
else:
140+
if not is_openai_v1():
141+
logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - not using OpenAI v1")
142+
if not self._tracer:
143+
logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - no tracer available")
144+
85145
def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
86146
"""Create metrics for OpenAI instrumentation."""
87147
metrics = StandardMetrics.create_standard_metrics(meter)
@@ -130,29 +190,12 @@ def _custom_unwrap(self, **kwargs):
130190
OpenAIV0Instrumentor().uninstrument(**kwargs)
131191

132192
def _get_wrapped_methods(self) -> list[WrapConfig]:
133-
"""Get all methods that should be wrapped."""
134-
wrapped_methods = []
193+
"""Get all methods that should be wrapped.
135194
136-
# Chat completions
137-
wrapped_methods.extend(
138-
[
139-
WrapConfig(
140-
trace_name="openai.chat.completion",
141-
package="openai.resources.chat.completions",
142-
class_name="Completions",
143-
method_name="create",
144-
handler=handle_chat_attributes,
145-
),
146-
WrapConfig(
147-
trace_name="openai.chat.completion",
148-
package="openai.resources.chat.completions",
149-
class_name="AsyncCompletions",
150-
method_name="create",
151-
handler=handle_chat_attributes,
152-
is_async=True,
153-
),
154-
]
155-
)
195+
Note: Chat completions and Responses API methods are NOT included here
196+
as they are wrapped directly in _custom_wrap to support streaming.
197+
"""
198+
wrapped_methods = []
156199

157200
# Regular completions
158201
wrapped_methods.extend(
@@ -221,27 +264,6 @@ def _get_wrapped_methods(self) -> list[WrapConfig]:
221264
)
222265
)
223266

224-
# Chat parse methods
225-
beta_methods.extend(
226-
[
227-
WrapConfig(
228-
trace_name="openai.chat.completion",
229-
package="openai.resources.beta.chat.completions",
230-
class_name="Completions",
231-
method_name="parse",
232-
handler=handle_chat_attributes,
233-
),
234-
WrapConfig(
235-
trace_name="openai.chat.completion",
236-
package="openai.resources.beta.chat.completions",
237-
class_name="AsyncCompletions",
238-
method_name="parse",
239-
handler=handle_chat_attributes,
240-
is_async=True,
241-
),
242-
]
243-
)
244-
245267
# Runs
246268
beta_methods.extend(
247269
[
@@ -283,27 +305,6 @@ def _get_wrapped_methods(self) -> list[WrapConfig]:
283305
# Add beta methods to wrapped methods (they might fail)
284306
wrapped_methods.extend(beta_methods)
285307

286-
# Responses API (Agents SDK) - our custom addition
287-
wrapped_methods.extend(
288-
[
289-
WrapConfig(
290-
trace_name="openai.responses.create",
291-
package="openai.resources.responses",
292-
class_name="Responses",
293-
method_name="create",
294-
handler=get_response_attributes,
295-
),
296-
WrapConfig(
297-
trace_name="openai.responses.create",
298-
package="openai.resources.responses",
299-
class_name="AsyncResponses",
300-
method_name="create",
301-
handler=get_response_attributes,
302-
is_async=True,
303-
),
304-
]
305-
)
306-
307308
return wrapped_methods
308309

309310
def get_metrics_recorder(self) -> MetricsRecorder:

0 commit comments

Comments
 (0)