Merged pull request (diff of changes from all commits; four files changed).
==== File 1 of 4: dev requirements lockfile (pip-compile output) ====
@@ -6,13 +6,13 @@
 #
 annotated-types==0.7.0
     # via pydantic
-anyio==4.5.0
+anyio==4.6.2.post1
     # via
     #   httpx
     #   openai
 asgiref==3.8.1
     # via opentelemetry-test-utils
-build==1.2.2
+build==1.2.2.post1
     # via pip-tools
 certifi==2024.8.30
     # via
@@ -32,7 +32,7 @@ exceptiongroup==1.2.2
     #   pytest
 h11==0.14.0
     # via httpcore
-httpcore==1.0.5
+httpcore==1.0.6
     # via httpx
 httpx==0.27.2
     # via openai
@@ -41,48 +41,52 @@ idna==3.10
     #   anyio
     #   httpx
     #   yarl
-importlib-metadata==8.4.0
+importlib-metadata==8.5.0
     # via opentelemetry-api
 iniconfig==2.0.0
     # via pytest
-jiter==0.5.0
+jiter==0.7.1
     # via openai
 multidict==6.1.0
     # via yarl
 numpy==2.1.3
     # via elastic-opentelemetry-instrumentation-openai (pyproject.toml)
-openai==1.50.2
+openai==1.54.4
     # via elastic-opentelemetry-instrumentation-openai (pyproject.toml)
-opentelemetry-api==1.27.0
+opentelemetry-api==1.28.1
     # via
     #   elastic-opentelemetry-instrumentation-openai (pyproject.toml)
     #   opentelemetry-instrumentation
     #   opentelemetry-sdk
     #   opentelemetry-semantic-conventions
     #   opentelemetry-test-utils
-opentelemetry-instrumentation==0.48b0
+opentelemetry-instrumentation==0.49b1
     # via elastic-opentelemetry-instrumentation-openai (pyproject.toml)
-opentelemetry-sdk==1.27.0
+opentelemetry-sdk==1.28.1
     # via opentelemetry-test-utils
-opentelemetry-semantic-conventions==0.48b0
+opentelemetry-semantic-conventions==0.49b1
     # via
     #   elastic-opentelemetry-instrumentation-openai (pyproject.toml)
     #   opentelemetry-instrumentation
     #   opentelemetry-sdk
-opentelemetry-test-utils==0.48b0
+opentelemetry-test-utils==0.49b1
     # via elastic-opentelemetry-instrumentation-openai (pyproject.toml)
-packaging==24.1
+packaging==24.2
     # via
     #   build
     #   opentelemetry-instrumentation
     #   pytest
 pip-tools==7.4.1
     # via elastic-opentelemetry-instrumentation-openai (pyproject.toml)
 pluggy==1.5.0
     # via pytest
+propcache==0.2.0
+    # via yarl
 pydantic==2.9.2
     # via openai
 pydantic-core==2.23.4
     # via pydantic
-pyproject-hooks==1.1.0
+pyproject-hooks==1.2.0
     # via
     #   build
     #   pip-tools
@@ -102,12 +106,12 @@ sniffio==1.3.1
     #   anyio
     #   httpx
     #   openai
-tomli==2.0.1
+tomli==2.1.0
     # via
     #   build
     #   pip-tools
     #   pytest
-tqdm==4.66.5
+tqdm==4.67.0
     # via openai
 typing-extensions==4.12.2
     # via
@@ -118,20 +122,22 @@ typing-extensions==4.12.2
     #   opentelemetry-sdk
     #   pydantic
     #   pydantic-core
-vcrpy==6.0.1
+urllib3==2.2.3
+    # via vcrpy
+vcrpy==6.0.2
     # via
     #   elastic-opentelemetry-instrumentation-openai (pyproject.toml)
     #   pytest-vcr
-wheel==0.44.0
+wheel==0.45.0
     # via pip-tools
 wrapt==1.16.0
     # via
     #   deprecated
     #   opentelemetry-instrumentation
     #   vcrpy
-yarl==1.11.1
+yarl==1.17.1
     # via vcrpy
-zipp==3.20.2
+zipp==3.21.0
     # via importlib-metadata

 # The following packages are considered to be unsafe in a requirements file:
==== File 2 of 4: pyproject.toml ====
@@ -25,10 +25,10 @@ classifiers = [
     "Programming Language :: Python :: 3.12",
 ]
 dependencies = [
-    # 1.27.0 is required for Events API
-    "opentelemetry-api >= 1.27.0",
-    "opentelemetry-instrumentation >= 0.48b0",
-    "opentelemetry-semantic-conventions >= 0.48b0",
+    # 1.28.1 is required for Events API/SDK
+    "opentelemetry-api ~= 1.28.1",
+    "opentelemetry-instrumentation ~= 0.49b1",
+    "opentelemetry-semantic-conventions ~= 0.49b1",
 ]

 [project.readme]
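Note on the constraint change above: moving from ">=" to "~=" narrows the accepted versions to a compatible-release range. Under PEP 440, "~= 1.28.1" means ">= 1.28.1, == 1.28.*", so a 1.29.x opentelemetry-api would no longer satisfy the pin. A quick illustrative check with the packaging library (not part of this PR):

from packaging.specifiers import SpecifierSet

compatible = SpecifierSet("~=1.28.1")  # PEP 440 compatible release: >= 1.28.1, == 1.28.*
assert "1.28.1" in compatible
assert "1.28.5" in compatible      # later patch releases still match
assert "1.29.0" not in compatible  # the next minor falls outside the range
assert "1.27.0" not in compatible  # older releases are excluded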
==== File 3 of 4: instrumentor module of opentelemetry.instrumentation.openai (its __init__.py, inferred) ====
@@ -22,17 +22,22 @@

 from wrapt import register_post_import_hook, wrap_function_wrapper

+from opentelemetry._events import get_event_logger
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
 from opentelemetry.instrumentation.utils import unwrap
 from opentelemetry.instrumentation.openai.environment_variables import (
     ELASTIC_OTEL_GENAI_CAPTURE_CONTENT,
+    ELASTIC_OTEL_GENAI_EVENTS,
 )
 from opentelemetry.instrumentation.openai.helpers import (
     _get_embeddings_span_attributes_from_wrapper,
+    _get_event_attributes,
     _get_span_attributes_from_wrapper,
     _message_from_choice,
     _record_token_usage_metrics,
     _record_operation_duration_metric,
+    _send_log_events_from_messages,
+    _send_log_events_from_choices,
     _set_span_attributes_from_response,
     _set_embeddings_span_attributes_from_response,
     _span_name_from_span_attributes,
@@ -72,14 +77,36 @@ def _instrument(self, **kwargs):
         **kwargs: Optional arguments
             ``tracer_provider``: a TracerProvider, defaults to global
             ``meter_provider``: a MeterProvider, defaults to global
+            ``event_logger_provider``: an EventLoggerProvider, defaults to global
             ``capture_content``: to enable content capturing, defaults to False
         """
         capture_content = "true" if kwargs.get("capture_content") else "false"
         self.capture_content = os.environ.get(ELASTIC_OTEL_GENAI_CAPTURE_CONTENT, capture_content).lower() == "true"

+        # we support 3 values for deciding how to send events:
+        # - "latest" to match the latest semconv; as of 1.27.0 that means span events
+        # - "log" to send log events
+        # - "span" to send span events (default)
+        genai_events = os.environ.get(ELASTIC_OTEL_GENAI_EVENTS, "latest").lower()
+        self.event_kind = "log" if genai_events == "log" else "span"
+
         tracer_provider = kwargs.get("tracer_provider")
-        self.tracer = get_tracer(__name__, __version__, tracer_provider, schema_url=Schemas.V1_27_0.value)
+        self.tracer = get_tracer(
+            __name__,
+            __version__,
+            tracer_provider,
+            schema_url=Schemas.V1_27_0.value,
+        )
         meter_provider = kwargs.get("meter_provider")
-        self.meter = get_meter(__name__, __version__, meter_provider, schema_url=Schemas.V1_27_0.value)
+        self.meter = get_meter(
+            __name__,
+            __version__,
+            meter_provider,
+            schema_url=Schemas.V1_27_0.value,
+        )
+        event_logger_provider = kwargs.get("event_logger_provider")
+        self.event_logger = get_event_logger(__name__, event_logger_provider)

         self.token_usage_metric = create_gen_ai_client_token_usage(self.meter)
         self.operation_duration_metric = create_gen_ai_client_operation_duration(self.meter)

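The event-kind switch in the hunk above is a plain environment lookup. A minimal sketch of the resolution it performs (the helper name here is illustrative, not from the PR):

def _resolve_event_kind(environ: dict) -> str:
    # Mirrors _instrument above: only an explicit "log" selects log events;
    # "latest" (the default) and "span" both resolve to span events.
    genai_events = environ.get("ELASTIC_OTEL_GENAI_EVENTS", "latest").lower()
    return "log" if genai_events == "log" else "span"

assert _resolve_event_kind({}) == "span"                                    # default "latest"
assert _resolve_event_kind({"ELASTIC_OTEL_GENAI_EVENTS": "LOG"}) == "log"   # case-insensitive
assert _resolve_event_kind({"ELASTIC_OTEL_GENAI_EVENTS": "latest"}) == "span"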
@@ -121,6 +148,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         logger.debug(f"openai.resources.chat.completions.Completions.create kwargs: {kwargs}")

         span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
+        event_attributes = _get_event_attributes()

         span_name = _span_name_from_span_attributes(span_attributes)
         with self.tracer.start_as_current_span(
@@ -130,13 +158,17 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             # this is important to avoid having the span closed before ending the stream
             end_on_exit=False,
         ) as span:
-            # TODO: more fine grained depending on the message.role?
             if self.capture_content:
                 messages = kwargs.get("messages", [])
-                prompt = [message for message in messages]
-                try:
-                    span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(prompt)})
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
+
+                if self.event_kind == "log":
+                    _send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
+                else:
+                    try:
+                        span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")

             start_time = default_timer()
             try:
@@ -153,6 +185,9 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
                     stream=result,
                     span=span,
                     capture_content=self.capture_content,
+                    event_kind=self.event_kind,
+                    event_attributes=event_attributes,
+                    event_logger=self.event_logger,
                     start_time=start_time,
                     token_usage_metric=self.token_usage_metric,
                     operation_duration_metric=self.operation_duration_metric,
@@ -166,14 +201,19 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, span, start_time)

             if self.capture_content:
-                # same format as the prompt
-                completion = [_message_from_choice(choice) for choice in result.choices]
-                try:
-                    span.add_event(
-                        EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                if self.event_kind == "log":
+                    _send_log_events_from_choices(
+                        self.event_logger, choices=result.choices, attributes=event_attributes
                     )
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
+                else:
+                    # same format as the prompt
+                    completion = [_message_from_choice(choice) for choice in result.choices]
+                    try:
+                        span.add_event(
+                            EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                        )
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")

             span.end()

@@ -183,6 +223,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}")

         span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
+        event_attributes = _get_event_attributes()

         span_name = _span_name_from_span_attributes(span_attributes)
         with self.tracer.start_as_current_span(
@@ -194,10 +235,14 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         ) as span:
             if self.capture_content:
                 messages = kwargs.get("messages", [])
-                try:
-                    span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
+
+                if self.event_kind == "log":
+                    _send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
+                else:
+                    try:
+                        span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")

             start_time = default_timer()
             try:
@@ -214,6 +259,9 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
                     stream=result,
                     span=span,
                     capture_content=self.capture_content,
+                    event_kind=self.event_kind,
+                    event_attributes=event_attributes,
+                    event_logger=self.event_logger,
                     start_time=start_time,
                     token_usage_metric=self.token_usage_metric,
                     operation_duration_metric=self.operation_duration_metric,
@@ -227,14 +275,19 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, span, start_time)

             if self.capture_content:
-                # same format as the prompt
-                completion = [_message_from_choice(choice) for choice in result.choices]
-                try:
-                    span.add_event(
-                        EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                if self.event_kind == "log":
+                    _send_log_events_from_choices(
+                        self.event_logger, choices=result.choices, attributes=event_attributes
                     )
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
+                else:
+                    # same format as the prompt
+                    completion = [_message_from_choice(choice) for choice in result.choices]
+                    try:
+                        span.add_event(
+                            EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                        )
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")

             span.end()

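Putting the pieces together, enabling log events is an environment flip at instrumentation time. A sketch under assumptions: the instrumentor class name is not shown in this diff ("OpenAIInstrumentor" is assumed here), while the environment variable names come from the diff itself. Note that in this version both the log and span paths sit under capture_content, so content capture must also be enabled:

import os

# Assumed behavior per this PR: "log" routes prompt/completion content to
# log events via the EventLogger; any other value keeps span events.
os.environ["ELASTIC_OTEL_GENAI_EVENTS"] = "log"
os.environ["ELASTIC_OTEL_GENAI_CAPTURE_CONTENT"] = "true"

from opentelemetry.instrumentation.openai import OpenAIInstrumentor  # class name assumed

# Providers default to the globals; per the _instrument docstring, custom
# tracer/meter/event_logger providers may be passed as keyword arguments.
OpenAIInstrumentor().instrument()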
==== File 4 of 4: opentelemetry/instrumentation/openai/environment_variables.py ====
@@ -15,3 +15,5 @@
 # limitations under the License.

 ELASTIC_OTEL_GENAI_CAPTURE_CONTENT = "ELASTIC_OTEL_GENAI_CAPTURE_CONTENT"
+
+ELASTIC_OTEL_GENAI_EVENTS = "ELASTIC_OTEL_GENAI_EVENTS"
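Callers that set the switch programmatically can import the new constant instead of hard-coding the string; a minimal sketch (module path taken from the imports in the instrumentor diff):

import os

from opentelemetry.instrumentation.openai.environment_variables import (
    ELASTIC_OTEL_GENAI_EVENTS,
)

# Accepted values per this PR: "latest" (default), "log", "span".
os.environ[ELASTIC_OTEL_GENAI_EVENTS] = "log"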