diff --git a/sdk/ai/azure-ai-projects/README.md b/sdk/ai/azure-ai-projects/README.md index a944aab13488..db4c31184e48 100644 --- a/sdk/ai/azure-ai-projects/README.md +++ b/sdk/ai/azure-ai-projects/README.md @@ -418,6 +418,15 @@ To enable content recording, set the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE The AI Projects client library automatically instruments OpenAI responses and conversations operations through `AiProjectInstrumentation`. You can disable this instrumentation by setting the environment variable `AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API` to `false`. If the environment variable is not set, the responses and conversations APIs will be instrumented by default. +### Tracing Binary Data + +Binary data refers to images and files sent to the service as input messages. When content recording is enabled (`OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` set to `true`), only file IDs and filenames are traced by default. To trace the full binary payloads as well, set `AZURE_TRACING_GEN_AI_INCLUDE_BINARY_DATA` to `true`; this setting has no effect unless content recording is also enabled. In this case: + +- **Images**: Image URLs (including data URIs with base64-encoded content) are included +- **Files**: File data is included when it is sent inline via the API + +**Important:** Binary data can contain sensitive information and may significantly increase trace size. Some tracing backends and implementations limit the maximum size of trace data they accept. Before enabling binary data tracing, verify that your observability backend and tracing implementation can handle the expected trace payload sizes. + ### Additional resources For more information see: diff --git a/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/_responses_instrumentor.py b/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/_responses_instrumentor.py index 73382b7dcd46..7b62f86ba605 100644 --- a/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/_responses_instrumentor.py +++ b/sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/_responses_instrumentor.py @@ -52,6 +52,7 @@ _responses_traces_enabled: bool = False _trace_responses_content: bool = False +_trace_binary_data: bool = False # Azure OpenAI system identifier for traces AZURE_OPENAI_SYSTEM = "azure.openai" @@ -132,6 +133,14 @@ def is_content_recording_enabled(self) -> bool: """ return self._impl.is_content_recording_enabled() + def is_binary_data_enabled(self) -> bool: + """This function gets the binary data tracing value. + + :return: A bool value indicating whether binary data tracing is enabled.
+ :rtype: bool + """ + return self._impl.is_binary_data_enabled() + class _ResponsesInstrumentorPreview: # pylint: disable=too-many-instance-attributes,too-many-statements,too-many-public-methods """ @@ -389,10 +398,14 @@ def instrument(self, enable_content_recording: Optional[bool] = None): os.environ.get("OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "false") ) + # Check if binary data tracing is enabled + enable_binary_data = self._str_to_bool(os.environ.get("AZURE_TRACING_GEN_AI_INCLUDE_BINARY_DATA", "false")) + if not self.is_instrumented(): - self._instrument_responses(enable_content_recording) + self._instrument_responses(enable_content_recording, enable_binary_data) else: self.set_enable_content_recording(enable_content_recording) + self.set_enable_binary_data(enable_binary_data) def uninstrument(self): """ @@ -431,6 +444,23 @@ def is_content_recording_enabled(self) -> bool: """ return self._is_content_recording_enabled() + def set_enable_binary_data(self, enable_binary_data: bool = False) -> None: + """This function sets the binary data tracing value. + + :param enable_binary_data: Indicates whether tracing of binary data (such as images) should be enabled. + This only takes effect when content recording is also enabled. + :type enable_binary_data: bool + """ + self._set_enable_binary_data(enable_binary_data=enable_binary_data) + + def is_binary_data_enabled(self) -> bool: + """This function gets the binary data tracing value. + + :return: A bool value indicating whether binary data tracing is enabled. + :rtype: bool + """ + return self._is_binary_data_enabled() + def _set_attributes(self, span: "AbstractSpan", *attrs: Tuple[str, Any]) -> None: for attr in attrs: span.add_attribute(attr[0], attr[1]) @@ -474,7 +504,8 @@ def _add_message_event( event_body: Dict[str, Any] = {} if _trace_responses_content and content: - event_body["text"] = content + # Use consistent structured format with content array + event_body["content"] = [{"type": "text", "text": content}] attributes = self._create_event_attributes( conversation_id=conversation_id, @@ -560,6 +591,138 @@ def _add_tool_message_events( # Use "tool" for the event name: gen_ai.tool.message span.span_instance.add_event(name="gen_ai.tool.message", attributes=attributes) + # pylint: disable=too-many-branches + def _add_structured_input_events( + self, + span: "AbstractSpan", + input_list: List[Any], + conversation_id: Optional[str] = None, + ) -> None: + """ + Add message events for structured input (list format). + This handles cases like messages with images, multi-part content, etc. 
+ """ + for input_item in input_list: + try: + # Extract role - handle both dict and object + if isinstance(input_item, dict): + role = input_item.get("role", "user") + content = input_item.get("content") + else: + role = getattr(input_item, "role", "user") + content = getattr(input_item, "content", None) + + if not content: + continue + + # Build structured event content with content parts + event_body: Dict[str, Any] = {} + + # Only process content if content recording is enabled + if _trace_responses_content: + content_parts = [] + has_non_text_content = False + + # Content can be a list of content items + if isinstance(content, list): + for content_item in content: + content_type = None + + # Handle dict format + if isinstance(content_item, dict): + content_type = content_item.get("type") + if content_type in ("input_text", "text"): + text = content_item.get("text") + if text: + content_parts.append({"type": "text", "text": text}) + elif content_type == "input_image": + has_non_text_content = True + image_part = {"type": "image"} + # Include image data if binary data tracing is enabled + if _trace_binary_data: + image_url = content_item.get("image_url") + if image_url: + image_part["image_url"] = image_url + content_parts.append(image_part) + elif content_type == "input_file": + has_non_text_content = True + file_part = {"type": "file"} + # Only include filename and file_id if content recording is enabled + filename = content_item.get("filename") + if filename: + file_part["filename"] = filename + file_id = content_item.get("file_id") + if file_id: + file_part["file_id"] = file_id + # Only include file_data if binary data tracing is enabled + if _trace_binary_data: + file_data = content_item.get("file_data") + if file_data: + file_part["file_data"] = file_data + content_parts.append(file_part) + elif content_type: + # Other content types (audio, video, etc.) 
+ has_non_text_content = True + content_parts.append({"type": content_type}) + + # Handle object format + elif hasattr(content_item, "type"): + content_type = getattr(content_item, "type", None) + if content_type in ("input_text", "text"): + text = getattr(content_item, "text", None) + if text: + content_parts.append({"type": "text", "text": text}) + elif content_type == "input_image": + has_non_text_content = True + image_part = {"type": "image"} + # Include image data if binary data tracing is enabled + if _trace_binary_data: + image_url = getattr(content_item, "image_url", None) + if image_url: + image_part["image_url"] = image_url + content_parts.append(image_part) + elif content_type == "input_file": + has_non_text_content = True + file_part = {"type": "file"} + # Only include filename and file_id if content recording is enabled + filename = getattr(content_item, "filename", None) + if filename: + file_part["filename"] = filename + file_id = getattr(content_item, "file_id", None) + if file_id: + file_part["file_id"] = file_id + # Only include file_data if binary data tracing is enabled + if _trace_binary_data: + file_data = getattr(content_item, "file_data", None) + if file_data: + file_part["file_data"] = file_data + content_parts.append(file_part) + elif content_type: + # Other content types + has_non_text_content = True + content_parts.append({"type": content_type}) + + # Only add content if we have content parts + if content_parts: + # Always use consistent structured format + event_body["content"] = content_parts + + # Create event attributes + attributes = self._create_event_attributes( + conversation_id=conversation_id, + message_role=role, + ) + attributes[GEN_AI_EVENT_CONTENT] = json.dumps(event_body, ensure_ascii=False) + + # Add the event + event_name = f"gen_ai.{role}.message" + span.span_instance.add_event(name=event_name, attributes=attributes) + + except Exception: # pylint: disable=broad-exception-caught + # Skip items that can't be processed + logger.debug("Failed to process structured input item: %s", input_item, exc_info=True) + continue + def _emit_tool_call_event( self, span: "AbstractSpan", @@ -868,6 +1031,14 @@ def start_responses_span( content=input_text, conversation_id=conversation_id, ) + elif isinstance(input_to_check, list) and not has_tool_outputs: + # Handle structured input (list format) - extract text content from user messages + # This handles cases like image inputs with text prompts + self._add_structured_input_events( + span, + input_list=input_to_check, + conversation_id=conversation_id, + ) return span @@ -2982,7 +3153,7 @@ def _available_responses_apis_and_injectors(self): """ yield from self._generate_api_and_injector(self._all_api_list()) - def _instrument_responses(self, enable_content_tracing: bool = False): + def _instrument_responses(self, enable_content_tracing: bool = False, enable_binary_data: bool = False): """This function modifies the methods of the Responses API classes to inject logic before calling the original methods. The original methods are stored as _original attributes of the methods. @@ -2991,15 +3162,20 @@ def _instrument_responses(self, enable_content_tracing: bool = False): This also controls whether function call tool function names, parameter names and parameter values are traced. :type enable_content_tracing: bool + :param enable_binary_data: Indicates whether tracing of binary data (such as images) should be enabled. + This only takes effect when content recording is also enabled. 
+ :type enable_binary_data: bool """ # pylint: disable=W0603 global _responses_traces_enabled global _trace_responses_content + global _trace_binary_data if _responses_traces_enabled: return _responses_traces_enabled = True _trace_responses_content = enable_content_tracing + _trace_binary_data = enable_binary_data # Initialize metrics instruments self._initialize_metrics() @@ -3050,6 +3226,14 @@ def _is_content_recording_enabled(self) -> bool: global _trace_responses_content return _trace_responses_content + def _set_enable_binary_data(self, enable_binary_data: bool = False) -> None: + global _trace_binary_data + _trace_binary_data = enable_binary_data + + def _is_binary_data_enabled(self) -> bool: + global _trace_binary_data + return _trace_binary_data + def record_error(self, span, exc): # pyright: ignore [reportPossiblyUnboundVariable] span.span_instance.set_status(StatusCode.ERROR, str(exc)) diff --git a/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor.py b/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor.py index 445d2b53feb3..3d7d6a7873f0 100644 --- a/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor.py +++ b/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor.py @@ -21,6 +21,25 @@ settings.tracing_implementation = "OpenTelemetry" _utils._span_impl_type = settings.tracing_implementation() +# Environment variable for binary data tracing +BINARY_DATA_TRACING_ENV_VARIABLE = "AZURE_TRACING_GEN_AI_INCLUDE_BINARY_DATA" + +# Base64-encoded test image (PNG format) for testing binary data capture. +# This is a small png image with letters ABC in black on white background. +TEST_IMAGE_BASE64 = ( + "iVBORw0KGgoAAAANSUhEUgAAAHgAAABDCAYAAABX2cG8AAAAAXNSR0IArs4c6QAAAARnQU1BAACxjwv8YQUAAAAJcEhZcwAAFiUAABYlAUlSJPAAAAPBSURBVHhe7ZFbbgMxDANz" + "/0u3KNAA24Et60FtGscD8CdLkVL8+DpszYM/HPbiPPDmnAfenPPAm3MeeHO2e+DH4/FHn852/wAf+NMfervL+bDUp7HdxXzQmT6FLS/lY1p6F7J7+51vBP+Mlf4j3HEkDz7Xm8E/" + "wqP/Avey5MHnKhBdSAH/iGc3f6NeCXfxyIPPlYQLeZeqws5rL3+nXgF38MiL35mAS0UWq8DOUS+/z3ydsHulDLkpJ1ywsmgE9s066bG8atg5kgJNygAuq17cgn1WJ32WVwX7KCXa" + "tAtcmuqEXVYffZZXAbsoNfrEX7j4SF2wx+qiz/JWYc8tnfxBAZefqQv2rLroXfkzML+z60pLOg+w1AE7rB76LG8W5nd2kZYGHvE8hL91Hcl8q4M+y5uF+V09I+QtPOJ6CH/ndxXM" + "n3XQY3mzMLujw0LexEN4DL+NPFWYPcrnd8tbgdnq/BXyNh4zOojfZ74szGU2v818VZjd0bFC2sZDZsfQY3kzMPeazd9HHhXM7+hYIW3kMdZB9K38EZjpkRrmd/WskDbymNVB9Hpm" + "PDBvpQ7Y0dWzQtbKYzwH0e+dW8E8S12wp7PLQtbKY7wHcSYyO4NZljpgR2fXClkrj4kcxLnoPGGOVyqYq8yOImnmMZ6D6J8pAzOiqsI8RWYWSTOPUSsK57PKwpxKVhVJM49RKwrn" + "K8rAjGyOgnIzD+lQFM7PMuiZKQrnMxkqys08pEsROOuZp5+KwNnovJJyMw/xyIJezwzhbGSec6qMV1Fq5hGqQ5gZzeZcZPYHzkYzOBeZVVNq5hHKQ5gbyeeMd+4K5yMZnIvMqik1" + "8wjlIcyN5HPGO3eF85EMzkVm1aSbeUDHEcz39tDvmSGcj2RwLjKrJt3MA7qOYIeni96VfwTnIxmci84rSbdy+a4D2OHponflH8H5aAZno/MqUq1cvHt5dq066bO8Izj7qgwFqUYu" + "fcfi7LN66Zn5CGei84QZlawsqTYufMfS7LN66Zn5rtBPZWBGJStLuI3L3rkwe2f9/D7yPKFvpArMUmRGCDdx0TuX/YHdo35+p4ffLFVhHtVNuIEL3rHkFXaP+vn96eFvlpQwm+ok" + "lM7FupebsernjlF1wI6ROgilcqGupapwR6+6Yd9KCkIpXEC1hBruuNLdsD8jL37nL5mSu+GfMdKr4T4ZefC53hD+Gd4/5G64Y0QefK7DLfABV/Lgcx1uh49JefE7D2/JeeDNOQ+8" + "OeeBN+c88OacB96c88Cbcx54c84Db8554M35BqSHAPxoJdj6AAAAAElFTkSuQmCC" +) + class TestResponsesInstrumentor(TestAiAgentsInstrumentorBase): """Tests for ResponsesInstrumentor with real endpoints.""" @@ -221,7 +240,7 @@ def test_sync_non_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "Write a short 
poem about AI"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a short poem about AI"}]}', }, }, { @@ -229,7 +248,7 @@ def test_sync_non_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -318,6 +337,7 @@ def test_sync_non_streaming_without_content_recording(self, **kwargs): def test_sync_streaming_with_content_recording(self, **kwargs): """Test synchronous streaming responses with content recording enabled.""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -380,7 +400,7 @@ def test_sync_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "Write a short poem about AI"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a short poem about AI"}]}', }, }, { @@ -388,7 +408,7 @@ def test_sync_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -623,7 +643,7 @@ def test_sync_non_streaming_without_conversation(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "Write a short poem about AI"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a short poem about AI"}]}', }, }, { @@ -631,7 +651,7 @@ def test_sync_non_streaming_without_conversation(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -645,6 +665,7 @@ def test_sync_non_streaming_without_conversation(self, **kwargs): def test_sync_function_tool_with_content_recording_non_streaming(self, **kwargs): """Test synchronous function tool usage with content recording enabled (non-streaming).""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -755,7 +776,7 @@ def test_sync_function_tool_with_content_recording_non_streaming(self, **kwargs) "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "What\'s the weather in Seattle?"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "What\'s the weather in Seattle?"}]}', }, }, { @@ -803,7 +824,7 @@ def test_sync_function_tool_with_content_recording_non_streaming(self, **kwargs) "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -817,6 +838,7 @@ def test_sync_function_tool_with_content_recording_non_streaming(self, **kwargs) def test_sync_function_tool_with_content_recording_streaming(self, **kwargs): """Test synchronous function tool usage with 
content recording enabled (streaming).""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -950,7 +972,7 @@ def test_sync_function_tool_with_content_recording_streaming(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "What\'s the weather in Seattle?"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "What\'s the weather in Seattle?"}]}', }, }, { @@ -998,7 +1020,7 @@ def test_sync_function_tool_with_content_recording_streaming(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -1012,6 +1034,7 @@ def test_sync_function_tool_with_content_recording_streaming(self, **kwargs): def test_sync_function_tool_without_content_recording_non_streaming(self, **kwargs): """Test synchronous function tool usage without content recording (non-streaming).""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -1183,6 +1206,7 @@ def test_sync_function_tool_without_content_recording_non_streaming(self, **kwar def test_sync_function_tool_without_content_recording_streaming(self, **kwargs): """Test synchronous function tool usage without content recording (streaming).""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -1371,6 +1395,8 @@ def test_sync_function_tool_without_content_recording_streaming(self, **kwargs): @recorded_by_proxy def test_sync_function_tool_list_conversation_items_with_content_recording(self, **kwargs): """Test listing conversation items after function tool usage with content recording enabled.""" + from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -1504,6 +1530,8 @@ def test_sync_function_tool_list_conversation_items_with_content_recording(self, @recorded_by_proxy def test_sync_function_tool_list_conversation_items_without_content_recording(self, **kwargs): """Test listing conversation items after function tool usage without content recording.""" + from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -1627,3 +1655,1435 @@ def test_sync_function_tool_list_conversation_items_without_content_recording(se assert "gen_ai.event.content" in tool_event_attrs content = tool_event_attrs["gen_ai.event.content"] assert content == "{}" # Should be empty JSON object + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_sync_multiple_text_inputs_with_content_recording_non_streaming(self, **kwargs): + """Test synchronous non-streaming responses with multiple text inputs and content recording enabled.""" + 
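+ # Reset any previous test state, select the recording mode via env vars, then set up telemetry so instrument() picks them up.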
self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + result = client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=False + ) + + # Verify the response exists + assert hasattr(result, "output") + assert result.output is not None + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Hello"}]}', + }, + }, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a haiku about Python"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_sync_multiple_text_inputs_with_content_recording_streaming(self, **kwargs): + """Test synchronous streaming responses with multiple text inputs and content recording enabled.""" + self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = 
project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + stream = client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=True + ) + + # Consume the stream + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Hello"}]}', + }, + }, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a haiku about Python"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_sync_multiple_text_inputs_without_content_recording_non_streaming(self, **kwargs): + """Test synchronous non-streaming responses with multiple text inputs and content recording disabled.""" + self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation 
= client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + result = client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=False + ) + + # Verify the response exists + assert hasattr(result, "output") + assert result.output is not None + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message, all with empty content + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_sync_multiple_text_inputs_without_content_recording_streaming(self, **kwargs): + """Test synchronous streaming responses with multiple text inputs and content recording disabled.""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + stream = client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=True + ) + + # Consume the stream + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and 
chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message, all with empty content + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_off_binary_off_non_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + # Send only an image (no text) + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = 
GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_off_binary_on_non_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty (binary flag doesn't matter) + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_on_binary_off_non_streaming(self, **kwargs): + """Test image only with content recording ON and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + 
"gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_on_binary_on_non_streaming(self, **kwargs): + """Test image only with content recording ON and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have image type AND image_url with base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests (Text + Image) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_off_binary_off_non_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + print( + "Deployment Name ------------------------------------------------------------------ :", deployment_name + ) + + conversation = client.conversations.create() + + # Send text + image + result = 
client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_off_binary_on_non_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty (binary flag doesn't matter) + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_on_binary_off_non_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + 
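+ # Content recording ON, binary data OFF: traces should include the text part and the image part's type, but omit image_url.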
self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have text and image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"text","text":"What is shown in this image?"},{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_on_binary_on_non_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + result = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have text and image with full base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"text","text":"What is shown in this image?"}},{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + 
"gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests - Streaming (Image Only) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_off_binary_off_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data OFF (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + # Consume the stream + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_off_binary_on_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", 
"image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_image_only_content_on_binary_off_streaming(self, **kwargs): + """Test image only with content recording ON and binary data OFF (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def 
test_image_only_content_on_binary_on_streaming(self, **kwargs): + """Test image only with content recording ON and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have image type AND image_url with base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests - Streaming (Text + Image) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_off_binary_off_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data OFF (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, 
str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_off_binary_on_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_on_binary_off_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data OFF (streaming).""" + self.cleanup() + 
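# Scenario: content recording ON, binary tracing OFF; image parts are expected
+        # to be traced as {"type":"image"} with the data URI redacted.
+        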
os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have text and image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"text","text":"What is shown in this image?"},{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy + def test_text_and_image_content_on_binary_on_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + with self.create_client(operation_group="tracing", **kwargs) as project_client: + client = project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + conversation = client.conversations.create() + + stream = client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + 
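# Flush so buffered spans reach the test exporter before assertions.
+            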
self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have text and image with full base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"text","text":"What is shown in this image?"}},{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True diff --git a/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor_async.py b/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor_async.py index eba4538c62c6..9f841226ceb3 100644 --- a/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor_async.py +++ b/sdk/ai/azure-ai-projects/tests/agents/telemetry/test_responses_instrumentor_async.py @@ -17,6 +17,11 @@ from test_base import servicePreparer from test_ai_instrumentor_base import TestAiAgentsInstrumentorBase, CONTENT_TRACING_ENV_VARIABLE +BINARY_DATA_TRACING_ENV_VARIABLE = "AZURE_TRACING_GEN_AI_INCLUDE_BINARY_DATA" + +# Base64 encoded small test image (1x1 PNG) +TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==" + settings.tracing_implementation = "OpenTelemetry" _utils._span_impl_type = settings.tracing_implementation() @@ -86,7 +91,7 @@ async def test_async_non_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "Write a short poem about AI"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a short poem about AI"}]}', }, }, { @@ -94,7 +99,7 @@ async def test_async_non_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -171,7 +176,7 @@ async def test_async_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "Write a short poem about AI"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a short poem about AI"}]}', }, }, { @@ -179,7 +184,7 @@ async def test_async_streaming_with_content_recording(self, **kwargs): "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -291,6 +296,7 @@ async def test_async_list_conversation_items_with_content_recording(self, **kwar async def test_async_function_tool_with_content_recording_streaming(self, **kwargs): """Test asynchronous function tool usage with content recording enabled (streaming).""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "True", 
"AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -426,7 +432,7 @@ async def test_async_function_tool_with_content_recording_streaming(self, **kwar "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "user", - "gen_ai.event.content": '{"text": "What\'s the weather in Seattle?"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "What\'s the weather in Seattle?"}]}', }, }, { @@ -474,7 +480,7 @@ async def test_async_function_tool_with_content_recording_streaming(self, **kwar "attributes": { "gen_ai.provider.name": "azure.openai", "gen_ai.message.role": "assistant", - "gen_ai.event.content": '{"text": "*"}', + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', }, }, ] @@ -488,6 +494,7 @@ async def test_async_function_tool_with_content_recording_streaming(self, **kwar async def test_async_function_tool_without_content_recording_streaming(self, **kwargs): """Test asynchronous function tool usage without content recording (streaming).""" from openai.types.responses.response_input_param import FunctionCallOutput + self.cleanup() os.environ.update( {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} @@ -671,3 +678,1454 @@ async def test_async_function_tool_without_content_recording_streaming(self, **k ] events_match = GenAiTraceVerifier().check_span_events(span2, expected_events_2) assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_multiple_text_inputs_with_content_recording_non_streaming(self, **kwargs): + """Test asynchronous non-streaming responses with multiple text inputs and content recording enabled.""" + self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + async with self.create_async_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = await project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = await client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + result = await client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=False + ) + + # Verify the response exists + assert hasattr(result, "output") + assert result.output is not None + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + 
("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Hello"}]}', + }, + }, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a haiku about Python"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_multiple_text_inputs_with_content_recording_streaming(self, **kwargs): + """Test asynchronous streaming responses with multiple text inputs and content recording enabled.""" + self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "True", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + async with self.create_async_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = await project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = await client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + stream = await client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=True + ) + + # Consume the stream + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, 
expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Hello"}]}', + }, + }, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "Write a haiku about Python"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": '{"content": [{"type": "text", "text": "*"}]}', + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_multiple_text_inputs_without_content_recording_non_streaming(self, **kwargs): + """Test asynchronous non-streaming responses with multiple text inputs and content recording disabled.""" + self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + async with self.create_async_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = await project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = await client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + result = await client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=False + ) + + # Verify the response exists + assert hasattr(result, "output") + assert result.output is not None + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message, all with empty content + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": 
"gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests (Image Only) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_off_binary_off_non_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + # Send only an image (no text) + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_off_binary_on_non_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + result = await client.responses.create( + 
model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty (binary flag doesn't matter) + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_on_binary_off_non_streaming(self, **kwargs): + """Test image only with content recording ON and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_on_binary_on_non_streaming(self, **kwargs): + """Test image only with content recording ON and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == 
AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have image type AND image_url with base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests (Text + Image) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_off_binary_off_non_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + # Send text + image + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + 
"gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_off_binary_on_non_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty (binary flag doesn't matter) + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_on_binary_off_non_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data OFF (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert 
result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have text and image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"text","text":"What is shown in this image?"},{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_on_binary_on_non_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data ON (non-streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + result = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=False, + ) + + assert hasattr(result, "output") + assert result.output is not None + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have text and image with full base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"text","text":"What is shown in this image?"}},{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests - Streaming (Image Only) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_off_binary_off_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data OFF (streaming).""" + self.cleanup() + 
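# Both flags OFF: user and assistant message events should carry empty "{}" content.
+        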
os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + # Consume the stream + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_off_binary_on_streaming(self, **kwargs): + """Test image only with content recording OFF and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert 
len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_on_binary_off_streaming(self, **kwargs): + """Test image only with content recording ON and binary data OFF (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_image_only_content_on_binary_on_streaming(self, **kwargs): + """Test image only with content recording ON and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", 
**kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [{"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary ON: should have image type AND image_url with base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + # ======================================== + # Binary Data Tracing Tests - Streaming (Text + Image) + # ======================================== + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_off_binary_off_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data OFF (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert 
len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_off_binary_on_streaming(self, **kwargs): + """Test text + image with content recording OFF and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "False", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording OFF: event content should be empty + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_on_binary_off_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data OFF (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "False"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = 
self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content recording ON, binary OFF: should have text and image type but no image_url + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": '{"content":[{"type":"text","text":"What is shown in this image?"},{"type":"image"}]}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_with_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_text_and_image_content_on_binary_on_streaming(self, **kwargs): + """Test text + image with content recording ON and binary data ON (streaming).""" + self.cleanup() + os.environ.update({CONTENT_TRACING_ENV_VARIABLE: "True", BINARY_DATA_TRACING_ENV_VARIABLE: "True"}) + self.setup_telemetry() + assert True == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + project_client = self.create_async_client(operation_group="tracing", **kwargs) + deployment_name = self.test_agents_params["model_deployment_name"] + + async with project_client: + client = await project_client.get_openai_client() + conversation = await client.conversations.create() + + stream = await client.responses.create( + model=deployment_name, + conversation=conversation.id, + input=[ + { + "role": "user", + "content": [ + {"type": "input_text", "text": "What is shown in this image?"}, + {"type": "input_image", "image_url": f"data:image/png;base64,{TEST_IMAGE_BASE64}"}, + ], + } + ], + stream=True, + ) + + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Content 
recording ON, binary ON: should have text and image with full base64 data + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": f'{{"content":[{{"type":"text","text":"What is shown in this image?"}},{{"type":"image","image_url":"data:image/png;base64,{TEST_IMAGE_BASE64}"}}]}}', + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "*", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True + + @pytest.mark.skip(reason="recordings not working for responses API") + @pytest.mark.usefixtures("instrument_without_content") + @servicePreparer() + @recorded_by_proxy_async + async def test_async_multiple_text_inputs_without_content_recording_streaming(self, **kwargs): + """Test asynchronous streaming responses with multiple text inputs and content recording disabled.""" + self.cleanup() + os.environ.update( + {CONTENT_TRACING_ENV_VARIABLE: "False", "AZURE_TRACING_GEN_AI_INSTRUMENT_RESPONSES_API": "True"} + ) + self.setup_telemetry() + assert False == AIProjectInstrumentor().is_content_recording_enabled() + assert True == AIProjectInstrumentor().is_instrumented() + + async with self.create_async_client(operation_group="tracing", **kwargs) as project_client: + # Get the OpenAI client from the project client + client = await project_client.get_openai_client() + deployment_name = self.test_agents_params["model_deployment_name"] + + # Create a conversation + conversation = await client.conversations.create() + + # Create responses with multiple text inputs as a list + input_list = [ + {"role": "user", "content": [{"type": "input_text", "text": "Hello"}]}, + {"role": "user", "content": [{"type": "input_text", "text": "Write a haiku about Python"}]}, + ] + + stream = await client.responses.create( + model=deployment_name, conversation=conversation.id, input=input_list, stream=True + ) + + # Consume the stream + accumulated_content = [] + async for chunk in stream: + if hasattr(chunk, "delta") and isinstance(chunk.delta, str): + accumulated_content.append(chunk.delta) + elif hasattr(chunk, "output") and chunk.output: + accumulated_content.append(chunk.output) + + full_content = "".join(accumulated_content) + assert full_content is not None + assert len(full_content) > 0 + + # Check spans + self.exporter.force_flush() + spans = self.exporter.get_spans_by_name(f"responses {deployment_name}") + assert len(spans) == 1 + span = spans[0] + + # Check span attributes + expected_attributes = [ + ("az.namespace", "Microsoft.CognitiveServices"), + ("gen_ai.operation.name", "responses"), + ("gen_ai.request.model", deployment_name), + ("gen_ai.provider.name", "azure.openai"), + ("server.address", ""), + ("gen_ai.conversation.id", conversation.id), + ("gen_ai.response.model", deployment_name), + ("gen_ai.response.id", ""), + ("gen_ai.usage.input_tokens", "+"), + ("gen_ai.usage.output_tokens", "+"), + ] + attributes_match = GenAiTraceVerifier().check_span_attributes(span, expected_attributes) + assert attributes_match == True + + # Check span events - should have 2 user messages and 1 assistant message, all with empty content + expected_events = [ + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + 
}, + { + "name": "gen_ai.user.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "user", + "gen_ai.event.content": "{}", + }, + }, + { + "name": "gen_ai.assistant.message", + "attributes": { + "gen_ai.provider.name": "azure.openai", + "gen_ai.message.role": "assistant", + "gen_ai.event.content": "{}", + }, + }, + ] + events_match = GenAiTraceVerifier().check_span_events(span, expected_events) + assert events_match == True