Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ opentelemetry-instrument python use_openai.py

# You can record more information about prompts as log events by enabling content capture.
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true opentelemetry-instrument python use_openai.py

# You can record more information about prompts as span events by enabling content capture.
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true ELASTIC_OTEL_GENAI_EVENTS=span opentelemetry-instrument python use_openai.py
```

Or manual instrumentation:
Expand All @@ -55,8 +52,7 @@ chat_completion = client.chat.completions.create(model="gpt-4o-mini", messages=m

### Instrumentation specific environment variable configuration

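- `ELASTIC_OTEL_GENAI_EVENTS` (default: `latest`): controls how GenAI events are exported. When set to `span`
they are exported as span events; any other value, including `log` and the default `latest`, exports them as log events.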
None

### Elastic specific semantic conventions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os
from timeit import default_timer
Expand All @@ -26,14 +25,12 @@
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.openai.environment_variables import (
ELASTIC_OTEL_GENAI_EVENTS,
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT,
)
from opentelemetry.instrumentation.openai.helpers import (
_get_embeddings_span_attributes_from_wrapper,
_get_event_attributes,
_get_span_attributes_from_wrapper,
_message_from_choice,
_record_token_usage_metrics,
_record_operation_duration_metric,
_send_log_events_from_messages,
Expand All @@ -46,10 +43,6 @@
from opentelemetry.instrumentation.openai.version import __version__
from opentelemetry.instrumentation.openai.wrappers import StreamWrapper
from opentelemetry.metrics import get_meter
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
GEN_AI_COMPLETION,
GEN_AI_PROMPT,
)
from opentelemetry.semconv._incubating.metrics.gen_ai_metrics import (
create_gen_ai_client_token_usage,
create_gen_ai_client_operation_duration,
Expand Down Expand Up @@ -86,13 +79,6 @@ def _instrument(self, **kwargs):
== "true"
)

# we support 3 values for deciding how to send events:
# - "latest" (default) to match the latest semconv; as of 1.28.0 that means log events
# - "log" to send log events
# - "span" to send span events
genai_events = os.environ.get(ELASTIC_OTEL_GENAI_EVENTS, "latest").lower()
self.event_kind = "span" if genai_events == "span" else "log"

tracer_provider = kwargs.get("tracer_provider")
self.tracer = get_tracer(
__name__,
Expand Down Expand Up @@ -165,13 +151,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
if self.capture_message_content:
messages = kwargs.get("messages", [])

if self.event_kind == "log":
_send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
elif span.is_recording():
try:
span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
except TypeError:
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
_send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)

start_time = default_timer()
try:
Expand All @@ -188,7 +168,6 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
stream=result,
span=span,
capture_message_content=self.capture_message_content,
event_kind=self.event_kind,
event_attributes=event_attributes,
event_logger=self.event_logger,
start_time=start_time,
Expand All @@ -205,19 +184,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)

if self.capture_message_content:
if self.event_kind == "log":
_send_log_events_from_choices(
self.event_logger, choices=result.choices, attributes=event_attributes
)
elif span.is_recording():
# same format as the prompt
completion = [_message_from_choice(choice) for choice in result.choices]
try:
span.add_event(
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
)
except TypeError:
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
_send_log_events_from_choices(self.event_logger, choices=result.choices, attributes=event_attributes)

span.end()

Expand All @@ -239,14 +206,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
) as span:
if self.capture_message_content:
messages = kwargs.get("messages", [])

if self.event_kind == "log":
_send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
elif span.is_recording():
try:
span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
except TypeError:
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
_send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)

start_time = default_timer()
try:
Expand All @@ -263,7 +223,6 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
stream=result,
span=span,
capture_message_content=self.capture_message_content,
event_kind=self.event_kind,
event_attributes=event_attributes,
event_logger=self.event_logger,
start_time=start_time,
Expand All @@ -280,19 +239,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)

if self.capture_message_content:
if self.event_kind == "log":
_send_log_events_from_choices(
self.event_logger, choices=result.choices, attributes=event_attributes
)
elif span.is_recording():
# same format as the prompt
completion = [_message_from_choice(choice) for choice in result.choices]
try:
span.add_event(
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
)
except TypeError:
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
_send_log_events_from_choices(self.event_logger, choices=result.choices, attributes=event_attributes)

span.end()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,3 @@
# limitations under the License.

OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"

ELASTIC_OTEL_GENAI_EVENTS = "ELASTIC_OTEL_GENAI_EVENTS"
Original file line number Diff line number Diff line change
Expand Up @@ -79,62 +79,6 @@ def _set_embeddings_span_attributes_from_response(span: Span, model: str, usage:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, usage.prompt_tokens)


def _message_from_choice(choice):
    """Format a completion choice as a message with the same shape as a prompt message.

    Args:
        choice: a chat completion choice exposing ``message.role``, ``message.content``
            and, optionally, ``message.tool_calls``.

    Returns:
        A dict with ``role`` and ``content`` keys. When the message carries tool calls,
        a ``tool_calls`` list is added; each entry holds the call ``id``, ``type`` and
        the ``function`` name and arguments.
    """
    message = choice.message
    tool_calls = getattr(message, "tool_calls", None)
    if not tool_calls:
        return {"role": message.role, "content": message.content}

    return {
        "role": message.role,
        # Keep any text returned next to the tool calls instead of discarding it;
        # fall back to "" when the message has no (or None) content.
        "content": getattr(message, "content", None) or "",
        "tool_calls": [
            {
                "id": tool_call.id,
                "type": tool_call.type,
                "function": {
                    "name": tool_call.function.name,
                    "arguments": tool_call.function.arguments,
                },
            }
            for tool_call in tool_calls
        ],
    }


def _message_from_stream_choices(choices):
    """Merge streamed choice deltas into one message with the same shape as a prompt message.

    Deltas are grouped by ``choice.index``: the last non-empty ``delta.role`` wins and
    ``delta.content`` fragments are concatenated in arrival order. Tool-call fragments
    are grouped by ``call.index`` within each choice; their ``function.arguments``
    fragments are concatenated and ``function.name``, ``id`` and ``type`` are taken
    from whichever fragment carries them.

    Args:
        choices: an iterable of streamed choices, each exposing ``index`` and ``delta``.

    Returns:
        The merged message dict of the lowest choice index (only one message is
        expected). When ``choices`` is empty, an empty message
        ``{"role": None, "content": ""}`` is returned instead of raising.
    """
    messages = {}
    tool_calls = {}
    for choice in choices:
        message = messages.setdefault(choice.index, {"role": None, "content": ""})
        delta = choice.delta
        if delta.role:
            message["role"] = delta.role
        if delta.content:
            message["content"] += delta.content

        for call in delta.tool_calls or ():
            calls_for_choice = tool_calls.setdefault(choice.index, {})
            tool_call = calls_for_choice.setdefault(call.index, {"function": {"arguments": ""}})
            # A tool-call fragment may carry no function payload at all.
            function = call.function
            if function is not None:
                if function.arguments:
                    tool_call["function"]["arguments"] += function.arguments
                if function.name:
                    tool_call["function"]["name"] = function.name
            if call.id:
                tool_call["id"] = call.id
            if call.type:
                tool_call["type"] = call.type

    for message_index, calls in tool_calls.items():
        # Emit the tool calls ordered by their index within the choice.
        messages[message_index]["tool_calls"] = [call for _, call in sorted(calls.items())]

    if not messages:
        # Nothing was streamed (e.g. the stream ended before any chunk arrived).
        return {"role": None, "content": ""}

    # assumes there's only one message: return the one with the lowest choice index
    return messages[min(messages)]


def _attributes_from_client(client) -> Attributes:
span_attributes = {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
from typing import Literal

from opentelemetry._events import EventLogger
from opentelemetry.instrumentation.openai.helpers import (
_message_from_stream_choices,
_record_token_usage_metrics,
_record_operation_duration_metric,
_set_span_attributes_from_response,
_send_log_events_from_stream_choices,
)
from opentelemetry.metrics import Histogram
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import (
GEN_AI_COMPLETION,
)
from opentelemetry.trace import Span
from opentelemetry.trace.status import StatusCode
from opentelemetry.util.types import Attributes
Expand All @@ -46,7 +40,6 @@ def __init__(
stream,
span: Span,
capture_message_content: bool,
event_kind: Literal["log", "span"],
event_attributes: Attributes,
event_logger: EventLogger,
start_time: float,
Expand All @@ -56,7 +49,6 @@ def __init__(
self.stream = stream
self.span = span
self.capture_message_content = capture_message_content
self.event_kind = event_kind
self.event_attributes = event_attributes
self.event_logger = event_logger
self.token_usage_metric = token_usage_metric
Expand Down Expand Up @@ -85,19 +77,9 @@ def end(self, exc=None):
_record_token_usage_metrics(self.token_usage_metric, self.span, self.usage)

if self.capture_message_content:
if self.event_kind == "log":
_send_log_events_from_stream_choices(
self.event_logger, choices=self.choices, span=self.span, attributes=self.event_attributes
)
elif self.span.is_recording():
# same format as the prompt
completion = [_message_from_stream_choices(self.choices)]
try:
self.span.add_event(
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
)
except TypeError:
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
_send_log_events_from_stream_choices(
self.event_logger, choices=self.choices, span=self.span, attributes=self.event_attributes
)

self.span.end()

Expand Down
Loading