diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py index 02beefb..95d1966 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py @@ -145,11 +145,13 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): # this is important to avoid having the span closed before ending the stream end_on_exit=False, ) as span: - # TODO: more fine grained depending on the message.role? - if self.capture_message_content: - messages = kwargs.get("messages", []) - - _send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes) + messages = kwargs.get("messages", []) + _send_log_events_from_messages( + self.event_logger, + messages=messages, + attributes=event_attributes, + capture_message_content=self.capture_message_content, + ) start_time = default_timer() try: @@ -183,8 +185,12 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_token_usage_metrics(self.token_usage_metric, span, result.usage) _record_operation_duration_metric(self.operation_duration_metric, span, start_time) - if self.capture_message_content: - _send_log_events_from_choices(self.event_logger, choices=result.choices, attributes=event_attributes) + _send_log_events_from_choices( + self.event_logger, + choices=result.choices, + attributes=event_attributes, + capture_message_content=self.capture_message_content, + ) span.end() @@ -204,9 +210,13 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): # this is important to avoid having the span closed before ending the stream end_on_exit=False, ) as span: - if self.capture_message_content: - messages = kwargs.get("messages", []) - _send_log_events_from_messages(self.event_logger, messages=messages, attributes=event_attributes) + messages = kwargs.get("messages", []) + _send_log_events_from_messages( + self.event_logger, + messages=messages, + attributes=event_attributes, + capture_message_content=self.capture_message_content, + ) start_time = default_timer() try: @@ -240,8 +250,12 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_token_usage_metrics(self.token_usage_metric, span, result.usage) _record_operation_duration_metric(self.operation_duration_metric, span, start_time) - if self.capture_message_content: - _send_log_events_from_choices(self.event_logger, choices=result.choices, attributes=event_attributes) + _send_log_events_from_choices( + self.event_logger, + choices=result.choices, + attributes=event_attributes, + capture_message_content=self.capture_message_content, + ) span.end() diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py index 9b45158..55f1462 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py @@ -245,18 +245,22 @@ def _serialize_tool_calls_for_event(tool_calls): ] -def _send_log_events_from_messages(event_logger: EventLogger, messages, attributes: Attributes): +def _send_log_events_from_messages( + event_logger: EventLogger, messages, attributes: Attributes, capture_message_content: bool +): for message in messages: + body = {} + if capture_message_content: + content = message.get("content") + if content: + body["content"] = content if message["role"] == "system": - event = Event(name=EVENT_GEN_AI_SYSTEM_MESSAGE, body={"content": message["content"]}, attributes=attributes) + event = Event(name=EVENT_GEN_AI_SYSTEM_MESSAGE, body=body, attributes=attributes) event_logger.emit(event) elif message["role"] == "user": - event = Event(name=EVENT_GEN_AI_USER_MESSAGE, body={"content": message["content"]}, attributes=attributes) + event = Event(name=EVENT_GEN_AI_USER_MESSAGE, body=body, attributes=attributes) event_logger.emit(event) elif message["role"] == "assistant": - body = {} - if content := message.get("content"): - body["content"] = content tool_calls = _serialize_tool_calls_for_event(message.get("tool_calls", [])) if tool_calls: body["tool_calls"] = tool_calls @@ -267,28 +271,33 @@ def _send_log_events_from_messages(event_logger: EventLogger, messages, attribut ) event_logger.emit(event) elif message["role"] == "tool": + body["id"] = message["tool_call_id"] event = Event( name=EVENT_GEN_AI_TOOL_MESSAGE, - body={"content": message["content"], "id": message["tool_call_id"]}, + body=body, attributes=attributes, ) event_logger.emit(event) -def _send_log_events_from_choices(event_logger: EventLogger, choices, attributes: Attributes): +def _send_log_events_from_choices( + event_logger: EventLogger, choices, attributes: Attributes, capture_message_content: bool +): for choice in choices: tool_calls = _serialize_tool_calls_for_event(choice.message.tool_calls or []) body = {"finish_reason": choice.finish_reason, "index": choice.index, "message": {}} if tool_calls: body["message"]["tool_calls"] = tool_calls - if choice.message.content: + if capture_message_content and choice.message.content: body["message"]["content"] = choice.message.content event = Event(name=EVENT_GEN_AI_CHOICE, body=body, attributes=attributes) event_logger.emit(event) -def _send_log_events_from_stream_choices(event_logger: EventLogger, choices, span: Span, attributes: Attributes): +def _send_log_events_from_stream_choices( + event_logger: EventLogger, choices, span: Span, attributes: Attributes, capture_message_content: bool +): body = {} message = {} message_content = "" @@ -312,7 +321,7 @@ def _send_log_events_from_stream_choices(event_logger: EventLogger, choices, spa body["finish_reason"] = choice.finish_reason body["index"] = choice.index - if message_content: + if capture_message_content and message_content: message["content"] = message_content if tool_calls: message["tool_calls"] = [call for _, call in sorted(tool_calls.items())] diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py index c05c077..e401565 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py @@ -79,10 +79,13 @@ def end(self, exc=None): if self.usage: _record_token_usage_metrics(self.token_usage_metric, self.span, self.usage) - if self.capture_message_content: - _send_log_events_from_stream_choices( - self.event_logger, choices=self.choices, span=self.span, attributes=self.event_attributes - ) + _send_log_events_from_stream_choices( + self.event_logger, + choices=self.choices, + span=self.span, + attributes=self.event_attributes, + capture_message_content=self.capture_message_content, + ) self.span.end() diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py index 47cc2c8..13cd24b 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py @@ -149,7 +149,13 @@ def test_basic( } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) attributes = { @@ -283,7 +289,13 @@ def test_all_the_client_options( assert dict(span.attributes) == expected_attrs logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) attributes = { @@ -302,40 +314,6 @@ def test_all_the_client_options( ) -test_function_calling_with_tools_test_data = [ - ( - "openai_provider_chat_completions", - "gpt-4o-mini", - "gpt-4o-mini-2024-07-18", - "South Atlantic Ocean.", - "chatcmpl-ASfa8hgDJKumHgFlD27gZDNSC8HzZ", - 140, - 19, - 0.006761051714420319, - ), - ( - "azure_provider_chat_completions", - "unused", - "gpt-4-32k", - "South Atlantic Ocean", - "chatcmpl-ASxkDInQTANJ57p0VfUCuAISgNbW8", - 144, - 20, - 0.002889830619096756, - ), - ( - "ollama_provider_chat_completions", - "qwen2.5:0.5b", - "qwen2.5:0.5b", - "The Falklands Islands are located in the oceans south of South America.", - "chatcmpl-641", - 241, - 28, - 0.002600736916065216, - ), -] - - test_multiple_choices_capture_message_content_test_data = [ ( "openai_provider_chat_completions", @@ -448,9 +426,46 @@ def test_multiple_choices_with_capture_message_content( ) +test_function_calling_with_tools_test_data = [ + ( + "openai_provider_chat_completions", + "gpt-4o-mini", + "gpt-4o-mini-2024-07-18", + "South Atlantic Ocean.", + "chatcmpl-ASfa8hgDJKumHgFlD27gZDNSC8HzZ", + "call_62Px1tSvshkL8RBrj4p4msgO", + 140, + 19, + 0.006761051714420319, + ), + ( + "azure_provider_chat_completions", + "unused", + "gpt-4-32k", + "South Atlantic Ocean", + "chatcmpl-ASxkDInQTANJ57p0VfUCuAISgNbW8", + "call_U0QYBadhpy4pBO6jYPm09KvZ", + 144, + 20, + 0.002889830619096756, + ), + ( + "ollama_provider_chat_completions", + "qwen2.5:0.5b", + "qwen2.5:0.5b", + "The Falklands Islands are located in the oceans south of South America.", + "chatcmpl-641", + "call_ww759p36", + 241, + 28, + 0.002600736916065216, + ), +] + + @pytest.mark.vcr() @pytest.mark.parametrize( - "provider_str,model,response_model,content,response_id,input_tokens,output_tokens,duration", + "provider_str,model,response_model,content,response_id,function_call_id,input_tokens,output_tokens,duration", test_function_calling_with_tools_test_data, ) def test_function_calling_with_tools( @@ -459,6 +474,7 @@ def test_function_calling_with_tools( response_model, content, response_id, + function_call_id, input_tokens, output_tokens, duration, @@ -532,7 +548,21 @@ def test_function_calling_with_tools( } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 5 + log_records = logrecords_from_logs(logs) + system_message, user_message, assistant_message, second_user_message, choice = log_records + assert system_message.attributes == {"gen_ai.system": "openai", "event.name": "gen_ai.system.message"} + assert system_message.body == {} + assert user_message.attributes == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert user_message.body == {} + assert assistant_message.attributes == {"gen_ai.system": "openai", "event.name": "gen_ai.assistant.message"} + assert assistant_message.body == {} + assert second_user_message.attributes == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert second_user_message.body == {} + + assert_tool_call_log_record( + choice, [ToolCall(function_call_id, "get_delivery_date", '{"order_id": "order_12345"}')] + ) operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) attributes = { @@ -891,7 +921,11 @@ def test_connection_error(provider_str, model, duration, trace_exporter, metrics } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 1 + log_records = logrecords_from_logs(logs) + (user_message,) = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} (operation_duration_metric,) = get_sorted_metrics(metrics_reader) attributes = { @@ -1193,7 +1227,13 @@ def test_stream( } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) (operation_duration_metric,) = get_sorted_metrics(metrics_reader) attributes = { @@ -1320,7 +1360,13 @@ def test_stream_all_the_client_options( assert dict(span.attributes) == expected_attrs logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) (operation_duration_metric,) = get_sorted_metrics(metrics_reader) attributes = { @@ -1416,7 +1462,13 @@ def test_stream_with_include_usage_option( } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) attributes = { @@ -2128,7 +2180,13 @@ async def test_async_basic( } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) attributes = { @@ -2432,7 +2490,13 @@ async def test_async_stream( } logs = logs_exporter.get_finished_logs() - assert len(logs) == 0 + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice, expected_content=None) (operation_duration_metric,) = get_sorted_metrics(metrics_reader) attributes = { @@ -2881,7 +2945,10 @@ def assert_stop_log_record(log_record: LogRecord, expected_content: str, expecte assert log_record.body["index"] == expected_index assert log_record.body["finish_reason"] == "stop" message = log_record.body["message"] - assert message["content"] == expected_content + if expected_content is None: + assert "content" not in message + else: + assert message["content"] == expected_content def assert_tool_call_log_record(log_record: LogRecord, expected_tool_calls: List[ToolCall], expected_index=0):