Skip to content

Commit 1941391

Browse files
committed
elastic-opentelemetry-instrumentation-openai: implement log events support
1 parent 11c0a50 commit 1941391

File tree

33 files changed

+5071
-170
lines changed

33 files changed

+5071
-170
lines changed

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,22 @@
2222

2323
from wrapt import register_post_import_hook, wrap_function_wrapper
2424

25+
from opentelemetry._events import get_event_logger
2526
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
2627
from opentelemetry.instrumentation.utils import unwrap
2728
from opentelemetry.instrumentation.openai.environment_variables import (
2829
ELASTIC_OTEL_GENAI_CAPTURE_CONTENT,
30+
ELASTIC_OTEL_GENAI_EVENTS,
2931
)
3032
from opentelemetry.instrumentation.openai.helpers import (
3133
_get_embeddings_span_attributes_from_wrapper,
34+
_get_event_attributes,
3235
_get_span_attributes_from_wrapper,
3336
_message_from_choice,
3437
_record_token_usage_metrics,
3538
_record_operation_duration_metric,
39+
_send_log_events_from_messages,
40+
_send_log_events_from_choices,
3641
_set_span_attributes_from_response,
3742
_set_embeddings_span_attributes_from_response,
3843
_span_name_from_span_attributes,
@@ -72,14 +77,36 @@ def _instrument(self, **kwargs):
7277
**kwargs: Optional arguments
7378
``tracer_provider``: a TracerProvider, defaults to global
7479
``meter_provider``: a MeterProvider, defaults to global
80+
``event_logger_provider``: a EventLoggerProvider, defaults to global
7581
``capture_content``: to enable content capturing, defaults to False
7682
"""
7783
capture_content = "true" if kwargs.get("capture_content") else "false"
7884
self.capture_content = os.environ.get(ELASTIC_OTEL_GENAI_CAPTURE_CONTENT, capture_content).lower() == "true"
85+
86+
# we support 3 values for deciding how to send events:
87+
# - "latest" to match latest semconv, as 1.27.0 it's span
88+
# - "log" to send log events
89+
# - "span" to send span events (default)
90+
genai_events = os.environ.get(ELASTIC_OTEL_GENAI_EVENTS, "latest").lower()
91+
self.event_kind = "log" if genai_events == "log" else "span"
92+
7993
tracer_provider = kwargs.get("tracer_provider")
80-
self.tracer = get_tracer(__name__, __version__, tracer_provider, schema_url=Schemas.V1_27_0.value)
94+
self.tracer = get_tracer(
95+
__name__,
96+
__version__,
97+
tracer_provider,
98+
schema_url=Schemas.V1_27_0.value,
99+
)
81100
meter_provider = kwargs.get("meter_provider")
82-
self.meter = get_meter(__name__, __version__, meter_provider, schema_url=Schemas.V1_27_0.value)
101+
self.meter = get_meter(
102+
__name__,
103+
__version__,
104+
meter_provider,
105+
schema_url=Schemas.V1_27_0.value,
106+
)
107+
event_logger_provider = kwargs.get("event_logger_provider")
108+
self.event_logger = get_event_logger(__name__, event_logger_provider)
109+
83110
self.token_usage_metric = create_gen_ai_client_token_usage(self.meter)
84111
self.operation_duration_metric = create_gen_ai_client_operation_duration(self.meter)
85112

@@ -121,6 +148,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
121148
logger.debug(f"openai.resources.chat.completions.Completions.create kwargs: {kwargs}")
122149

123150
span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
151+
event_attributes = _get_event_attributes()
124152

125153
span_name = _span_name_from_span_attributes(span_attributes)
126154
with self.tracer.start_as_current_span(
@@ -130,13 +158,17 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
130158
# this is important to avoid having the span closed before ending the stream
131159
end_on_exit=False,
132160
) as span:
161+
# TODO: more fine grained depending on the message.role?
133162
if self.capture_content:
134163
messages = kwargs.get("messages", [])
135-
prompt = [message for message in messages]
136-
try:
137-
span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(prompt)})
138-
except TypeError:
139-
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
164+
165+
if self.event_kind == "log":
166+
_send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
167+
else:
168+
try:
169+
span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
170+
except TypeError:
171+
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
140172

141173
start_time = default_timer()
142174
try:
@@ -153,6 +185,9 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
153185
stream=result,
154186
span=span,
155187
capture_content=self.capture_content,
188+
event_kind=self.event_kind,
189+
event_attributes=event_attributes,
190+
event_logger=self.event_logger,
156191
start_time=start_time,
157192
token_usage_metric=self.token_usage_metric,
158193
operation_duration_metric=self.operation_duration_metric,
@@ -166,14 +201,19 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
166201
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
167202

168203
if self.capture_content:
169-
# same format as the prompt
170-
completion = [_message_from_choice(choice) for choice in result.choices]
171-
try:
172-
span.add_event(
173-
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
204+
if self.event_kind == "log":
205+
_send_log_events_from_choices(
206+
self.event_logger, choices=result.choices, attributes=event_attributes
174207
)
175-
except TypeError:
176-
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
208+
else:
209+
# same format as the prompt
210+
completion = [_message_from_choice(choice) for choice in result.choices]
211+
try:
212+
span.add_event(
213+
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
214+
)
215+
except TypeError:
216+
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
177217

178218
span.end()
179219

@@ -183,6 +223,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
183223
logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}")
184224

185225
span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
226+
event_attributes = _get_event_attributes()
186227

187228
span_name = _span_name_from_span_attributes(span_attributes)
188229
with self.tracer.start_as_current_span(
@@ -194,10 +235,14 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
194235
) as span:
195236
if self.capture_content:
196237
messages = kwargs.get("messages", [])
197-
try:
198-
span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
199-
except TypeError:
200-
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
238+
239+
if self.event_kind == "log":
240+
_send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
241+
else:
242+
try:
243+
span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
244+
except TypeError:
245+
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
201246

202247
start_time = default_timer()
203248
try:
@@ -214,6 +259,9 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
214259
stream=result,
215260
span=span,
216261
capture_content=self.capture_content,
262+
event_kind=self.event_kind,
263+
event_attributes=event_attributes,
264+
event_logger=self.event_logger,
217265
start_time=start_time,
218266
token_usage_metric=self.token_usage_metric,
219267
operation_duration_metric=self.operation_duration_metric,
@@ -227,14 +275,19 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
227275
_record_operation_duration_metric(self.operation_duration_metric, span, start_time)
228276

229277
if self.capture_content:
230-
# same format as the prompt
231-
completion = [_message_from_choice(choice) for choice in result.choices]
232-
try:
233-
span.add_event(
234-
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
278+
if self.event_kind == "log":
279+
_send_log_events_from_choices(
280+
self.event_logger, choices=result.choices, attributes=event_attributes
235281
)
236-
except TypeError:
237-
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
282+
else:
283+
# same format as the prompt
284+
completion = [_message_from_choice(choice) for choice in result.choices]
285+
try:
286+
span.add_event(
287+
EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
288+
)
289+
except TypeError:
290+
logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
238291

239292
span.end()
240293

instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/environment_variables.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@
1515
# limitations under the License.
1616

1717
ELASTIC_OTEL_GENAI_CAPTURE_CONTENT = "ELASTIC_OTEL_GENAI_CAPTURE_CONTENT"
18+
19+
ELASTIC_OTEL_GENAI_EVENTS = "ELASTIC_OTEL_GENAI_EVENTS"

0 commit comments

Comments
 (0)