
 from wrapt import register_post_import_hook, wrap_function_wrapper

+from opentelemetry._events import get_event_logger
 from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
 from opentelemetry.instrumentation.utils import unwrap
 from opentelemetry.instrumentation.openai.environment_variables import (
     ELASTIC_OTEL_GENAI_CAPTURE_CONTENT,
+    ELASTIC_OTEL_GENAI_EVENTS,
 )
 from opentelemetry.instrumentation.openai.helpers import (
     _get_embeddings_span_attributes_from_wrapper,
+    _get_event_attributes,
     _get_span_attributes_from_wrapper,
     _message_from_choice,
     _record_token_usage_metrics,
     _record_operation_duration_metric,
+    _send_log_events_from_messages,
+    _send_log_events_from_choices,
     _set_span_attributes_from_response,
     _set_embeddings_span_attributes_from_response,
     _span_name_from_span_attributes,
@@ -72,14 +77,36 @@ def _instrument(self, **kwargs):
             **kwargs: Optional arguments
                 ``tracer_provider``: a TracerProvider, defaults to global
                 ``meter_provider``: a MeterProvider, defaults to global
+                ``event_logger_provider``: an EventLoggerProvider, defaults to global
                 ``capture_content``: to enable content capturing, defaults to False
         """
         capture_content = "true" if kwargs.get("capture_content") else "false"
         self.capture_content = os.environ.get(ELASTIC_OTEL_GENAI_CAPTURE_CONTENT, capture_content).lower() == "true"
+
+        # We support three values for deciding how to send events:
+        # - "latest" to match the latest semconv; as of 1.27.0 that means span events
+        # - "log" to send log events
+        # - "span" to send span events (default)
+        genai_events = os.environ.get(ELASTIC_OTEL_GENAI_EVENTS, "latest").lower()
+        self.event_kind = "log" if genai_events == "log" else "span"
+
         tracer_provider = kwargs.get("tracer_provider")
-        self.tracer = get_tracer(__name__, __version__, tracer_provider, schema_url=Schemas.V1_27_0.value)
+        self.tracer = get_tracer(
+            __name__,
+            __version__,
+            tracer_provider,
+            schema_url=Schemas.V1_27_0.value,
+        )
         meter_provider = kwargs.get("meter_provider")
-        self.meter = get_meter(__name__, __version__, meter_provider, schema_url=Schemas.V1_27_0.value)
+        self.meter = get_meter(
+            __name__,
+            __version__,
+            meter_provider,
+            schema_url=Schemas.V1_27_0.value,
+        )
+        event_logger_provider = kwargs.get("event_logger_provider")
+        self.event_logger = get_event_logger(__name__, event_logger_provider=event_logger_provider)
+
         self.token_usage_metric = create_gen_ai_client_token_usage(self.meter)
         self.operation_duration_metric = create_gen_ai_client_operation_duration(self.meter)

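For reference, a minimal usage sketch of the new switch. This is not part of the diff: the `OpenAIInstrumentor` entry point and the assumption that the `ELASTIC_OTEL_GENAI_*` constants resolve to environment variables of the same name are assumptions, not shown in this change.

```python
# Hypothetical usage sketch, not part of this diff.
# Assumes the package exposes OpenAIInstrumentor and that the constants imported
# above map to environment variables with the same names.
import os

os.environ["ELASTIC_OTEL_GENAI_CAPTURE_CONTENT"] = "true"  # opt in to content capture
os.environ["ELASTIC_OTEL_GENAI_EVENTS"] = "log"  # or "span" / "latest" (the default)

from opentelemetry.instrumentation.openai import OpenAIInstrumentor

OpenAIInstrumentor().instrument()
```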
@@ -121,6 +148,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         logger.debug(f"openai.resources.chat.completions.Completions.create kwargs: {kwargs}")

         span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
+        event_attributes = _get_event_attributes()

         span_name = _span_name_from_span_attributes(span_attributes)
         with self.tracer.start_as_current_span(
@@ -130,13 +158,17 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             # this is important to avoid having the span closed before ending the stream
             end_on_exit=False,
         ) as span:
+            # TODO: be more fine-grained depending on the message role?
             if self.capture_content:
                 messages = kwargs.get("messages", [])
-                prompt = [message for message in messages]
-                try:
-                    span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(prompt)})
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
+
+                if self.event_kind == "log":
+                    _send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
+                else:
+                    try:
+                        span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")

             start_time = default_timer()
             try:
@@ -153,6 +185,9 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
                     stream=result,
                     span=span,
                     capture_content=self.capture_content,
+                    event_kind=self.event_kind,
+                    event_attributes=event_attributes,
+                    event_logger=self.event_logger,
                     start_time=start_time,
                     token_usage_metric=self.token_usage_metric,
                     operation_duration_metric=self.operation_duration_metric,
@@ -166,14 +201,19 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, span, start_time)

             if self.capture_content:
-                # same format as the prompt
-                completion = [_message_from_choice(choice) for choice in result.choices]
-                try:
-                    span.add_event(
-                        EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                if self.event_kind == "log":
+                    _send_log_events_from_choices(
+                        self.event_logger, choices=result.choices, attributes=event_attributes
                     )
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
+                else:
+                    # same format as the prompt
+                    completion = [_message_from_choice(choice) for choice in result.choices]
+                    try:
+                        span.add_event(
+                            EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                        )
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")

             span.end()

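`_send_log_events_from_messages` and `_send_log_events_from_choices` live in `opentelemetry.instrumentation.openai.helpers` and are not shown in this diff. As a hedged sketch of the idea only (the real helpers may differ), emitting one log event per chat message through the Events API imported above could look like this:

```python
# Hedged sketch only: the real _send_log_events_from_messages helper lives in
# opentelemetry.instrumentation.openai.helpers and may differ. This illustrates
# emitting one gen_ai.<role>.message log event per chat message through the
# OpenTelemetry Events API.
from opentelemetry._events import Event, EventLogger


def send_log_events_from_messages(event_logger: EventLogger, messages, attributes):
    for message in messages:
        role = message.get("role", "user")
        event_logger.emit(
            Event(
                name=f"gen_ai.{role}.message",  # e.g. gen_ai.user.message, gen_ai.system.message
                attributes=attributes,
                body={"content": message.get("content")},
            )
        )
```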
@@ -183,6 +223,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}")

         span_attributes = _get_span_attributes_from_wrapper(instance, kwargs)
+        event_attributes = _get_event_attributes()

         span_name = _span_name_from_span_attributes(span_attributes)
         with self.tracer.start_as_current_span(
@@ -194,10 +235,14 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
         ) as span:
             if self.capture_content:
                 messages = kwargs.get("messages", [])
-                try:
-                    span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")
+
+                if self.event_kind == "log":
+                    _send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes)
+                else:
+                    try:
+                        span.add_event(EVENT_GEN_AI_CONTENT_PROMPT, attributes={GEN_AI_PROMPT: json.dumps(messages)})
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_PROMPT}")

             start_time = default_timer()
             try:
@@ -214,6 +259,9 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
                     stream=result,
                     span=span,
                     capture_content=self.capture_content,
+                    event_kind=self.event_kind,
+                    event_attributes=event_attributes,
+                    event_logger=self.event_logger,
                     start_time=start_time,
                     token_usage_metric=self.token_usage_metric,
                     operation_duration_metric=self.operation_duration_metric,
@@ -227,14 +275,19 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs):
             _record_operation_duration_metric(self.operation_duration_metric, span, start_time)

             if self.capture_content:
-                # same format as the prompt
-                completion = [_message_from_choice(choice) for choice in result.choices]
-                try:
-                    span.add_event(
-                        EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                if self.event_kind == "log":
+                    _send_log_events_from_choices(
+                        self.event_logger, choices=result.choices, attributes=event_attributes
                     )
-                except TypeError:
-                    logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")
+                else:
+                    # same format as the prompt
+                    completion = [_message_from_choice(choice) for choice in result.choices]
+                    try:
+                        span.add_event(
+                            EVENT_GEN_AI_CONTENT_COMPLETION, attributes={GEN_AI_COMPLETION: json.dumps(completion)}
+                        )
+                    except TypeError:
+                        logger.error(f"Failed to serialize {EVENT_GEN_AI_CONTENT_COMPLETION}")

             span.end()

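When `ELASTIC_OTEL_GENAI_EVENTS` is set to "log", the events are emitted through the event logger created in `_instrument`, so they only reach a backend if an event logger provider is configured. A hedged wiring sketch follows; the `OpenAIInstrumentor` entry point is an assumption, while the provider classes are the standard OpenTelemetry logs/events SDK:

```python
# Hedged wiring sketch, not part of this diff: route gen_ai log events through an
# explicit EventLoggerProvider (standard OpenTelemetry logs/events SDK) instead of
# relying on the global one. OpenAIInstrumentor is assumed to be the entry point.
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import ConsoleLogExporter, SimpleLogRecordProcessor

from opentelemetry.instrumentation.openai import OpenAIInstrumentor

logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(SimpleLogRecordProcessor(ConsoleLogExporter()))

OpenAIInstrumentor().instrument(
    event_logger_provider=EventLoggerProvider(logger_provider=logger_provider),
    capture_content=True,
)
```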