From 7c4fd5bd8ae99bccaffb772d26b1882ddaeda39a Mon Sep 17 00:00:00 2001 From: "mxiamxia@gmail.com" Date: Fri, 22 Aug 2025 17:00:18 -0700 Subject: [PATCH] Add support for OTel GenAI Semantic Convention patterns in LLO handler --- .../opentelemetry/distro/llo_handler.py | 43 ++- .../llo_handler/test_llo_handler_events.py | 329 +++++++++++++++++- .../llo_handler/test_llo_handler_patterns.py | 22 ++ 3 files changed, 391 insertions(+), 3 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py index 42506d905..43292c059 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -16,6 +16,7 @@ ROLE_SYSTEM = "system" ROLE_USER = "user" ROLE_ASSISTANT = "assistant" +ROLE_TOOL = "tool" _logger = logging.getLogger(__name__) @@ -137,6 +138,35 @@ class PatternConfig(TypedDict, total=False): "role": ROLE_USER, "source": "prompt", }, + # OTel GenAI Semantic Convention used by the latest Strands SDK + # References: + # - OTel GenAI SemConv: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/ + # - Strands SDK PR(introduced in v0.1.9): https://github.com/strands-agents/sdk-python/pull/319 + "gen_ai.user.message": { + "type": PatternType.DIRECT, + "role": ROLE_USER, + "source": "prompt", + }, + "gen_ai.assistant.message": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, + "gen_ai.system.message": { + "type": PatternType.DIRECT, + "role": ROLE_SYSTEM, + "source": "prompt", + }, + "gen_ai.tool.message": { + "type": PatternType.DIRECT, + "role": ROLE_TOOL, + "source": "prompt", + }, + "gen_ai.choice": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, } @@ -214,6 +244,7 @@ def _collect_all_llo_messages(self, span: ReadableSpan, attributes: types.Attrib for attr_key, value in attributes.items(): if attr_key in self._exact_match_patterns: config = self._pattern_configs[attr_key] + messages.append( {"content": value, "role": config.get("role", "unknown"), "source": config.get("source", "unknown")} ) @@ -279,6 +310,12 @@ def _collect_llo_attributes_from_span(self, span: ReadableSpan) -> Dict[str, Any # Collect from span events if span.events: for event in span.events: + # Check if event name itself is an LLO pattern (e.g., "gen_ai.user.message") + if self._is_llo_attribute(event.name): + # Put all event attributes as the content as LLO in log event + all_llo_attributes[event.name] = dict(event.attributes) if event.attributes else {} + + # Also check traditional pattern - LLO attributes within event attributes if event.attributes: for key, value in event.attributes.items(): if self._is_llo_attribute(key): @@ -372,6 +409,10 @@ def _filter_span_events(self, span: ReadableSpan) -> None: updated_events = [] for event in span.events: + # Skip entire event if event name is an LLO pattern + if self._is_llo_attribute(event.name): + continue + if not event.attributes: updated_events.append(event) continue @@ -417,7 +458,7 @@ def _group_messages_by_type(self, messages: List[Dict[str, Any]]) -> Dict[str, L elif role == ROLE_ASSISTANT: output_messages.append(formatted_message) else: - # Route based on source for non-standard roles + # Route based on source for non-standard roles including tool if any(key in message.get("source", "") for key in ["completion", "output", "result"]): output_messages.append(formatted_message) else: diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py index 5d90ebc77..1ce194a54 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py @@ -7,7 +7,7 @@ from test_llo_handler_base import LLOHandlerTestBase -class TestLLOHandlerEvents(LLOHandlerTestBase): +class TestLLOHandlerEvents(LLOHandlerTestBase): # pylint: disable=too-many-public-methods """Test event emission and formatting functionality.""" def test_emit_llo_attributes(self): @@ -397,7 +397,7 @@ def test_group_messages_by_type_standard_roles(self): self.assertIn("input", result) self.assertIn("output", result) - # Check input messages + # Check input messages (system and user go to input) self.assertEqual(len(result["input"]), 2) self.assertEqual(result["input"][0], {"role": "system", "content": "System message"}) self.assertEqual(result["input"][1], {"role": "user", "content": "User message"}) @@ -423,6 +423,8 @@ def test_group_messages_by_type_non_standard_roles(self): self.assertEqual(len(result["input"]), 1) self.assertEqual(result["input"][0], {"role": "function", "content": "Function call"}) + # Note: "tool" role from completion source is routed to output based on source + # This differs from "tool" role from prompt source which goes to input as standard role # Non-standard roles from completion/output/result sources go to output self.assertEqual(len(result["output"]), 3) output_contents = [msg["content"] for msg in result["output"]] @@ -430,6 +432,30 @@ def test_group_messages_by_type_non_standard_roles(self): self.assertIn("Custom output", output_contents) self.assertIn("Other result", output_contents) + def test_group_messages_by_type_tool_role_routing(self): + """ + Test _group_messages_by_type correctly routes tool role based on source. + Tool messages from prompt source go to input (tool calls/instructions). + Tool messages from completion/output source go to output (tool results). + """ + messages = [ + {"role": "tool", "content": "Tool instruction", "source": "prompt"}, + {"role": "tool", "content": "Tool result", "source": "completion"}, + {"role": "tool", "content": "Another tool result", "source": "output"}, + ] + + result = self.llo_handler._group_messages_by_type(messages) + + # Tool from prompt source goes to input + self.assertEqual(len(result["input"]), 1) + self.assertEqual(result["input"][0], {"role": "tool", "content": "Tool instruction"}) + + # Tool from completion/output sources go to output + self.assertEqual(len(result["output"]), 2) + output_contents = [msg["content"] for msg in result["output"]] + self.assertIn("Tool result", output_contents) + self.assertIn("Another tool result", output_contents) + def test_group_messages_by_type_empty_list(self): """ Test _group_messages_by_type handles empty message list. @@ -649,3 +675,302 @@ def test_emit_llo_attributes_with_session_id_and_other_attributes(self): self.assertNotIn("other.attribute", emitted_event.attributes) self.assertNotIn("gen_ai.prompt", emitted_event.attributes) self.assertNotIn("gen_ai.completion", emitted_event.attributes) + + def test_emit_llo_attributes_otel_genai_patterns(self): + """ + Test the new GenAI patterns from Strands SDK that follow OTel GenAI Semantic Convention. + """ + attributes = { + "gen_ai.user.message": "What is machine learning?", + "gen_ai.assistant.message": ( + "Machine learning is a subset of AI that enables computers to learn patterns from data." + ), + "gen_ai.system.message": "You are a helpful AI assistant specializing in technology topics.", + "gen_ai.tool.message": "Searching knowledge base for machine learning definitions...", + "gen_ai.choice": "Best answer: Machine learning uses algorithms to find patterns in data.", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "strands.genai.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + self.assertEqual(emitted_event.name, "strands.genai.scope") + self.assertEqual(emitted_event.timestamp, span.end_time) + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + # Check input messages (system, user, and tool messages) + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 3) + + input_contents = [msg["content"] for msg in input_messages] + self.assertIn("What is machine learning?", input_contents) + self.assertIn("You are a helpful AI assistant specializing in technology topics.", input_contents) + self.assertIn("Searching knowledge base for machine learning definitions...", input_contents) + + # Verify roles for input messages + user_msg = next((msg for msg in input_messages if msg["content"] == "What is machine learning?"), None) + self.assertIsNotNone(user_msg) + self.assertEqual(user_msg["role"], "user") + + system_msg = next((msg for msg in input_messages if "helpful AI assistant" in msg["content"]), None) + self.assertIsNotNone(system_msg) + self.assertEqual(system_msg["role"], "system") + + tool_msg = next((msg for msg in input_messages if "Searching knowledge base" in msg["content"]), None) + self.assertIsNotNone(tool_msg) + self.assertEqual(tool_msg["role"], "tool") + + # Check output messages (assistant message and choice) + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 2) + + output_contents = [msg["content"] for msg in output_messages] + self.assertIn( + "Machine learning is a subset of AI that enables computers to learn patterns from data.", output_contents + ) + self.assertIn("Best answer: Machine learning uses algorithms to find patterns in data.", output_contents) + + # Verify all output messages have assistant role + for msg in output_messages: + self.assertEqual(msg["role"], "assistant") + + def test_emit_llo_attributes_genai_user_message_only(self): + """ + Test event generation with only gen_ai.user.message attribute. + """ + attributes = { + "gen_ai.user.message": "Hello, how are you today?", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertNotIn("output", event_body) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 1) + self.assertEqual(input_messages[0]["content"], "Hello, how are you today?") + self.assertEqual(input_messages[0]["role"], "user") + + def test_emit_llo_attributes_genai_assistant_message_only(self): + """ + Test event generation with only gen_ai.assistant.message attribute. + """ + attributes = { + "gen_ai.assistant.message": "I'm doing well, thank you for asking!", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertNotIn("input", event_body) + self.assertIn("output", event_body) + + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 1) + self.assertEqual(output_messages[0]["content"], "I'm doing well, thank you for asking!") + self.assertEqual(output_messages[0]["role"], "assistant") + + def test_emit_llo_attributes_genai_system_message_only(self): + """ + Test event generation with only gen_ai.system.message attribute. + """ + attributes = { + "gen_ai.system.message": "You are a creative writing assistant. Help users write stories.", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertNotIn("output", event_body) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 1) + self.assertEqual( + input_messages[0]["content"], "You are a creative writing assistant. Help users write stories." + ) + self.assertEqual(input_messages[0]["role"], "system") + + def test_emit_llo_attributes_genai_tool_message_only(self): + """ + Test event generation with only gen_ai.tool.message attribute. + """ + attributes = { + "gen_ai.tool.message": "Executing search function with query: 'latest news'", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertNotIn("output", event_body) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 1) + self.assertEqual(input_messages[0]["content"], "Executing search function with query: 'latest news'") + self.assertEqual(input_messages[0]["role"], "tool") + + def test_emit_llo_attributes_genai_choice_only(self): + """ + Test event generation with only gen_ai.choice attribute. + """ + attributes = { + "gen_ai.choice": "Selected option: Continue with the current approach.", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertNotIn("input", event_body) + self.assertIn("output", event_body) + + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 1) + self.assertEqual(output_messages[0]["content"], "Selected option: Continue with the current approach.") + self.assertEqual(output_messages[0]["role"], "assistant") + + def test_emit_llo_attributes_from_span_events(self): + """ + Test that LLO attributes are collected from span events when event names match LLO patterns. + """ + # Create span with normal attributes + span_attributes = {"normal.attribute": "value"} + + # Create span events where event names are LLO patterns + user_event = MagicMock() + user_event.name = "gen_ai.user.message" + user_event.attributes = {"content": "What is the weather today?", "other": "metadata"} + user_event.timestamp = 1234567890 + + assistant_event = MagicMock() + assistant_event.name = "gen_ai.assistant.message" + assistant_event.attributes = {"content": "It's sunny and 75°F", "confidence": 0.95} + assistant_event.timestamp = 1234567891 + + system_event = MagicMock() + system_event.name = "gen_ai.system.message" + system_event.attributes = {"content": "You are a weather assistant"} + system_event.timestamp = 1234567892 + + span = self._create_mock_span(span_attributes) + span.events = [user_event, assistant_event, system_event] + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "event.patterns.scope" + + # Collect LLO attributes and emit event + all_llo_attrs = self.llo_handler._collect_llo_attributes_from_span(span) + self.llo_handler._emit_llo_attributes(span, all_llo_attrs) + + # Should emit event because LLO attributes were collected from span events + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + # Check input messages (user and system) + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 2) + + # User message should contain all event attributes as content + user_msg = next((msg for msg in input_messages if msg["role"] == "user"), None) + self.assertIsNotNone(user_msg) + self.assertEqual(user_msg["content"], {"content": "What is the weather today?", "other": "metadata"}) + + # System message should contain all event attributes as content + system_msg = next((msg for msg in input_messages if msg["role"] == "system"), None) + self.assertIsNotNone(system_msg) + self.assertEqual(system_msg["content"], {"content": "You are a weather assistant"}) + + # Check output messages (assistant) + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 1) + self.assertEqual(output_messages[0]["role"], "assistant") + self.assertEqual(output_messages[0]["content"], {"content": "It's sunny and 75°F", "confidence": 0.95}) + + def test_filter_span_events_removes_llo_pattern_events(self): + """ + Test that span events are completely removed when their names match LLO patterns. + """ + # Create span events + normal_event = MagicMock() + normal_event.name = "normal.event" + normal_event.attributes = {"data": "keep this"} + + llo_event1 = MagicMock() + llo_event1.name = "gen_ai.user.message" + llo_event1.attributes = {"content": "remove this event"} + + llo_event2 = MagicMock() + llo_event2.name = "gen_ai.assistant.message" + llo_event2.attributes = {"content": "remove this too"} + + another_normal_event = MagicMock() + another_normal_event.name = "another.normal.event" + another_normal_event.attributes = {"info": "keep this too"} + + span = self._create_mock_span({}) + span.events = [normal_event, llo_event1, llo_event2, another_normal_event] + + self.llo_handler._filter_span_events(span) + + # Should only have normal events left + self.assertEqual(len(span._events), 2) + remaining_names = [event.name for event in span._events] + self.assertIn("normal.event", remaining_names) + self.assertIn("another.normal.event", remaining_names) + self.assertNotIn("gen_ai.user.message", remaining_names) + self.assertNotIn("gen_ai.assistant.message", remaining_names) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py index 25abfcca6..b29e9f675 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py @@ -110,3 +110,25 @@ def test_build_pattern_matchers_with_missing_regex(self): # Restore original patterns LLO_PATTERNS.clear() LLO_PATTERNS.update(original_patterns) + + def test_is_llo_attribute_otel_genai_patterns_match(self): + """ + Verify _is_llo_attribute recognizes new GenAI patterns from Strands SDK that follow OTel GenAI Semantic + Convention. + """ + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.user.message")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.assistant.message")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.system.message")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.tool.message")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.choice")) + + def test_is_llo_attribute_otel_genai_patterns_no_match(self): + """ + Verify _is_llo_attribute correctly rejects similar but invalid GenAI patterns. + """ + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.user")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.assistant")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.system")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.tool")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.user.message.content")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.invalid.message"))