65 changes: 65 additions & 0 deletions examples/otel.py
@@ -0,0 +1,65 @@
import os
import openai
import dotenv
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter

dotenv.load_dotenv()

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

exporter = AzureMonitorTraceExporter(
connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
)
span_processor = BatchSpanProcessor(exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

client = openai.AzureOpenAI()

messages = [
{"role": "system", "content": "Don't make assumptions about what values to plug into tools. Ask for clarification if a user request is ambiguous."},
{"role": "user", "content": "What's the weather like today in Seattle?"}
]
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
}
}
]

completion = client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools,
tool_choice="auto",
)
messages.append(completion.choices[0].message)
messages.append(
{
"role": "tool",
"tool_call_id": completion.choices[0].message.tool_calls[0].id,
"content": "{\"temperature\": \"22\", \"unit\": \"celsius\", \"description\": \"Sunny\"}"
}
)
tool_completion = client.chat.completions.create(
model="gpt-4",
messages=messages,
tools=tools,
)
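
Note: the example enables nothing in code; it relies entirely on the .env file that dotenv loads. A minimal sketch of the variables involved, written in Python form for clarity; the AZURE_OPENAI_* and OPENAI_API_VERSION names are the AzureOpenAI client's standard environment configuration, and OPENAI_TRACE_ENABLED is the opt-in toggle introduced later in this diff:

import os

os.environ["OPENAI_TRACE_ENABLED"] = "true"  # opt in to the spans added by this PR
os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] = "<Application Insights connection string>"
# read implicitly by openai.AzureOpenAI():
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://<resource>.openai.azure.com"
os.environ["AZURE_OPENAI_API_KEY"] = "<api key>"
os.environ["OPENAI_API_VERSION"] = "2024-02-01"  # example value

Without OPENAI_TRACE_ENABLED set to a truthy value, the calls above still run but produce no spans (see has_tracing_enabled below).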
1 change: 1 addition & 0 deletions pyproject.toml
@@ -38,6 +38,7 @@ classifiers = [

[project.optional-dependencies]
datalib = ["numpy >= 1", "pandas >= 1.2.3", "pandas-stubs >= 1.1.0.11"]
tracing = ["opentelemetry-sdk", "wrapt"]

[project.urls]
Homepage = "https://github.com/openai/openai-python"
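
With this extra, tracing support stays opt-in at install time. The Azure Monitor exporter used by examples/otel.py is deliberately not part of the extra; a sketch of a full install for that example (package names as published on PyPI):

pip install "openai[tracing]" azure-monitor-opentelemetry-exporter python-dotenv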
1 change: 1 addition & 0 deletions src/openai/_extras/__init__.py
@@ -1,2 +1,3 @@
from .numpy_proxy import numpy as numpy, has_numpy as has_numpy
from .pandas_proxy import pandas as pandas
from .opentelemetry_proxy import opentelemetry as opentelemetry, has_tracing_enabled as has_tracing_enabled
36 changes: 36 additions & 0 deletions src/openai/_extras/opentelemetry_proxy.py
@@ -0,0 +1,36 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any
from typing_extensions import override

from .._utils import LazyProxy, coerce_boolean
from ._common import MissingDependencyError, format_instructions

if TYPE_CHECKING:
import opentelemetry.trace as opentelemetry

TRACING_INSTRUCTIONS = format_instructions(library="opentelemetry", extra="tracing")


class OpenTelemetryProxy(LazyProxy[Any]):
@override
def __load__(self) -> Any:
try:
import opentelemetry.trace
except ModuleNotFoundError as err:
raise MissingDependencyError(TRACING_INSTRUCTIONS) from err

return opentelemetry.trace


if not TYPE_CHECKING:
opentelemetry = OpenTelemetryProxy()



def has_tracing_enabled() -> bool:
    return coerce_boolean(os.getenv("OPENAI_TRACE_ENABLED", "").lower())
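
A quick sanity check of the proxy and the toggle; this sketch assumes coerce_boolean accepts the usual truthy strings such as "true" or "1" (its exact mapping lives in openai._utils):

import os

os.environ["OPENAI_TRACE_ENABLED"] = "true"

from openai._extras import opentelemetry, has_tracing_enabled

assert has_tracing_enabled()
# resolves lazily; raises MissingDependencyError with install
# instructions if the opentelemetry dependency is missing
tracer = opentelemetry.get_tracer(__name__)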
159 changes: 159 additions & 0 deletions src/openai/_tracing.py
@@ -0,0 +1,159 @@
from __future__ import annotations

import json
import functools
from typing import TypeVar, Callable, Any, Generator, Iterator, Union
from typing_extensions import ParamSpec

from ._extras import opentelemetry as trace, has_tracing_enabled
from .types.chat import ChatCompletion, ChatCompletionChunk
from .types.completion import Completion
from ._streaming import Stream

TracedModels = Union[ChatCompletion, Completion]

_P = ParamSpec("_P")
_R = TypeVar("_R")


def _set_attribute(span: trace.Span, key: str, value: Any) -> None:
if value is not None:
span.set_attribute(key, value)


def _add_request_chat_message_event(span: trace.Span, **kwargs: Any) -> None:
for message in kwargs.get("messages", []):
try:
message = message.to_dict()
except AttributeError:
pass

if message.get("role"):
name = f"gen_ai.{message.get('role')}.message"
span.add_event(
name=name,
attributes={"event.data": json.dumps(message)}
)


def _add_request_chat_attributes(span: trace.Span, **kwargs: Any) -> None:
_set_attribute(span, "gen_ai.system", "openai")
_set_attribute(span, "gen_ai.request.model", kwargs.get("model"))
_set_attribute(span, "gen_ai.request.max_tokens", kwargs.get("max_tokens"))
_set_attribute(span, "gen_ai.request.temperature", kwargs.get("temperature"))
_set_attribute(span, "gen_ai.request.top_p", kwargs.get("top_p"))


def _add_response_chat_message_event(span: trace.Span, result: ChatCompletion) -> None:
for choice in result.choices:
response: dict[str, Any] = {
"message.role": choice.message.role,
"message.content": choice.message.content,
"finish_reason": choice.finish_reason,
"index": choice.index,
}
if choice.message.tool_calls:
response["message.tool_calls"] = [tool.to_dict() for tool in choice.message.tool_calls]
span.add_event(name="gen_ai.response.message", attributes={"event.data": json.dumps(response)})


def _add_response_chat_attributes(span: trace.Span, result: ChatCompletion) -> None:
_set_attribute(span, "gen_ai.response.id", result.id)
_set_attribute(span, "gen_ai.response.model", result.model)
_set_attribute(span, "gen_ai.response.finish_reason", result.choices[0].finish_reason)
if hasattr(result, "usage"):
_set_attribute(span, "gen_ai.usage.completion_tokens", result.usage.completion_tokens if result.usage else None)
_set_attribute(span, "gen_ai.usage.prompt_tokens", result.usage.prompt_tokens if result.usage else None)


def _traceable_stream(stream_obj: Stream[ChatCompletionChunk], span: trace.Span) -> Generator[ChatCompletionChunk, None, None]:
try:
        accumulate: dict[str, Any] = {"role": ""}
        chunk: ChatCompletionChunk | None = None  # remember the last chunk for response attributes
        for chunk in stream_obj:
for item in chunk.choices:
if item.finish_reason:
accumulate["finish_reason"] = item.finish_reason
                if item.index is not None:
                    accumulate["index"] = item.index
if item.delta.role:
accumulate["role"] = item.delta.role
if item.delta.content:
accumulate.setdefault("content", "")
accumulate["content"] += item.delta.content
if item.delta.tool_calls:
accumulate.setdefault("tool_calls", [])
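                    # in the streaming API, the first delta for each call carries its id;
                    # later deltas stream the function arguments incrementally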
for tool_call in item.delta.tool_calls:
if tool_call.id:
accumulate["tool_calls"].append({"id": tool_call.id, "type": "", "function": {"name": "", "arguments": ""}})
if tool_call.type:
accumulate["tool_calls"][-1]["type"] = tool_call.type
if tool_call.function and tool_call.function.name:
accumulate["tool_calls"][-1]["function"]["name"] = tool_call.function.name
if tool_call.function and tool_call.function.arguments:
accumulate["tool_calls"][-1]["function"]["arguments"] += tool_call.function.arguments
yield chunk

        span.add_event(name="gen_ai.response.message", attributes={"event.data": json.dumps(accumulate)})
        if chunk is not None:
            _add_response_chat_attributes(span, chunk)

finally:
span.end()


def _wrapped_stream(stream_obj: Stream[ChatCompletionChunk], span: trace.Span) -> Stream[ChatCompletionChunk]:
import wrapt

class StreamWrapper(wrapt.ObjectProxy):
def __iter__(self) -> Iterator[ChatCompletionChunk]:
return _traceable_stream(stream_obj, span)

return StreamWrapper(stream_obj)


def _add_request_span_attributes(span: trace.Span, span_name: str, kwargs: Any) -> None:
if span_name.startswith("chat.completions.create"):
_add_request_chat_attributes(span, **kwargs)
_add_request_chat_message_event(span, **kwargs)
# TODO add more models here


def _add_response_span_attributes(span: trace.Span, result: TracedModels) -> None:
if result.object == "chat.completion":
_add_response_chat_attributes(span, result)
_add_response_chat_message_event(span, result)
# TODO add more models here


def traceable(
*, span_name: str
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
    def wrapper(func: Callable[_P, _R]) -> Callable[_P, _R]:
        @functools.wraps(func)
        def inner(*args: _P.args, **kwargs: _P.kwargs) -> _R:
            if not has_tracing_enabled():
                return func(*args, **kwargs)

            # resolve the tracer lazily so tracing can be enabled after import
            # without leaving `tracer` unbound when this wrapper runs
            tracer = trace.get_tracer(__name__)
            span = tracer.start_span(span_name, kind=trace.SpanKind.CLIENT)
try:
_add_request_span_attributes(span, span_name, kwargs)

result = func(*args, **kwargs)

if hasattr(result, "__stream__"):
return _wrapped_stream(result, span)

_add_response_span_attributes(span, result)

            except Exception as exc:
                span.record_exception(exc)
                span.end()
                raise

span.end()
return result

return inner

return wrapper
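
For local inspection of the spans this decorator produces, the SDK's console exporter needs no backend; a minimal sketch, assuming opentelemetry-sdk is installed (it is pulled in by the new tracing extra) and OPENAI_API_KEY is set:

import os

os.environ["OPENAI_TRACE_ENABLED"] = "true"

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

import openai

client = openai.OpenAI()
client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "ping"}],
)
# each call now prints a "chat.completions.create" CLIENT span with
# gen_ai.* request/response attributes and message events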
2 changes: 2 additions & 0 deletions src/openai/resources/chat/completions.py
@@ -30,6 +30,7 @@
from ..._base_client import (
make_request_options,
)
from ..._tracing import traceable

__all__ = ["Completions", "AsyncCompletions"]

@@ -547,6 +548,7 @@ def create(
"""
...

@traceable(span_name="chat.completions.create")
@required_args(["messages", "model"], ["messages", "model", "stream"])
def create(
self,