diff --git a/examples/otel.py b/examples/otel.py
new file mode 100644
index 0000000000..3f37e6e502
--- /dev/null
+++ b/examples/otel.py
@@ -0,0 +1,69 @@
+import os
+import openai
+import dotenv
+from opentelemetry import trace
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
+
+dotenv.load_dotenv()
+
+# Export spans produced by the SDK to Azure Monitor / Application Insights.
+trace.set_tracer_provider(TracerProvider())
+tracer = trace.get_tracer(__name__)
+
+exporter = AzureMonitorTraceExporter(
+    connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
+)
+span_processor = BatchSpanProcessor(exporter)
+trace.get_tracer_provider().add_span_processor(span_processor)
+
+client = openai.AzureOpenAI()
+
+messages = [
+    {"role": "system", "content": "Don't make assumptions about what values to plug into tools. Ask for clarification if a user request is ambiguous."},
+    {"role": "user", "content": "What's the weather like today in Seattle?"},
+]
+tools = [
+    {
+        "type": "function",
+        "function": {
+            "name": "get_current_weather",
+            "description": "Get the current weather in a given location",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "location": {
+                        "type": "string",
+                        "description": "The city and state, e.g. San Francisco, CA",
+                    },
+                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
+                },
+                "required": ["location"],
+            },
+        }
+    }
+]
+
+completion = client.chat.completions.create(
+    model="gpt-4",
+    messages=messages,
+    tools=tools,
+    tool_choice="auto",
+)
+messages.append(completion.choices[0].message)
+
+# Simulate executing the requested tool and hand its result back to the model.
+messages.append(
+    {
+        "role": "tool",
+        "tool_call_id": completion.choices[0].message.tool_calls[0].id,
+        "content": "{\"temperature\": \"22\", \"unit\": \"celsius\", \"description\": \"Sunny\"}"
+    }
+)
+tool_completion = client.chat.completions.create(
+    model="gpt-4",
+    messages=messages,
+    tools=tools,
+)
+print(tool_completion.choices[0].message.content)
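The only Azure-specific piece of the example above is the exporter wiring. To see spans locally without an Application Insights resource, a minimal sketch that swaps in OpenTelemetry's stock console exporter instead (assumes only the base `opentelemetry-sdk` package is installed):

```python
# Sketch: print spans to stdout instead of shipping them to Azure Monitor.
# Drop-in replacement for the exporter block in examples/otel.py above.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())  # flushes each span as it ends
)
```

Either way, the client-side spans are only emitted when `OPENAI_TRACE_ENABLED` is set to a truthy value; see `has_tracing_enabled` below.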
diff --git a/pyproject.toml b/pyproject.toml
index 11ab55cbe9..4e90339de8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -38,6 +38,7 @@ classifiers = [
 
 [project.optional-dependencies]
 datalib = ["numpy >= 1", "pandas >= 1.2.3", "pandas-stubs >= 1.1.0.11"]
+tracing = ["opentelemetry-sdk", "wrapt"]
 
 [project.urls]
 Homepage = "https://github.com/openai/openai-python"
diff --git a/src/openai/_extras/__init__.py b/src/openai/_extras/__init__.py
index 864dac4171..c231408f90 100644
--- a/src/openai/_extras/__init__.py
+++ b/src/openai/_extras/__init__.py
@@ -1,2 +1,3 @@
 from .numpy_proxy import numpy as numpy, has_numpy as has_numpy
 from .pandas_proxy import pandas as pandas
+from .opentelemetry_proxy import opentelemetry as opentelemetry, has_tracing_enabled as has_tracing_enabled
diff --git a/src/openai/_extras/opentelemetry_proxy.py b/src/openai/_extras/opentelemetry_proxy.py
new file mode 100644
index 0000000000..42f12ac817
--- /dev/null
+++ b/src/openai/_extras/opentelemetry_proxy.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any
+from typing_extensions import override
+
+from .._utils import LazyProxy, coerce_boolean
+from ._common import MissingDependencyError, format_instructions
+
+if TYPE_CHECKING:
+    import opentelemetry.trace as opentelemetry
+
+TRACING_INSTRUCTIONS = format_instructions(library="opentelemetry", extra="tracing")
+
+
+class OpenTelemetryProxy(LazyProxy[Any]):
+    @override
+    def __load__(self) -> Any:
+        try:
+            import opentelemetry.trace
+        except ModuleNotFoundError as err:
+            raise MissingDependencyError(TRACING_INSTRUCTIONS) from err
+
+        return opentelemetry.trace
+
+
+if not TYPE_CHECKING:
+    opentelemetry = OpenTelemetryProxy()
+
+
+def has_tracing_enabled() -> bool:
+    """Tracing is opt-in, gated on the OPENAI_TRACE_ENABLED environment variable."""
+    return coerce_boolean(os.getenv("OPENAI_TRACE_ENABLED", "").lower())
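`OpenTelemetryProxy` defers the `opentelemetry` import to the first attribute access, so installs without the `tracing` extra pay no import cost until tracing is actually used. Roughly, the pattern behaves like this illustrative stand-in (not the SDK's actual `LazyProxy`, which lives in `_utils`):

```python
from typing import Any


class LazyModule:
    """Illustrative stand-in: import a module on first attribute access."""

    def __init__(self) -> None:
        self._mod: Any = None

    def __getattr__(self, name: str) -> Any:
        if self._mod is None:
            import opentelemetry.trace  # deferred until something is looked up
            self._mod = opentelemetry.trace
        return getattr(self._mod, name)


trace = LazyModule()
# opentelemetry is imported only here, on the first real use:
tracer = trace.get_tracer(__name__)
```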
accumulate.setdefault("tool_calls", []) + for tool_call in item.delta.tool_calls: + if tool_call.id: + accumulate["tool_calls"].append({"id": tool_call.id, "type": "", "function": {"name": "", "arguments": ""}}) + if tool_call.type: + accumulate["tool_calls"][-1]["type"] = tool_call.type + if tool_call.function and tool_call.function.name: + accumulate["tool_calls"][-1]["function"]["name"] = tool_call.function.name + if tool_call.function and tool_call.function.arguments: + accumulate["tool_calls"][-1]["function"]["arguments"] += tool_call.function.arguments + yield chunk + + span.add_event(name="gen_ai.response.message", attributes={"event.data": json.dumps(accumulate)}) + _add_response_chat_attributes(span, chunk) + + finally: + span.end() + + +def _wrapped_stream(stream_obj: Stream[ChatCompletionChunk], span: trace.Span) -> Stream[ChatCompletionChunk]: + import wrapt + + class StreamWrapper(wrapt.ObjectProxy): + def __iter__(self) -> Iterator[ChatCompletionChunk]: + return _traceable_stream(stream_obj, span) + + return StreamWrapper(stream_obj) + + +def _add_request_span_attributes(span: trace.Span, span_name: str, kwargs: Any) -> None: + if span_name.startswith("chat.completions.create"): + _add_request_chat_attributes(span, **kwargs) + _add_request_chat_message_event(span, **kwargs) + # TODO add more models here + + +def _add_response_span_attributes(span: trace.Span, result: TracedModels) -> None: + if result.object == "chat.completion": + _add_response_chat_attributes(span, result) + _add_response_chat_message_event(span, result) + # TODO add more models here + + +def traceable( + *, span_name: str +) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]: + if has_tracing_enabled(): + tracer = trace.get_tracer(__name__) + + def wrapper(func: Callable[_P, _R]) -> Callable[_P, _R]: + @functools.wraps(func) + def inner(*args: _P.args, **kwargs: _P.kwargs) -> _R: + if not has_tracing_enabled(): + return func(*args, **kwargs) + + span = tracer.start_span(span_name, kind=trace.SpanKind.CLIENT) + try: + _add_request_span_attributes(span, span_name, kwargs) + + result = func(*args, **kwargs) + + if hasattr(result, "__stream__"): + return _wrapped_stream(result, span) + + _add_response_span_attributes(span, result) + + except Exception: + span.end() + raise + + span.end() + return result + + return inner + + return wrapper diff --git a/src/openai/resources/chat/completions.py b/src/openai/resources/chat/completions.py index 3b070b716e..8501218521 100644 --- a/src/openai/resources/chat/completions.py +++ b/src/openai/resources/chat/completions.py @@ -30,6 +30,7 @@ from ..._base_client import ( make_request_options, ) +from ..._tracing import traceable __all__ = ["Completions", "AsyncCompletions"] @@ -547,6 +548,7 @@ def create( """ ... + @traceable(span_name="chat.completions.create") @required_args(["messages", "model"], ["messages", "model", "stream"]) def create( self,
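Because `inner` re-checks `has_tracing_enabled()` and resolves the tracer on every call, flipping the environment variable toggles tracing at runtime without re-importing the client. A quick way to exercise the decorator end to end is OpenTelemetry's in-memory exporter; a sketch, assuming a configured API key and the `tracing` extra installed:

```python
import os

os.environ["OPENAI_TRACE_ENABLED"] = "true"  # opt in before making traced calls

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)

import openai

client = openai.OpenAI()
client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Say hello"}],
)

# Expect one CLIENT span named "chat.completions.create" with gen_ai.* attributes.
for span in exporter.get_finished_spans():
    print(span.name, dict(span.attributes or {}))
```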