diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py index 3abc44532..42506d905 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -1,6 +1,143 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 +import logging +import re +from enum import Enum +from typing import Any, Dict, List, Optional, Sequence, TypedDict + +from opentelemetry._events import Event +from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk._events import EventLoggerProvider from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import Event as SpanEvent +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.util import types + +ROLE_SYSTEM = "system" +ROLE_USER = "user" +ROLE_ASSISTANT = "assistant" + +_logger = logging.getLogger(__name__) + + +class PatternType(str, Enum): + """Types of LLO attribute patterns.""" + + INDEXED = "indexed" + DIRECT = "direct" + + +class PatternConfig(TypedDict, total=False): + """Configuration for an LLO pattern.""" + + type: PatternType + regex: Optional[str] + role_key: Optional[str] + role: Optional[str] + default_role: Optional[str] + source: str + + +LLO_PATTERNS: Dict[str, PatternConfig] = { + "gen_ai.prompt.{index}.content": { + "type": PatternType.INDEXED, + "regex": r"^gen_ai\.prompt\.(\d+)\.content$", + "role_key": "gen_ai.prompt.{index}.role", + "default_role": "unknown", + "source": "prompt", + }, + "gen_ai.completion.{index}.content": { + "type": PatternType.INDEXED, + "regex": r"^gen_ai\.completion\.(\d+)\.content$", + "role_key": "gen_ai.completion.{index}.role", + "default_role": "unknown", + "source": "completion", + }, + "llm.input_messages.{index}.message.content": { + "type": PatternType.INDEXED, + "regex": r"^llm\.input_messages\.(\d+)\.message\.content$", + "role_key": "llm.input_messages.{index}.message.role", + "default_role": ROLE_USER, + "source": "input", + }, + "llm.output_messages.{index}.message.content": { + "type": PatternType.INDEXED, + "regex": r"^llm\.output_messages\.(\d+)\.message\.content$", + "role_key": "llm.output_messages.{index}.message.role", + "default_role": ROLE_ASSISTANT, + "source": "output", + }, + "traceloop.entity.input": { + "type": PatternType.DIRECT, + "role": ROLE_USER, + "source": "input", + }, + "traceloop.entity.output": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, + "crewai.crew.tasks_output": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, + "crewai.crew.result": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "result", + }, + "gen_ai.prompt": { + "type": PatternType.DIRECT, + "role": ROLE_USER, + "source": "prompt", + }, + "gen_ai.completion": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "completion", + }, + "gen_ai.content.revised_prompt": { + "type": PatternType.DIRECT, + "role": ROLE_SYSTEM, + "source": "prompt", + }, + "gen_ai.agent.actual_output": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, + "gen_ai.agent.human_input": { + "type": PatternType.DIRECT, + "role": ROLE_USER, + "source": "input", + }, + "input.value": { + "type": PatternType.DIRECT, + "role": ROLE_USER, + "source": "input", + }, + "output.value": { + "type": 
PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, + "system_prompt": { + "type": PatternType.DIRECT, + "role": ROLE_SYSTEM, + "source": "prompt", + }, + "tool.result": { + "type": PatternType.DIRECT, + "role": ROLE_ASSISTANT, + "source": "output", + }, + "llm.prompts": { + "type": PatternType.DIRECT, + "role": ROLE_USER, + "source": "prompt", + }, +} class LLOHandler: @@ -8,25 +145,413 @@ class LLOHandler: Utility class for handling Large Language Objects (LLO) in OpenTelemetry spans. LLOHandler performs three primary functions: - 1. Identifies input/output prompt content in spans - 2. Extracts and transforms these attributes into an OpenTelemetry Gen AI Event - 3. Filters input/output prompts from spans to maintain privacy and reduce span size - - This LLOHandler supports the following third-party instrumentation libraries: - - Strands - - OpenInference - - Traceloop/OpenLLMetry - - OpenLIT + 1. Identifies Large Language Objects (LLO) content in spans + 2. Extracts and transforms these attributes into OpenTelemetry Gen AI Events + 3. Filters LLO from spans to maintain privacy and reduce span size + + The handler uses a configuration-driven approach with a pattern registry that defines + all supported LLO attribute patterns and their extraction rules. This makes it easy + to add support for new frameworks without modifying the core logic. """ def __init__(self, logger_provider: LoggerProvider): """ Initialize an LLOHandler with the specified logger provider. - This constructor sets up the event logger provider, configures the event logger, - and initializes the patterns used to identify LLO attributes. + This constructor sets up the event logger provider and compiles patterns + from the pattern registry for efficient matching. Args: logger_provider: The OpenTelemetry LoggerProvider used for emitting events. Global LoggerProvider instance injected from our AwsOpenTelemetryConfigurator """ + self._logger_provider = logger_provider + self._event_logger_provider = EventLoggerProvider(logger_provider=self._logger_provider) + + self._build_pattern_matchers() + + def _build_pattern_matchers(self) -> None: + """ + Build efficient pattern matching structures from the pattern registry. + + Creates: + - Set of exact match patterns for O(1) lookups + - List of compiled regex patterns for indexed patterns + - Mapping of patterns to their configurations + """ + self._exact_match_patterns = set() + self._regex_patterns = [] + self._pattern_configs = {} + + for pattern_key, config in LLO_PATTERNS.items(): + if config["type"] == PatternType.DIRECT: + self._exact_match_patterns.add(pattern_key) + self._pattern_configs[pattern_key] = config + elif config["type"] == PatternType.INDEXED: + if regex_str := config.get("regex"): + compiled_regex = re.compile(regex_str) + self._regex_patterns.append((compiled_regex, pattern_key, config)) + + def _collect_all_llo_messages(self, span: ReadableSpan, attributes: types.Attributes) -> List[Dict[str, Any]]: + """ + Collect all LLO messages from attributes using the pattern registry. + + This is the main collection method that processes all patterns defined + in the registry and extracts messages accordingly. 
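+ + Illustrative sketch (values invented for this docstring): given + attributes = {"gen_ai.prompt.0.content": "hi", "gen_ai.prompt.0.role": "user"}, + the result is [{"content": "hi", "role": "user", "source": "prompt"}].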
+ + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + + Returns: + List[Dict[str, Any]]: List of message dictionaries with 'content', 'role', and 'source' keys + """ + messages = [] + + if attributes is None: + return messages + + for attr_key, value in attributes.items(): + if attr_key in self._exact_match_patterns: + config = self._pattern_configs[attr_key] + messages.append( + {"content": value, "role": config.get("role", "unknown"), "source": config.get("source", "unknown")} + ) + + indexed_messages = self._collect_indexed_messages(attributes) + messages.extend(indexed_messages) + + return messages + + def _collect_indexed_messages(self, attributes: types.Attributes) -> List[Dict[str, Any]]: + """ + Collect messages from indexed patterns (e.g., gen_ai.prompt.0.content). + + Handles patterns with numeric indices and their associated role attributes. + + Args: + attributes: Dictionary of attributes to process + + Returns: + List[Dict[str, Any]]: List of message dictionaries + """ + indexed_messages = {} + + if attributes is None: + return [] + + for attr_key, value in attributes.items(): + for regex, pattern_key, config in self._regex_patterns: + match = regex.match(attr_key) + if match: + index = int(match.group(1)) + + role = config.get("default_role", "unknown") + if role_key_template := config.get("role_key"): + role_key = role_key_template.replace("{index}", str(index)) + role = attributes.get(role_key, role) + + key = (pattern_key, index) + indexed_messages[key] = {"content": value, "role": role, "source": config.get("source", "unknown")} + break + + sorted_keys = sorted(indexed_messages.keys(), key=lambda k: (k[0], k[1])) + return [indexed_messages[k] for k in sorted_keys] + + def _collect_llo_attributes_from_span(self, span: ReadableSpan) -> Dict[str, Any]: + """ + Collect all LLO attributes from a span's attributes and events. + + Args: + span: The span to collect LLO attributes from + + Returns: + Dictionary of all LLO attributes found in the span + """ + all_llo_attributes = {} + + # Collect from span attributes + if span.attributes is not None: + for key, value in span.attributes.items(): + if self._is_llo_attribute(key): + all_llo_attributes[key] = value + + # Collect from span events + if span.events: + for event in span.events: + if event.attributes: + for key, value in event.attributes.items(): + if self._is_llo_attribute(key): + all_llo_attributes[key] = value + + return all_llo_attributes + + # pylint: disable-next=no-self-use + def _update_span_attributes(self, span: ReadableSpan, filtered_attributes: types.Attributes) -> None: + """ + Update span attributes, preserving BoundedAttributes if present. + + Args: + span: The span to update + filtered_attributes: The filtered attributes to set + """ + if filtered_attributes is not None and isinstance(span.attributes, BoundedAttributes): + span._attributes = BoundedAttributes( + maxlen=span.attributes.maxlen, + attributes=filtered_attributes, + immutable=span.attributes._immutable, + max_value_len=span.attributes.max_value_len, + ) + else: + span._attributes = filtered_attributes + + def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]: + """ + Processes a sequence of spans to extract and filter LLO attributes. + + For each span, this method: + 1. Collects all LLO attributes from span attributes and all span events + 2. Emits a single consolidated Gen AI Event with all collected LLO content + 3. 
Filters out LLO attributes from the span and its events to maintain privacy + 4. Preserves non-LLO attributes in the span + + Handles LLO attributes from multiple frameworks: + - Traceloop (indexed prompt/completion patterns and entity input/output) + - OpenLit (direct prompt/completion patterns, including from span events) + - OpenInference (input/output values and structured messages) + - Strands SDK (system prompts and tool results) + - CrewAI (tasks output and results) + + Args: + spans: A sequence of OpenTelemetry ReadableSpan objects to process + + Returns: + List[ReadableSpan]: Modified spans with LLO attributes removed + """ + modified_spans = [] + + for span in spans: + # Collect all LLO attributes from both span attributes and events + all_llo_attributes = self._collect_llo_attributes_from_span(span) + + # Emit a single consolidated event if we found any LLO attributes + if all_llo_attributes: + self._emit_llo_attributes(span, all_llo_attributes) + + # Filter span attributes + filtered_attributes = None + if span.attributes is not None: + filtered_attributes = self._filter_attributes(span.attributes) + + # Update span attributes + self._update_span_attributes(span, filtered_attributes) + + # Filter span events + self._filter_span_events(span) + + modified_spans.append(span) + + return modified_spans + + def _filter_span_events(self, span: ReadableSpan) -> None: + """ + Filter LLO attributes from span events. + + This method removes LLO attributes from event attributes while preserving + the event structure and non-LLO attributes. + + Args: + span: The ReadableSpan to filter events for + + Returns: + None: The span is modified in-place + """ + if not span.events: + return + + updated_events = [] + + for event in span.events: + if not event.attributes: + updated_events.append(event) + continue + + updated_event_attributes = self._filter_attributes(event.attributes) + + if updated_event_attributes is not None and len(updated_event_attributes) != len(event.attributes): + limit = None + if isinstance(event.attributes, BoundedAttributes): + limit = event.attributes.maxlen + + updated_event = SpanEvent( + name=event.name, attributes=updated_event_attributes, timestamp=event.timestamp, limit=limit + ) + + updated_events.append(updated_event) + else: + updated_events.append(event) + + span._events = updated_events + + # pylint: disable-next=no-self-use + def _group_messages_by_type(self, messages: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, str]]]: + """ + Group messages into input and output categories based on role and source. 
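+ + Routing sketch (illustrative values): "system" and "user" roles go to "input", + "assistant" goes to "output", and any other role is routed by its source, so + {"role": "tool", "content": "42", "source": "completion"} lands in "output" + because the source contains "completion".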
+ + Args: + messages: List of message dictionaries with 'role', 'content', and 'source' keys + + Returns: + Dictionary with 'input' and 'output' lists of messages + """ + input_messages = [] + output_messages = [] + + for message in messages: + role = message.get("role", "unknown") + content = message.get("content", "") + formatted_message = {"role": role, "content": content} + + if role in [ROLE_SYSTEM, ROLE_USER]: + input_messages.append(formatted_message) + elif role == ROLE_ASSISTANT: + output_messages.append(formatted_message) + else: + # Route based on source for non-standard roles + if any(key in message.get("source", "") for key in ["completion", "output", "result"]): + output_messages.append(formatted_message) + else: + input_messages.append(formatted_message) + + return {"input": input_messages, "output": output_messages} + + def _emit_llo_attributes( + self, span: ReadableSpan, attributes: types.Attributes, event_timestamp: Optional[int] = None + ) -> None: + """ + Extract LLO attributes and emit them as a single consolidated Gen AI Event. + + This method: + 1. Collects all LLO attributes using the pattern registry + 2. Groups them into input and output messages + 3. Emits one event per span containing all LLO content + + The event body format: + { + "input": { + "messages": [ + {"role": "system", "content": "..."}, + {"role": "user", "content": "..."} + ] + }, + "output": { + "messages": [ + {"role": "assistant", "content": "..."} + ] + } + } + + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + event_timestamp: Optional timestamp to override span timestamps + + Returns: + None: Event is emitted via the event logger + """ + if attributes is None: + return + has_llo_attrs = any(self._is_llo_attribute(key) for key in attributes) + if not has_llo_attrs: + return + + all_messages = self._collect_all_llo_messages(span, attributes) + if not all_messages: + return + + # Group messages into input/output categories + grouped_messages = self._group_messages_by_type(all_messages) + + # Build event body + event_body = {} + if grouped_messages["input"]: + event_body["input"] = {"messages": grouped_messages["input"]} + if grouped_messages["output"]: + event_body["output"] = {"messages": grouped_messages["output"]} + + if not event_body: + return + + timestamp = event_timestamp if event_timestamp is not None else span.end_time + event_logger = self._event_logger_provider.get_event_logger(span.instrumentation_scope.name) + + event_attributes = {} + if span.attributes and "session.id" in span.attributes: + event_attributes["session.id"] = span.attributes["session.id"] + + event = Event( + name=span.instrumentation_scope.name, + timestamp=timestamp, + body=event_body, + attributes=event_attributes if event_attributes else None, + trace_id=span.context.trace_id, + span_id=span.context.span_id, + trace_flags=span.context.trace_flags, + ) + + event_logger.emit(event) + _logger.debug("Emitted Gen AI Event with input/output message format") + + def _filter_attributes(self, attributes: types.Attributes) -> types.Attributes: + """ + Create a new attributes dictionary with LLO attributes removed. + + This method creates a new dictionary containing only non-LLO attributes, + preserving the original values while filtering out sensitive LLO content. + This helps maintain privacy and reduces the size of spans. 
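+ + Illustrative sketch: {"input.value": "secret prompt", "llm.model_name": "gpt-4"} + is filtered to {"llm.model_name": "gpt-4"}, while a dict with no LLO keys is + returned unchanged (the same object, not a copy).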
+ + Args: + attributes: Original dictionary of span or event attributes + + Returns: + types.Attributes: New dictionary with LLO attributes removed, or None if input is None + """ + if attributes is None: + return None + + has_llo_attrs = False + for key in attributes: + if self._is_llo_attribute(key): + has_llo_attrs = True + break + + if not has_llo_attrs: + return attributes + + filtered_attributes = {} + for key, value in attributes.items(): + if not self._is_llo_attribute(key): + filtered_attributes[key] = value + + return filtered_attributes + + def _is_llo_attribute(self, key: str) -> bool: + """ + Determine if an attribute key contains LLO content based on pattern matching. + + Uses the pattern registry to check if a key matches any LLO pattern. + + Args: + key: The attribute key to check + + Returns: + bool: True if the key matches any LLO pattern, False otherwise + """ + if key in self._exact_match_patterns: + return True + + for regex, _, _ in self._regex_patterns: + if regex.match(key): + return True + + return False diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_base.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_base.py new file mode 100644 index 000000000..9f45da93d --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_base.py @@ -0,0 +1,57 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Base test utilities for LLO Handler tests.""" +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from amazon.opentelemetry.distro.llo_handler import LLOHandler +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import ReadableSpan, SpanContext +from opentelemetry.trace import SpanKind, TraceFlags, TraceState + + +class LLOHandlerTestBase(TestCase): + """Base class with common setup and utilities for LLO Handler tests.""" + + def setUp(self): + self.logger_provider_mock = MagicMock(spec=LoggerProvider) + self.event_logger_mock = MagicMock() + self.event_logger_provider_mock = MagicMock() + self.event_logger_provider_mock.get_event_logger.return_value = self.event_logger_mock + + with patch( + "amazon.opentelemetry.distro.llo_handler.EventLoggerProvider", return_value=self.event_logger_provider_mock + ): + self.llo_handler = LLOHandler(self.logger_provider_mock) + + @staticmethod + def _create_mock_span(attributes=None, kind=SpanKind.INTERNAL, preserve_none=False): + """ + Create a mock ReadableSpan for testing. + + Args: + attributes: Span attributes dictionary. 
Defaults to empty dict unless preserve_none=True + kind: The span kind (default: INTERNAL) + preserve_none: If True, keeps None attributes instead of converting to empty dict + + Returns: + MagicMock: A mock span with context, attributes, and basic properties set + """ + if attributes is None and not preserve_none: + attributes = {} + + span_context = SpanContext( + trace_id=0x123456789ABCDEF0123456789ABCDEF0, + span_id=0x123456789ABCDEF0, + is_remote=False, + trace_flags=TraceFlags.SAMPLED, + trace_state=TraceState.get_default(), + ) + + mock_span = MagicMock(spec=ReadableSpan) + mock_span.context = span_context + mock_span.attributes = attributes + mock_span.kind = kind + mock_span.start_time = 1234567890 + + return mock_span diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_collection.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_collection.py new file mode 100644 index 000000000..a86cebb20 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_collection.py @@ -0,0 +1,269 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for LLO Handler message collection functionality.""" + +from test_llo_handler_base import LLOHandlerTestBase + + +class TestLLOHandlerCollection(LLOHandlerTestBase): + """Test message collection from various frameworks.""" + + def test_collect_gen_ai_prompt_messages_system_role(self): + """ + Verify indexed prompt messages with system role are collected with correct content, role, and source. + """ + attributes = { + "gen_ai.prompt.0.content": "system instruction", + "gen_ai.prompt.0.role": "system", + } + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "system instruction") + self.assertEqual(message["role"], "system") + self.assertEqual(message["source"], "prompt") + + def test_collect_gen_ai_prompt_messages_user_role(self): + """ + Verify indexed prompt messages with user role are collected with correct content, role, and source. + """ + attributes = { + "gen_ai.prompt.0.content": "user question", + "gen_ai.prompt.0.role": "user", + } + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "user question") + self.assertEqual(message["role"], "user") + self.assertEqual(message["source"], "prompt") + + def test_collect_gen_ai_prompt_messages_assistant_role(self): + """ + Verify indexed prompt messages with assistant role are collected with correct content, role, and source. + """ + attributes = { + "gen_ai.prompt.1.content": "assistant response", + "gen_ai.prompt.1.role": "assistant", + } + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "assistant response") + self.assertEqual(message["role"], "assistant") + self.assertEqual(message["source"], "prompt") + + def test_collect_gen_ai_prompt_messages_function_role(self): + """ + Verify indexed prompt messages with non-standard 'function' role are collected correctly. 
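+ Because "function" is not a standard role, downstream grouping is expected to + fall back to the message's source ("prompt" here) when routing it.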
+ """ + attributes = { + "gen_ai.prompt.2.content": "function data", + "gen_ai.prompt.2.role": "function", + } + + span = self._create_mock_span(attributes) + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "function data") + self.assertEqual(message["role"], "function") + self.assertEqual(message["source"], "prompt") + + def test_collect_gen_ai_prompt_messages_unknown_role(self): + """ + Verify indexed prompt messages with unknown role are collected with the role preserved. + """ + attributes = { + "gen_ai.prompt.3.content": "unknown type content", + "gen_ai.prompt.3.role": "unknown", + } + + span = self._create_mock_span(attributes) + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "unknown type content") + self.assertEqual(message["role"], "unknown") + self.assertEqual(message["source"], "prompt") + + def test_collect_gen_ai_completion_messages_assistant_role(self): + """ + Verify indexed completion messages with assistant role are collected with source='completion'. + """ + attributes = { + "gen_ai.completion.0.content": "assistant completion", + "gen_ai.completion.0.role": "assistant", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "assistant completion") + self.assertEqual(message["role"], "assistant") + self.assertEqual(message["source"], "completion") + + def test_collect_gen_ai_completion_messages_other_role(self): + """ + Verify indexed completion messages with custom roles are collected with source='completion'. + """ + attributes = { + "gen_ai.completion.1.content": "other completion", + "gen_ai.completion.1.role": "other", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "other completion") + self.assertEqual(message["role"], "other") + self.assertEqual(message["source"], "completion") + + def test_collect_all_llo_messages_none_attributes(self): + """ + Verify _collect_all_llo_messages returns empty list when attributes are None. + """ + span = self._create_mock_span(None, preserve_none=True) + + messages = self.llo_handler._collect_all_llo_messages(span, None) + + self.assertEqual(messages, []) + self.assertEqual(len(messages), 0) + + def test_collect_indexed_messages_none_attributes(self): + """ + Verify _collect_indexed_messages returns empty list when attributes are None. + """ + messages = self.llo_handler._collect_indexed_messages(None) + + self.assertEqual(messages, []) + self.assertEqual(len(messages), 0) + + def test_collect_indexed_messages_missing_role(self): + """ + Verify indexed messages use default roles when role attributes are missing. 
+ """ + attributes = { + "gen_ai.prompt.0.content": "prompt without role", + "gen_ai.completion.0.content": "completion without role", + } + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 2) + + prompt_msg = next((m for m in messages if m["content"] == "prompt without role"), None) + self.assertIsNotNone(prompt_msg) + self.assertEqual(prompt_msg["role"], "unknown") + self.assertEqual(prompt_msg["source"], "prompt") + + completion_msg = next((m for m in messages if m["content"] == "completion without role"), None) + self.assertIsNotNone(completion_msg) + self.assertEqual(completion_msg["role"], "unknown") + self.assertEqual(completion_msg["source"], "completion") + + def test_indexed_messages_with_out_of_order_indices(self): + """ + Test that indexed messages are sorted correctly even with out-of-order indices + """ + attributes = { + "gen_ai.prompt.5.content": "fifth prompt", + "gen_ai.prompt.5.role": "user", + "gen_ai.prompt.1.content": "first prompt", + "gen_ai.prompt.1.role": "system", + "gen_ai.prompt.3.content": "third prompt", + "gen_ai.prompt.3.role": "user", + "llm.input_messages.10.message.content": "tenth message", + "llm.input_messages.10.message.role": "assistant", + "llm.input_messages.2.message.content": "second message", + "llm.input_messages.2.message.role": "user", + } + + messages = self.llo_handler._collect_indexed_messages(attributes) + + # Messages should be sorted by pattern key first, then by index + self.assertEqual(len(messages), 5) + + # Check gen_ai.prompt messages are in order + gen_ai_messages = [m for m in messages if "prompt" in m["source"]] + self.assertEqual(gen_ai_messages[0]["content"], "first prompt") + self.assertEqual(gen_ai_messages[1]["content"], "third prompt") + self.assertEqual(gen_ai_messages[2]["content"], "fifth prompt") + + # Check llm.input_messages are in order + llm_messages = [m for m in messages if m["content"] in ["second message", "tenth message"]] + self.assertEqual(llm_messages[0]["content"], "second message") + self.assertEqual(llm_messages[1]["content"], "tenth message") + + def test_collect_methods_message_format(self): + """ + Verify all message collection methods return consistent message format with content, role, and source fields. 
+ """ + attributes = { + "gen_ai.prompt.0.content": "prompt", + "gen_ai.prompt.0.role": "user", + "gen_ai.completion.0.content": "response", + "gen_ai.completion.0.role": "assistant", + "traceloop.entity.input": "input", + "gen_ai.prompt": "direct prompt", + "input.value": "inference input", + } + + span = self._create_mock_span(attributes) + + prompt_messages = self.llo_handler._collect_all_llo_messages(span, attributes) + for msg in prompt_messages: + self.assertIn("content", msg) + self.assertIn("role", msg) + self.assertIn("source", msg) + self.assertIsInstance(msg["content"], str) + self.assertIsInstance(msg["role"], str) + self.assertIsInstance(msg["source"], str) + + completion_messages = self.llo_handler._collect_all_llo_messages(span, attributes) + for msg in completion_messages: + self.assertIn("content", msg) + self.assertIn("role", msg) + self.assertIn("source", msg) + + traceloop_messages = self.llo_handler._collect_all_llo_messages(span, attributes) + for msg in traceloop_messages: + self.assertIn("content", msg) + self.assertIn("role", msg) + self.assertIn("source", msg) + + openlit_messages = self.llo_handler._collect_all_llo_messages(span, attributes) + for msg in openlit_messages: + self.assertIn("content", msg) + self.assertIn("role", msg) + self.assertIn("source", msg) + + openinference_messages = self.llo_handler._collect_all_llo_messages(span, attributes) + for msg in openinference_messages: + self.assertIn("content", msg) + self.assertIn("role", msg) + self.assertIn("source", msg) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py new file mode 100644 index 000000000..5d90ebc77 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_events.py @@ -0,0 +1,651 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for LLO Handler event emission functionality.""" + +from unittest.mock import MagicMock, patch + +from test_llo_handler_base import LLOHandlerTestBase + + +class TestLLOHandlerEvents(LLOHandlerTestBase): + """Test event emission and formatting functionality.""" + + def test_emit_llo_attributes(self): + """ + Verify _emit_llo_attributes creates a single consolidated event with input/output message groups + containing all LLO content from various frameworks. 
+ """ + attributes = { + "gen_ai.prompt.0.content": "prompt content", + "gen_ai.prompt.0.role": "user", + "gen_ai.completion.0.content": "completion content", + "gen_ai.completion.0.role": "assistant", + "traceloop.entity.input": "traceloop input", + "traceloop.entity.name": "entity_name", + "gen_ai.agent.actual_output": "agent output", + "crewai.crew.tasks_output": "tasks output", + "crewai.crew.result": "crew result", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + self.assertEqual(emitted_event.name, "test.scope") + self.assertEqual(emitted_event.timestamp, span.end_time) + self.assertEqual(emitted_event.trace_id, span.context.trace_id) + self.assertEqual(emitted_event.span_id, span.context.span_id) + self.assertEqual(emitted_event.trace_flags, span.context.trace_flags) + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertIn("output", event_body) + self.assertIn("messages", event_body["input"]) + self.assertIn("messages", event_body["output"]) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 2) + + user_prompt = next((msg for msg in input_messages if msg["content"] == "prompt content"), None) + self.assertIsNotNone(user_prompt) + self.assertEqual(user_prompt["role"], "user") + + traceloop_input = next((msg for msg in input_messages if msg["content"] == "traceloop input"), None) + self.assertIsNotNone(traceloop_input) + self.assertEqual(traceloop_input["role"], "user") + + output_messages = event_body["output"]["messages"] + self.assertTrue(len(output_messages) >= 3) + + completion = next((msg for msg in output_messages if msg["content"] == "completion content"), None) + self.assertIsNotNone(completion) + self.assertEqual(completion["role"], "assistant") + + agent_output = next((msg for msg in output_messages if msg["content"] == "agent output"), None) + self.assertIsNotNone(agent_output) + self.assertEqual(agent_output["role"], "assistant") + + def test_emit_llo_attributes_multiple_frameworks(self): + """ + Verify a single span containing LLO attributes from multiple frameworks + (Traceloop, OpenLit, OpenInference, CrewAI) generates one consolidated event. 
+ """ + attributes = { + "gen_ai.prompt.0.content": "Tell me about AI", + "gen_ai.prompt.0.role": "user", + "gen_ai.completion.0.content": "AI is a field of computer science...", + "gen_ai.completion.0.role": "assistant", + "traceloop.entity.input": "What is machine learning?", + "traceloop.entity.output": "Machine learning is a subset of AI...", + "gen_ai.prompt": "Explain neural networks", + "gen_ai.completion": "Neural networks are computing systems...", + "input.value": "How do transformers work?", + "output.value": "Transformers are a type of neural network architecture...", + "crewai.crew.result": "Task completed successfully", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.multi.framework" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + self.assertEqual(emitted_event.name, "test.multi.framework") + self.assertEqual(emitted_event.timestamp, span.end_time) + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + input_messages = event_body["input"]["messages"] + input_contents = [msg["content"] for msg in input_messages] + self.assertIn("Tell me about AI", input_contents) + self.assertIn("What is machine learning?", input_contents) + self.assertIn("Explain neural networks", input_contents) + self.assertIn("How do transformers work?", input_contents) + + output_messages = event_body["output"]["messages"] + output_contents = [msg["content"] for msg in output_messages] + self.assertIn("AI is a field of computer science...", output_contents) + self.assertIn("Machine learning is a subset of AI...", output_contents) + self.assertIn("Neural networks are computing systems...", output_contents) + self.assertIn("Transformers are a type of neural network architecture...", output_contents) + self.assertIn("Task completed successfully", output_contents) + + for msg in input_messages: + self.assertIn(msg["role"], ["user", "system"]) + for msg in output_messages: + self.assertEqual(msg["role"], "assistant") + + def test_emit_llo_attributes_no_llo_attributes(self): + """ + Verify _emit_llo_attributes does not emit events when span contains only non-LLO attributes. + """ + attributes = { + "normal.attribute": "value", + "another.attribute": 123, + } + + span = self._create_mock_span(attributes) + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_not_called() + + def test_emit_llo_attributes_mixed_input_output(self): + """ + Verify event generation correctly separates mixed input (system/user) and output (assistant) messages. 
+ """ + attributes = { + "gen_ai.prompt.0.content": "system message", + "gen_ai.prompt.0.role": "system", + "gen_ai.prompt.1.content": "user message", + "gen_ai.prompt.1.role": "user", + "gen_ai.completion.0.content": "assistant response", + "gen_ai.completion.0.role": "assistant", + "input.value": "direct input", + "output.value": "direct output", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 3) + + input_roles = [msg["role"] for msg in input_messages] + self.assertIn("system", input_roles) + self.assertIn("user", input_roles) + + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 2) + + for msg in output_messages: + self.assertEqual(msg["role"], "assistant") + + def test_emit_llo_attributes_with_event_timestamp(self): + """ + Verify _emit_llo_attributes uses provided event timestamp instead of span end time. + """ + attributes = { + "gen_ai.prompt": "test prompt", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + event_timestamp = 9999999999 + + self.llo_handler._emit_llo_attributes(span, attributes, event_timestamp=event_timestamp) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + self.assertEqual(emitted_event.timestamp, event_timestamp) + + def test_emit_llo_attributes_none_attributes(self): + """ + Test _emit_llo_attributes with None attributes - should return early + """ + span = self._create_mock_span({}) + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, None) + + self.event_logger_mock.emit.assert_not_called() + + def test_emit_llo_attributes_role_based_routing(self): + """ + Test role-based routing for non-standard roles + """ + attributes = { + # Standard roles - should go to their expected places + "gen_ai.prompt.0.content": "system prompt", + "gen_ai.prompt.0.role": "system", + "gen_ai.prompt.1.content": "user prompt", + "gen_ai.prompt.1.role": "user", + "gen_ai.completion.0.content": "assistant response", + "gen_ai.completion.0.role": "assistant", + # Non-standard roles - should be routed based on source + "gen_ai.prompt.2.content": "function prompt", + "gen_ai.prompt.2.role": "function", + "gen_ai.completion.1.content": "tool completion", + "gen_ai.completion.1.role": "tool", + "gen_ai.prompt.3.content": "unknown prompt", + "gen_ai.prompt.3.role": "custom_role", + "gen_ai.completion.2.content": "unknown completion", + "gen_ai.completion.2.role": "another_custom", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + # Verify event was emitted + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = 
emitted_event.body + + # Check input messages + input_messages = event_body["input"]["messages"] + input_contents = [msg["content"] for msg in input_messages] + + # Standard roles (system, user) should be in input + self.assertIn("system prompt", input_contents) + self.assertIn("user prompt", input_contents) + + # Non-standard roles from prompt source should be in input + self.assertIn("function prompt", input_contents) + self.assertIn("unknown prompt", input_contents) + + # Check output messages + output_messages = event_body["output"]["messages"] + output_contents = [msg["content"] for msg in output_messages] + + # Standard role (assistant) should be in output + self.assertIn("assistant response", output_contents) + + # Non-standard roles from completion source should be in output + self.assertIn("tool completion", output_contents) + self.assertIn("unknown completion", output_contents) + + def test_emit_llo_attributes_empty_messages(self): + """ + Test _emit_llo_attributes when messages list is empty after collection + """ + # Create a span with attributes that would normally match patterns but with empty content + attributes = { + "gen_ai.prompt.0.content": "", + "gen_ai.prompt.0.role": "user", + } + + span = self._create_mock_span(attributes) + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + # Mock _collect_all_llo_messages to return empty list + with patch.object(self.llo_handler, "_collect_all_llo_messages", return_value=[]): + self.llo_handler._emit_llo_attributes(span, attributes) + + # Should not emit event when no messages collected + self.event_logger_mock.emit.assert_not_called() + + def test_emit_llo_attributes_only_input_messages(self): + """ + Test event generation when only input messages are present + """ + attributes = { + "gen_ai.prompt.0.content": "system instruction", + "gen_ai.prompt.0.role": "system", + "gen_ai.prompt.1.content": "user question", + "gen_ai.prompt.1.role": "user", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + + self.assertIn("input", event_body) + self.assertNotIn("output", event_body) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 2) + + def test_emit_llo_attributes_only_output_messages(self): + """ + Test event generation when only output messages are present + """ + attributes = { + "gen_ai.completion.0.content": "assistant response", + "gen_ai.completion.0.role": "assistant", + "output.value": "another output", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + + self.assertNotIn("input", event_body) + self.assertIn("output", event_body) + + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 2) + + def test_emit_llo_attributes_empty_event_body(self): + """ + Test that no event is emitted when event body would be empty + """ + # Create attributes that would 
result in messages with empty content + attributes = { + "gen_ai.prompt.0.content": "", + "gen_ai.prompt.0.role": "user", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + # Mock _collect_all_llo_messages to return messages with empty content + with patch.object( + self.llo_handler, + "_collect_all_llo_messages", + return_value=[{"content": "", "role": "user", "source": "prompt"}], + ): + self.llo_handler._emit_llo_attributes(span, attributes) + + # Event should still be emitted as we have a message (even with empty content) + self.event_logger_mock.emit.assert_called_once() + + def test_group_messages_by_type_standard_roles(self): + """ + Test _group_messages_by_type correctly groups messages with standard roles. + """ + messages = [ + {"role": "system", "content": "System message", "source": "prompt"}, + {"role": "user", "content": "User message", "source": "prompt"}, + {"role": "assistant", "content": "Assistant message", "source": "completion"}, + ] + + result = self.llo_handler._group_messages_by_type(messages) + + self.assertIn("input", result) + self.assertIn("output", result) + + # Check input messages + self.assertEqual(len(result["input"]), 2) + self.assertEqual(result["input"][0], {"role": "system", "content": "System message"}) + self.assertEqual(result["input"][1], {"role": "user", "content": "User message"}) + + # Check output messages + self.assertEqual(len(result["output"]), 1) + self.assertEqual(result["output"][0], {"role": "assistant", "content": "Assistant message"}) + + def test_group_messages_by_type_non_standard_roles(self): + """ + Test _group_messages_by_type correctly routes non-standard roles based on source. + """ + messages = [ + {"role": "function", "content": "Function call", "source": "prompt"}, + {"role": "tool", "content": "Tool result", "source": "completion"}, + {"role": "custom", "content": "Custom output", "source": "output"}, + {"role": "other", "content": "Other result", "source": "result"}, + ] + + result = self.llo_handler._group_messages_by_type(messages) + + # Non-standard roles from prompt source go to input + self.assertEqual(len(result["input"]), 1) + self.assertEqual(result["input"][0], {"role": "function", "content": "Function call"}) + + # Non-standard roles from completion/output/result sources go to output + self.assertEqual(len(result["output"]), 3) + output_contents = [msg["content"] for msg in result["output"]] + self.assertIn("Tool result", output_contents) + self.assertIn("Custom output", output_contents) + self.assertIn("Other result", output_contents) + + def test_group_messages_by_type_empty_list(self): + """ + Test _group_messages_by_type handles empty message list. + """ + result = self.llo_handler._group_messages_by_type([]) + + self.assertEqual(result, {"input": [], "output": []}) + self.assertEqual(len(result["input"]), 0) + self.assertEqual(len(result["output"]), 0) + + def test_group_messages_by_type_missing_fields(self): + """ + Test _group_messages_by_type handles messages with missing role or content. 
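+ The grouping helper substitutes "unknown" for a missing role and "" for + missing content rather than raising.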
+ """ + messages = [ + {"content": "No role", "source": "prompt"}, # Missing role + {"role": "user", "source": "prompt"}, # Missing content + {"role": "assistant", "content": "Complete message", "source": "completion"}, + ] + + result = self.llo_handler._group_messages_by_type(messages) + + # Message without role gets "unknown" role and goes to input (no completion/output/result in source) + self.assertEqual(len(result["input"]), 2) + self.assertEqual(result["input"][0], {"role": "unknown", "content": "No role"}) + self.assertEqual(result["input"][1], {"role": "user", "content": ""}) + + # Complete message goes to output + self.assertEqual(len(result["output"]), 1) + self.assertEqual(result["output"][0], {"role": "assistant", "content": "Complete message"}) + + def test_emit_llo_attributes_with_llm_prompts(self): + """ + Test that llm.prompts attribute is properly emitted in the input section. + """ + llm_prompts_content = "[{'role': 'system', 'content': [{'text': 'You are helpful.', 'type': 'text'}]}]" + attributes = { + "llm.prompts": llm_prompts_content, + "gen_ai.completion.0.content": "I understand.", + "gen_ai.completion.0.role": "assistant", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + + # Check that llm.prompts is in input section + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 1) + self.assertEqual(input_messages[0]["content"], llm_prompts_content) + self.assertEqual(input_messages[0]["role"], "user") + + # Check output section has the completion + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 1) + self.assertEqual(output_messages[0]["content"], "I understand.") + self.assertEqual(output_messages[0]["role"], "assistant") + + def test_emit_llo_attributes_openlit_style_events(self): + """ + Test that LLO attributes from OpenLit-style span events are collected and emitted + in a single consolidated event, not as separate events. 
+ """ + # This test simulates the OpenLit pattern where prompt and completion are in span events + # The span processor should collect from both and emit a single event + + span_attributes = {"normal.attribute": "value"} + + # Create events like OpenLit does + prompt_event_attrs = {"gen_ai.prompt": "Explain quantum computing"} + prompt_event = MagicMock(attributes=prompt_event_attrs, timestamp=1234567890) + + completion_event_attrs = {"gen_ai.completion": "Quantum computing is..."} + completion_event = MagicMock(attributes=completion_event_attrs, timestamp=1234567891) + + span = self._create_mock_span(span_attributes) + span.events = [prompt_event, completion_event] + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "openlit.otel.tracing" + + # Process the span (this would normally be called by process_spans) + all_llo_attrs = {} + + # Collect from span attributes + for key, value in span_attributes.items(): + if self.llo_handler._is_llo_attribute(key): + all_llo_attrs[key] = value + + # Collect from events + for event in span.events: + if event.attributes: + for key, value in event.attributes.items(): + if self.llo_handler._is_llo_attribute(key): + all_llo_attrs[key] = value + + # Emit consolidated event + self.llo_handler._emit_llo_attributes(span, all_llo_attrs) + + # Verify single event was emitted with both input and output + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + event_body = emitted_event.body + + # Both input and output should be in the same event + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + # Check input section + input_messages = event_body["input"]["messages"] + self.assertEqual(len(input_messages), 1) + self.assertEqual(input_messages[0]["content"], "Explain quantum computing") + self.assertEqual(input_messages[0]["role"], "user") + + # Check output section + output_messages = event_body["output"]["messages"] + self.assertEqual(len(output_messages), 1) + self.assertEqual(output_messages[0]["content"], "Quantum computing is...") + self.assertEqual(output_messages[0]["role"], "assistant") + + def test_emit_llo_attributes_with_session_id(self): + """ + Verify session.id attribute from span is copied to event attributes when present. + """ + attributes = { + "session.id": "test-session-123", + "gen_ai.prompt": "Hello, AI", + "gen_ai.completion": "Hello! How can I help you?", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + # Verify session.id was copied to event attributes + self.assertIsNotNone(emitted_event.attributes) + self.assertEqual(emitted_event.attributes.get("session.id"), "test-session-123") + # Event class always adds event.name + self.assertIn("event.name", emitted_event.attributes) + + # Verify event body still contains LLO data + event_body = emitted_event.body + self.assertIn("input", event_body) + self.assertIn("output", event_body) + + def test_emit_llo_attributes_without_session_id(self): + """ + Verify event attributes do not contain session.id when not present in span attributes. + """ + attributes = { + "gen_ai.prompt": "Hello, AI", + "gen_ai.completion": "Hello! 
How can I help you?", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + # Verify session.id is not in event attributes + self.assertIsNotNone(emitted_event.attributes) + self.assertNotIn("session.id", emitted_event.attributes) + # Event class always adds event.name + self.assertIn("event.name", emitted_event.attributes) + + def test_emit_llo_attributes_with_session_id_and_other_attributes(self): + """ + Verify only session.id is copied from span attributes when mixed with other attributes. + """ + attributes = { + "session.id": "session-456", + "user.id": "user-789", + "gen_ai.prompt": "What's the weather?", + "gen_ai.completion": "I can't check the weather.", + "other.attribute": "some-value", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._emit_llo_attributes(span, attributes) + + self.event_logger_mock.emit.assert_called_once() + emitted_event = self.event_logger_mock.emit.call_args[0][0] + + # Verify only session.id was copied to event attributes (plus event.name from Event class) + self.assertIsNotNone(emitted_event.attributes) + self.assertEqual(emitted_event.attributes.get("session.id"), "session-456") + self.assertIn("event.name", emitted_event.attributes) + # Verify other span attributes were not copied + self.assertNotIn("user.id", emitted_event.attributes) + self.assertNotIn("other.attribute", emitted_event.attributes) + self.assertNotIn("gen_ai.prompt", emitted_event.attributes) + self.assertNotIn("gen_ai.completion", emitted_event.attributes) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_frameworks.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_frameworks.py new file mode 100644 index 000000000..5dfc069b9 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_frameworks.py @@ -0,0 +1,444 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for LLO Handler framework-specific functionality.""" + +from unittest.mock import MagicMock + +from test_llo_handler_base import LLOHandlerTestBase + + +class TestLLOHandlerFrameworks(LLOHandlerTestBase): + """Test framework-specific LLO attribute handling.""" + + def test_collect_traceloop_messages(self): + """ + Verify Traceloop entity input/output attributes are collected with correct roles + (input->user, output->assistant). 
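+ traceloop.entity.name is not an LLO pattern, so it is expected to be skipped + during collection (and preserved when attributes are filtered).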
+ """ + attributes = { + "traceloop.entity.input": "input data", + "traceloop.entity.output": "output data", + "traceloop.entity.name": "my_entity", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + traceloop_messages = [m for m in messages if m["source"] in ["input", "output"]] + + self.assertEqual(len(traceloop_messages), 2) + + input_message = traceloop_messages[0] + self.assertEqual(input_message["content"], "input data") + self.assertEqual(input_message["role"], "user") + self.assertEqual(input_message["source"], "input") + + output_message = traceloop_messages[1] + self.assertEqual(output_message["content"], "output data") + self.assertEqual(output_message["role"], "assistant") + self.assertEqual(output_message["source"], "output") + + def test_collect_traceloop_messages_all_attributes(self): + """ + Verify collection of mixed Traceloop and CrewAI attributes, ensuring all are collected + with appropriate roles and sources. + """ + attributes = { + "traceloop.entity.input": "input data", + "traceloop.entity.output": "output data", + "crewai.crew.tasks_output": "[TaskOutput(description='Task 1', output='Result 1')]", + "crewai.crew.result": "Final crew result", + "traceloop.entity.name": "crewai_agent", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 4) + + self.assertEqual(messages[0]["content"], "input data") + self.assertEqual(messages[0]["role"], "user") + self.assertEqual(messages[0]["source"], "input") + + self.assertEqual(messages[1]["content"], "output data") + self.assertEqual(messages[1]["role"], "assistant") + self.assertEqual(messages[1]["source"], "output") + + self.assertEqual(messages[2]["content"], "[TaskOutput(description='Task 1', output='Result 1')]") + self.assertEqual(messages[2]["role"], "assistant") + self.assertEqual(messages[2]["source"], "output") + + self.assertEqual(messages[3]["content"], "Final crew result") + self.assertEqual(messages[3]["role"], "assistant") + self.assertEqual(messages[3]["source"], "result") + + def test_collect_openlit_messages_direct_prompt(self): + """ + Verify OpenLit's direct gen_ai.prompt attribute is collected with user role and prompt source. + """ + attributes = {"gen_ai.prompt": "user direct prompt"} + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "user direct prompt") + self.assertEqual(message["role"], "user") + self.assertEqual(message["source"], "prompt") + + def test_collect_openlit_messages_direct_completion(self): + """ + Verify OpenLit's direct gen_ai.completion attribute is collected with assistant role and completion source. 
+ """ + attributes = {"gen_ai.completion": "assistant direct completion"} + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "assistant direct completion") + self.assertEqual(message["role"], "assistant") + self.assertEqual(message["source"], "completion") + + def test_collect_openlit_messages_all_attributes(self): + """ + Verify all OpenLit framework attributes (prompt, completion, revised_prompt, agent.*) + are collected with correct roles and sources. + """ + attributes = { + "gen_ai.prompt": "user prompt", + "gen_ai.completion": "assistant response", + "gen_ai.content.revised_prompt": "revised prompt", + "gen_ai.agent.actual_output": "agent output", + "gen_ai.agent.human_input": "human input to agent", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 5) + + self.assertEqual(messages[0]["content"], "user prompt") + self.assertEqual(messages[0]["role"], "user") + self.assertEqual(messages[0]["source"], "prompt") + + self.assertEqual(messages[1]["content"], "assistant response") + self.assertEqual(messages[1]["role"], "assistant") + self.assertEqual(messages[1]["source"], "completion") + + self.assertEqual(messages[2]["content"], "revised prompt") + self.assertEqual(messages[2]["role"], "system") + self.assertEqual(messages[2]["source"], "prompt") + + self.assertEqual(messages[3]["content"], "agent output") + self.assertEqual(messages[3]["role"], "assistant") + self.assertEqual(messages[3]["source"], "output") + + self.assertEqual(messages[4]["content"], "human input to agent") + self.assertEqual(messages[4]["role"], "user") + self.assertEqual(messages[4]["source"], "input") + + def test_collect_openlit_messages_revised_prompt(self): + """ + Verify OpenLit's gen_ai.content.revised_prompt is collected with system role and prompt source. + """ + attributes = {"gen_ai.content.revised_prompt": "revised system prompt"} + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "revised system prompt") + self.assertEqual(message["role"], "system") + self.assertEqual(message["source"], "prompt") + + def test_collect_openinference_messages_direct_attributes(self): + """ + Verify OpenInference's direct input.value and output.value attributes are collected + with appropriate roles (user/assistant) and sources. 
+ """ + attributes = { + "input.value": "user prompt", + "output.value": "assistant response", + "llm.model_name": "gpt-4", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 2) + + input_message = messages[0] + self.assertEqual(input_message["content"], "user prompt") + self.assertEqual(input_message["role"], "user") + self.assertEqual(input_message["source"], "input") + + output_message = messages[1] + self.assertEqual(output_message["content"], "assistant response") + self.assertEqual(output_message["role"], "assistant") + self.assertEqual(output_message["source"], "output") + + def test_collect_openinference_messages_structured_input(self): + """ + Verify OpenInference's indexed llm.input_messages.{n}.message.content attributes + are collected with roles from corresponding role attributes. + """ + attributes = { + "llm.input_messages.0.message.content": "system prompt", + "llm.input_messages.0.message.role": "system", + "llm.input_messages.1.message.content": "user message", + "llm.input_messages.1.message.role": "user", + "llm.model_name": "claude-3", + } + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 2) + + system_message = messages[0] + self.assertEqual(system_message["content"], "system prompt") + self.assertEqual(system_message["role"], "system") + self.assertEqual(system_message["source"], "input") + + user_message = messages[1] + self.assertEqual(user_message["content"], "user message") + self.assertEqual(user_message["role"], "user") + self.assertEqual(user_message["source"], "input") + + def test_collect_openinference_messages_structured_output(self): + """ + Verify OpenInference's indexed llm.output_messages.{n}.message.content attributes + are collected with source='output' and roles from corresponding attributes. + """ + attributes = { + "llm.output_messages.0.message.content": "assistant response", + "llm.output_messages.0.message.role": "assistant", + "llm.model_name": "llama-3", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + + output_message = messages[0] + self.assertEqual(output_message["content"], "assistant response") + self.assertEqual(output_message["role"], "assistant") + self.assertEqual(output_message["source"], "output") + + def test_collect_openinference_messages_mixed_attributes(self): + """ + Verify mixed OpenInference attributes (direct and indexed) are all collected + and maintain correct roles and counts. 
+ """ + attributes = { + "input.value": "direct input", + "output.value": "direct output", + "llm.input_messages.0.message.content": "message input", + "llm.input_messages.0.message.role": "user", + "llm.output_messages.0.message.content": "message output", + "llm.output_messages.0.message.role": "assistant", + "llm.model_name": "bedrock.claude-3", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 4) + + contents = [msg["content"] for msg in messages] + self.assertIn("direct input", contents) + self.assertIn("direct output", contents) + self.assertIn("message input", contents) + self.assertIn("message output", contents) + + roles = [msg["role"] for msg in messages] + self.assertEqual(roles.count("user"), 2) + self.assertEqual(roles.count("assistant"), 2) + + def test_collect_openlit_messages_agent_actual_output(self): + """ + Verify OpenLit's gen_ai.agent.actual_output is collected with assistant role and output source. + """ + attributes = {"gen_ai.agent.actual_output": "Agent task output result"} + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + + message = messages[0] + self.assertEqual(message["content"], "Agent task output result") + self.assertEqual(message["role"], "assistant") + self.assertEqual(message["source"], "output") + + def test_collect_openlit_messages_agent_human_input(self): + """ + Verify OpenLit's gen_ai.agent.human_input is collected with user role and input source. + """ + attributes = {"gen_ai.agent.human_input": "Human input to the agent"} + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], "Human input to the agent") + self.assertEqual(message["role"], "user") + self.assertEqual(message["source"], "input") + + def test_collect_traceloop_messages_crew_outputs(self): + """ + Verify CrewAI-specific attributes (tasks_output, result) are collected with assistant role + and appropriate sources. + """ + attributes = { + "crewai.crew.tasks_output": "[TaskOutput(description='Task description', output='Task result')]", + "crewai.crew.result": "Final crew execution result", + "traceloop.entity.name": "crewai", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 2) + + tasks_message = messages[0] + self.assertEqual(tasks_message["content"], "[TaskOutput(description='Task description', output='Task result')]") + self.assertEqual(tasks_message["role"], "assistant") + self.assertEqual(tasks_message["source"], "output") + + result_message = messages[1] + self.assertEqual(result_message["content"], "Final crew execution result") + self.assertEqual(result_message["role"], "assistant") + self.assertEqual(result_message["source"], "result") + + def test_openinference_messages_with_default_roles(self): + """ + Verify OpenInference indexed messages use default roles (user for input, assistant for output) + when role attributes are missing. 
+ """ + attributes = { + "llm.input_messages.0.message.content": "input without role", + "llm.output_messages.0.message.content": "output without role", + } + + span = self._create_mock_span(attributes) + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 2) + + input_msg = next((m for m in messages if m["content"] == "input without role"), None) + self.assertIsNotNone(input_msg) + self.assertEqual(input_msg["role"], "user") + self.assertEqual(input_msg["source"], "input") + + output_msg = next((m for m in messages if m["content"] == "output without role"), None) + self.assertIsNotNone(output_msg) + self.assertEqual(output_msg["role"], "assistant") + self.assertEqual(output_msg["source"], "output") + + def test_collect_strands_sdk_messages(self): + """ + Verify Strands SDK patterns (system_prompt, tool.result) are collected + with correct roles and sources. + """ + attributes = { + "system_prompt": "You are a helpful assistant", + "tool.result": "Tool execution completed successfully", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "strands.sdk" + + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 2) + + system_msg = next((m for m in messages if m["content"] == "You are a helpful assistant"), None) + self.assertIsNotNone(system_msg) + self.assertEqual(system_msg["role"], "system") + self.assertEqual(system_msg["source"], "prompt") + + tool_msg = next((m for m in messages if m["content"] == "Tool execution completed successfully"), None) + self.assertIsNotNone(tool_msg) + self.assertEqual(tool_msg["role"], "assistant") + self.assertEqual(tool_msg["source"], "output") + + def test_collect_llm_prompts_messages(self): + """ + Verify llm.prompts attribute is collected as a user message with prompt source. + """ + attributes = { + "llm.prompts": ( + "[{'role': 'system', 'content': [{'text': 'You are a helpful AI assistant.', 'type': 'text'}]}, " + "{'role': 'user', 'content': [{'text': 'What are the benefits of using FastAPI?', 'type': 'text'}]}]" + ), + "other.attribute": "not collected", + } + + span = self._create_mock_span(attributes) + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 1) + message = messages[0] + self.assertEqual(message["content"], attributes["llm.prompts"]) + self.assertEqual(message["role"], "user") + self.assertEqual(message["source"], "prompt") + + def test_collect_llm_prompts_with_other_messages(self): + """ + Verify llm.prompts works correctly alongside other LLO attributes. 
+ """ + attributes = { + "llm.prompts": "[{'role': 'system', 'content': 'System prompt'}]", + "gen_ai.prompt": "Direct prompt", + "gen_ai.completion": "Assistant response", + } + + span = self._create_mock_span(attributes) + messages = self.llo_handler._collect_all_llo_messages(span, attributes) + + self.assertEqual(len(messages), 3) + + # Check llm.prompts message + llm_prompts_msg = next((m for m in messages if m["content"] == attributes["llm.prompts"]), None) + self.assertIsNotNone(llm_prompts_msg) + self.assertEqual(llm_prompts_msg["role"], "user") + self.assertEqual(llm_prompts_msg["source"], "prompt") + + # Check other messages are still collected + direct_prompt_msg = next((m for m in messages if m["content"] == "Direct prompt"), None) + self.assertIsNotNone(direct_prompt_msg) + + completion_msg = next((m for m in messages if m["content"] == "Assistant response"), None) + self.assertIsNotNone(completion_msg) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py new file mode 100644 index 000000000..25abfcca6 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_patterns.py @@ -0,0 +1,112 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for LLO Handler pattern matching functionality.""" + +from test_llo_handler_base import LLOHandlerTestBase + +from amazon.opentelemetry.distro.llo_handler import LLO_PATTERNS, LLOHandler, PatternType + + +class TestLLOHandlerPatterns(LLOHandlerTestBase): + """Test pattern matching and recognition functionality.""" + + def test_init(self): + """ + Verify LLOHandler initializes correctly with logger provider and creates event logger provider. + """ + self.assertEqual(self.llo_handler._logger_provider, self.logger_provider_mock) + self.assertEqual(self.llo_handler._event_logger_provider, self.event_logger_provider_mock) + + def test_is_llo_attribute_match(self): + """ + Verify _is_llo_attribute correctly identifies indexed Gen AI prompt patterns (gen_ai.prompt.{n}.content). + """ + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt.0.content")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt.123.content")) + + def test_is_llo_attribute_no_match(self): + """ + Verify _is_llo_attribute correctly rejects malformed patterns and non-LLO attributes. + """ + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.prompt.content")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.prompt.abc.content")) + self.assertFalse(self.llo_handler._is_llo_attribute("some.other.attribute")) + + def test_is_llo_attribute_traceloop_match(self): + """ + Verify _is_llo_attribute recognizes Traceloop framework patterns (traceloop.entity.input/output). + """ + self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.input")) + self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.output")) + + def test_is_llo_attribute_openlit_match(self): + """ + Verify _is_llo_attribute recognizes OpenLit framework patterns (gen_ai.prompt, gen_ai.completion, etc.). 
+ """ + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt")) + + def test_is_llo_attribute_openinference_match(self): + """ + Verify _is_llo_attribute recognizes OpenInference patterns including both direct (input/output.value) + and indexed (llm.input_messages.{n}.message.content) patterns. + """ + self.assertTrue(self.llo_handler._is_llo_attribute("input.value")) + self.assertTrue(self.llo_handler._is_llo_attribute("output.value")) + self.assertTrue(self.llo_handler._is_llo_attribute("llm.input_messages.0.message.content")) + self.assertTrue(self.llo_handler._is_llo_attribute("llm.output_messages.123.message.content")) + + def test_is_llo_attribute_crewai_match(self): + """ + Verify _is_llo_attribute recognizes CrewAI framework patterns (gen_ai.agent.*, crewai.crew.*). + """ + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.agent.actual_output")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.agent.human_input")) + self.assertTrue(self.llo_handler._is_llo_attribute("crewai.crew.tasks_output")) + self.assertTrue(self.llo_handler._is_llo_attribute("crewai.crew.result")) + + def test_is_llo_attribute_strands_sdk_match(self): + """ + Verify _is_llo_attribute recognizes Strands SDK patterns (system_prompt, tool.result). + """ + self.assertTrue(self.llo_handler._is_llo_attribute("system_prompt")) + self.assertTrue(self.llo_handler._is_llo_attribute("tool.result")) + + def test_is_llo_attribute_llm_prompts_match(self): + """ + Verify _is_llo_attribute recognizes llm.prompts pattern. + """ + self.assertTrue(self.llo_handler._is_llo_attribute("llm.prompts")) + + def test_build_pattern_matchers_with_missing_regex(self): + """ + Test _build_pattern_matchers handles patterns with missing regex gracefully + """ + # Temporarily modify LLO_PATTERNS to have a pattern without regex + original_patterns = LLO_PATTERNS.copy() + + # Add a malformed indexed pattern without regex + LLO_PATTERNS["test.bad.pattern"] = { + "type": PatternType.INDEXED, + # Missing "regex" key + "role_key": "test.bad.pattern.role", + "default_role": "unknown", + "source": "test", + } + + try: + # Create a new handler to trigger pattern building + handler = LLOHandler(self.logger_provider_mock) + + # Should handle gracefully - the bad pattern should be skipped + self.assertNotIn("test.bad.pattern", handler._pattern_configs) + + # Other patterns should still work + self.assertTrue(handler._is_llo_attribute("gen_ai.prompt")) + self.assertFalse(handler._is_llo_attribute("test.bad.pattern")) + + finally: + # Restore original patterns + LLO_PATTERNS.clear() + LLO_PATTERNS.update(original_patterns) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_processing.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_processing.py new file mode 100644 index 000000000..d76699849 --- /dev/null +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/llo_handler/test_llo_handler_processing.py @@ -0,0 +1,328 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# SPDX-License-Identifier: Apache-2.0
+"""Tests for LLO Handler span and attribute processing functionality."""
+
+from unittest.mock import MagicMock, patch
+
+from test_llo_handler_base import LLOHandlerTestBase
+
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry.sdk.trace import Event as SpanEvent
+
+
+class TestLLOHandlerProcessing(LLOHandlerTestBase):
+    """Test span processing and attribute filtering functionality."""
+
+    def test_filter_attributes(self):
+        """
+        Verify _filter_attributes removes LLO content attributes while preserving role attributes
+        and other non-LLO attributes.
+        """
+        attributes = {
+            "gen_ai.prompt.0.content": "test content",
+            "gen_ai.prompt.0.role": "user",
+            "normal.attribute": "value",
+            "another.normal.attribute": 123,
+        }
+
+        filtered = self.llo_handler._filter_attributes(attributes)
+
+        self.assertNotIn("gen_ai.prompt.0.content", filtered)
+        self.assertIn("gen_ai.prompt.0.role", filtered)
+        self.assertIn("normal.attribute", filtered)
+        self.assertIn("another.normal.attribute", filtered)
+
+    def test_filter_attributes_empty_dict(self):
+        """
+        Verify _filter_attributes returns an empty dict when given an empty dict.
+        """
+        result = self.llo_handler._filter_attributes({})
+
+        self.assertEqual(result, {})
+
+    def test_filter_attributes_without_llo_attributes(self):
+        """
+        Verify _filter_attributes returns original attributes when no LLO attributes are present.
+        """
+        attributes = {"normal.attr": "value"}
+        result = self.llo_handler._filter_attributes(attributes)
+
+        self.assertEqual(result, attributes)
+
+    def test_filter_attributes_no_llo_attrs(self):
+        """
+        Verify _filter_attributes returns the same attributes object, untouched,
+        when none of the attributes match an LLO pattern.
+        """
+        attributes = {
+            "normal.attr1": "value1",
+            "normal.attr2": "value2",
+            "other.attribute": "value",  # This is not an LLO attribute
+        }
+
+        result = self.llo_handler._filter_attributes(attributes)
+
+        # Should return the same attributes object when no LLO attrs present
+        self.assertIs(result, attributes)
+        self.assertEqual(result, attributes)
+
+    def test_process_spans(self):
+        """
+        Verify process_spans extracts LLO attributes, emits events, filters attributes,
+        and processes span events correctly.
+        """
+        attributes = {"gen_ai.prompt.0.content": "prompt content", "normal.attribute": "normal value"}
+
+        span = self._create_mock_span(attributes)
+        span.events = []
+
+        with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit, patch.object(
+            self.llo_handler, "_filter_attributes"
+        ) as mock_filter:
+
+            filtered_attributes = {"normal.attribute": "normal value"}
+            mock_filter.return_value = filtered_attributes
+
+            result = self.llo_handler.process_spans([span])
+
+            # _emit_llo_attributes receives only the extracted LLO attributes
+            expected_llo_attrs = {"gen_ai.prompt.0.content": "prompt content"}
+            mock_emit.assert_called_once_with(span, expected_llo_attrs)
+            mock_filter.assert_called_once_with(attributes)
+
+            self.assertEqual(len(result), 1)
+            self.assertEqual(result[0], span)
+            self.assertEqual(result[0]._attributes, filtered_attributes)
+
+    def test_process_spans_with_bounded_attributes(self):
+        """
+        Verify process_spans correctly handles spans with BoundedAttributes,
+        preserving attribute limits and settings.
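+
+        BoundedAttributes is the SDK container that enforces attribute count and
+        value-length limits; filtering is expected to hand back a BoundedAttributes
+        instance (not a plain dict) so those limits survive processing.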
+ """ + bounded_attrs = BoundedAttributes( + maxlen=10, + attributes={"gen_ai.prompt.0.content": "prompt content", "normal.attribute": "normal value"}, + immutable=False, + max_value_len=1000, + ) + + span = self._create_mock_span(bounded_attrs) + span.events = [] # Add empty events list + + with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit, patch.object( + self.llo_handler, "_filter_attributes" + ) as mock_filter: + + filtered_attributes = {"normal.attribute": "normal value"} + mock_filter.return_value = filtered_attributes + + result = self.llo_handler.process_spans([span]) + + # Now it's called with only the LLO attributes + expected_llo_attrs = {"gen_ai.prompt.0.content": "prompt content"} + mock_emit.assert_called_once_with(span, expected_llo_attrs) + mock_filter.assert_called_once_with(bounded_attrs) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0], span) + self.assertIsInstance(result[0]._attributes, BoundedAttributes) + self.assertEqual(dict(result[0]._attributes), filtered_attributes) + + def test_process_spans_none_attributes(self): + """ + Verify process_spans correctly handles spans with None attributes. + """ + span = self._create_mock_span(None, preserve_none=True) + span.events = [] + + result = self.llo_handler.process_spans([span]) + + self.assertEqual(len(result), 1) + self.assertIsNone(result[0]._attributes) + + def test_filter_span_events(self): + """ + Verify _filter_span_events filters LLO attributes from span events correctly. + """ + event_attributes = { + "gen_ai.prompt": "event prompt", + "normal.attribute": "keep this", + } + + event = SpanEvent( + name="test_event", + attributes=event_attributes, + timestamp=1234567890, + ) + + span = self._create_mock_span({}) + span.events = [event] + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._filter_span_events(span) + + span_events = getattr(span, "_events", []) + updated_event = span_events[0] + self.assertIn("normal.attribute", updated_event.attributes) + self.assertNotIn("gen_ai.prompt", updated_event.attributes) + + def test_filter_span_events_no_events(self): + """ + Verify _filter_span_events handles spans with no events gracefully. 
+ """ + span = self._create_mock_span({}) + span.events = None + span._events = None + + self.llo_handler._filter_span_events(span) + + self.assertIsNone(span._events) + + def test_filter_span_events_no_attributes(self): + """ + Test _filter_span_events when event has no attributes + """ + event = SpanEvent( + name="test_event", + attributes=None, + timestamp=1234567890, + ) + + span = self._create_mock_span({}) + span.events = [event] + + self.llo_handler._filter_span_events(span) + + # Should handle gracefully and keep the original event + span_events = getattr(span, "_events", []) + self.assertEqual(len(span_events), 1) + self.assertEqual(span_events[0], event) + + def test_filter_span_events_bounded_attributes(self): + """ + Test _filter_span_events with BoundedAttributes in events + """ + bounded_event_attrs = BoundedAttributes( + maxlen=5, + attributes={ + "gen_ai.prompt": "event prompt", + "normal.attribute": "keep this", + }, + immutable=False, + max_value_len=100, + ) + + event = SpanEvent( + name="test_event", + attributes=bounded_event_attrs, + timestamp=1234567890, + limit=5, + ) + + span = self._create_mock_span({}) + span.events = [event] + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "test.scope" + + self.llo_handler._filter_span_events(span) + + # Verify event was updated with filtered attributes + span_events = getattr(span, "_events", []) + updated_event = span_events[0] + self.assertIsInstance(updated_event, SpanEvent) + self.assertEqual(updated_event.name, "test_event") + self.assertIn("normal.attribute", updated_event.attributes) + self.assertNotIn("gen_ai.prompt", updated_event.attributes) + + def test_process_spans_consolidated_event_emission(self): + """ + Verify process_spans collects LLO attributes from both span attributes and events, + then emits a single consolidated event. 
+ """ + # Span attributes with prompt + span_attributes = { + "gen_ai.prompt": "What is quantum computing?", + "normal.attribute": "keep this", + } + + # Event attributes with completion + event_attributes = { + "gen_ai.completion": "Quantum computing is...", + "other.attribute": "also keep this", + } + + event = SpanEvent( + name="gen_ai.content.completion", + attributes=event_attributes, + timestamp=1234567890, + ) + + span = self._create_mock_span(span_attributes) + span.events = [event] + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "openlit.otel.tracing" + + with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit: + result = self.llo_handler.process_spans([span]) + + # Should emit once with combined attributes + mock_emit.assert_called_once() + call_args = mock_emit.call_args[0] + emitted_span = call_args[0] + emitted_attributes = call_args[1] + + # Verify the emitted attributes contain both prompt and completion + self.assertEqual(emitted_span, span) + self.assertIn("gen_ai.prompt", emitted_attributes) + self.assertIn("gen_ai.completion", emitted_attributes) + self.assertEqual(emitted_attributes["gen_ai.prompt"], "What is quantum computing?") + self.assertEqual(emitted_attributes["gen_ai.completion"], "Quantum computing is...") + + # Verify span attributes are filtered + self.assertNotIn("gen_ai.prompt", result[0]._attributes) + self.assertIn("normal.attribute", result[0]._attributes) + + # Verify event attributes are filtered + updated_event = result[0]._events[0] + self.assertNotIn("gen_ai.completion", updated_event.attributes) + self.assertIn("other.attribute", updated_event.attributes) + + def test_process_spans_multiple_events_consolidated(self): + """ + Verify process_spans handles multiple events correctly, collecting all LLO attributes + into a single consolidated event. + """ + span_attributes = {"normal.attribute": "keep this"} + + # First event with prompt + event1_attrs = {"gen_ai.prompt": "First question"} + event1 = SpanEvent( + name="gen_ai.content.prompt", + attributes=event1_attrs, + timestamp=1234567890, + ) + + # Second event with completion + event2_attrs = {"gen_ai.completion": "First answer"} + event2 = SpanEvent( + name="gen_ai.content.completion", + attributes=event2_attrs, + timestamp=1234567891, + ) + + span = self._create_mock_span(span_attributes) + span.events = [event1, event2] + span.instrumentation_scope = MagicMock() + span.instrumentation_scope.name = "openlit.otel.tracing" + + with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit: + self.llo_handler.process_spans([span]) + + # Should emit once with attributes from both events + mock_emit.assert_called_once() + emitted_attributes = mock_emit.call_args[0][1] + + self.assertIn("gen_ai.prompt", emitted_attributes) + self.assertIn("gen_ai.completion", emitted_attributes) + self.assertEqual(emitted_attributes["gen_ai.prompt"], "First question") + self.assertEqual(emitted_attributes["gen_ai.completion"], "First answer")