Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 113 additions & 34 deletions aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ def __init__(self, logger_provider: LoggerProvider):
self._event_logger_provider = EventLoggerProvider(logger_provider=self._logger_provider)
self._event_logger = self._event_logger_provider.get_event_logger("gen_ai.events")

# Patterns for attribute filtering
self._exact_match_patterns = [
# Patterns for attribute filtering - using a set for O(1) lookups
self._exact_match_patterns = {
TRACELOOP_ENTITY_INPUT,
TRACELOOP_ENTITY_OUTPUT,
TRACELOOP_CREW_TASKS_OUTPUT,
Expand All @@ -103,7 +103,7 @@ def __init__(self, logger_provider: LoggerProvider):
OPENLIT_AGENT_HUMAN_INPUT,
OPENINFERENCE_INPUT_VALUE,
OPENINFERENCE_OUTPUT_VALUE,
]
}

# Pre-compile regex patterns for better performance
self._regex_patterns = [
Expand Down Expand Up @@ -236,6 +236,16 @@ def _emit_llo_attributes(
Returns:
None: Events are emitted via the event logger
"""
# Quick check if we have any LLO attributes before running extractors
has_llo_attrs = False
for key in attributes:
if self._is_llo_attribute(key):
has_llo_attrs = True
break

if not has_llo_attrs:
return

all_events = []
all_events.extend(self._extract_gen_ai_prompt_events(span, attributes, event_timestamp))
all_events.extend(self._extract_gen_ai_completion_events(span, attributes, event_timestamp))
Expand All @@ -261,8 +271,19 @@ def _filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
Returns:
Dict[str, Any]: New dictionary with LLO attributes removed
"""
# First check if we need to filter anything
has_llo_attrs = False
for key in attributes:
if self._is_llo_attribute(key):
has_llo_attrs = True
break

# If no LLO attributes found, return the original attributes (no need to copy)
if not has_llo_attrs:
return attributes

# Otherwise, create filtered copy
filtered_attributes = {}

for key, value in attributes.items():
if not self._is_llo_attribute(key):
filtered_attributes[key] = value
Expand Down Expand Up @@ -290,12 +311,16 @@ def _is_llo_attribute(self, key: str) -> bool:
Returns:
bool: True if the key matches any LLO pattern, False otherwise
"""
# Check exact matches first (faster)
# Check exact matches first (O(1) lookup in a set)
if key in self._exact_match_patterns:
return True

# Then check regex patterns
return any(pattern.match(key) for pattern in self._regex_patterns)
for pattern in self._regex_patterns:
if pattern.match(key):
return True

return False

def _extract_gen_ai_prompt_events(
self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
Expand All @@ -321,22 +346,29 @@ def _extract_gen_ai_prompt_events(
Returns:
List[Event]: Events created from prompt attributes
"""
# Quick check if any prompt content attributes exist
if not any(self._prompt_content_pattern.match(key) for key in attributes):
return []

events = []
span_ctx = span.context
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")

# Use helper method to get appropriate timestamp (prompts are inputs)
prompt_timestamp = self._get_timestamp(span, event_timestamp, is_input=True)

# Find all prompt content attributes and their roles
prompt_content_matches = {}
for key, value in attributes.items():
match = self._prompt_content_pattern.match(key)
if not match:
continue

index = match.group(1)
role_key = f"gen_ai.prompt.{index}.role"
role = attributes.get(role_key, "unknown")

if match:
index = match.group(1)
role_key = f"gen_ai.prompt.{index}.role"
role = attributes.get(role_key, "unknown")
prompt_content_matches[index] = (key, value, role)

# Create events for each content+role pair
for index, (key, value, role) in prompt_content_matches.items():
event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
body = {"content": value, "role": role}

Expand Down Expand Up @@ -376,22 +408,29 @@ def _extract_gen_ai_completion_events(
Returns:
List[Event]: Events created from completion attributes
"""
# Quick check if any completion content attributes exist
if not any(self._completion_content_pattern.match(key) for key in attributes):
return []

events = []
span_ctx = span.context
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")

# Use helper method to get appropriate timestamp (completions are outputs)
completion_timestamp = self._get_timestamp(span, event_timestamp, is_input=False)

# Find all completion content attributes and their roles
completion_content_matches = {}
for key, value in attributes.items():
match = self._completion_content_pattern.match(key)
if not match:
continue

index = match.group(1)
role_key = f"gen_ai.completion.{index}.role"
role = attributes.get(role_key, "unknown")

if match:
index = match.group(1)
role_key = f"gen_ai.completion.{index}.role"
role = attributes.get(role_key, "unknown")
completion_content_matches[index] = (key, value, role)

# Create events for each content+role pair
for index, (key, value, role) in completion_content_matches.items():
event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
body = {"content": value, "role": role}

Expand Down Expand Up @@ -437,6 +476,18 @@ def _extract_traceloop_events(
Returns:
List[Event]: Events created from Traceloop attributes
"""
# Define the Traceloop attributes we're looking for
traceloop_keys = {
TRACELOOP_ENTITY_INPUT,
TRACELOOP_ENTITY_OUTPUT,
TRACELOOP_CREW_TASKS_OUTPUT,
TRACELOOP_CREW_RESULT,
}

# Quick check if any Traceloop attributes exist
if not any(key in attributes for key in traceloop_keys):
return []

events = []
span_ctx = span.context
# Use traceloop.entity.name for the gen_ai.system value
Expand Down Expand Up @@ -521,6 +572,19 @@ def _extract_openlit_span_event_attributes(
Returns:
List[Event]: Events created from OpenLit attributes
"""
# Define the OpenLit attributes we're looking for
openlit_keys = {
OPENLIT_PROMPT,
OPENLIT_COMPLETION,
OPENLIT_REVISED_PROMPT,
OPENLIT_AGENT_ACTUAL_OUTPUT,
OPENLIT_AGENT_HUMAN_INPUT,
}

# Quick check if any OpenLit attributes exist
if not any(key in attributes for key in openlit_keys):
return []

events = []
span_ctx = span.context
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
Expand Down Expand Up @@ -597,6 +661,17 @@ def _extract_openinference_attributes(
Returns:
List[Event]: Events created from OpenInference attributes
"""
# Define the OpenInference keys/patterns we're looking for
openinference_direct_keys = {OPENINFERENCE_INPUT_VALUE, OPENINFERENCE_OUTPUT_VALUE}

# Quick check if any OpenInference attributes exist
has_direct_attrs = any(key in attributes for key in openinference_direct_keys)
has_input_msgs = any(self._openinference_input_msg_pattern.match(key) for key in attributes)
has_output_msgs = any(self._openinference_output_msg_pattern.match(key) for key in attributes)

if not (has_direct_attrs or has_input_msgs or has_output_msgs):
return []

events = []
span_ctx = span.context
gen_ai_system = span.attributes.get("llm.model_name", "unknown")
Expand Down Expand Up @@ -626,15 +701,17 @@ def _extract_openinference_attributes(
events.append(event)

# Process input messages
input_messages = {}
for key, value in attributes.items():
match = self._openinference_input_msg_pattern.match(key)
if not match:
continue

index = match.group(1)
role_key = f"llm.input_messages.{index}.message.role"
role = attributes.get(role_key, ROLE_USER) # Default to user if role not specified

if match:
index = match.group(1)
role_key = f"llm.input_messages.{index}.message.role"
role = attributes.get(role_key, ROLE_USER) # Default to user if role not specified
input_messages[index] = (key, value, role)

# Create events for input messages
for index, (key, value, role) in input_messages.items():
event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
body = {"content": value, "role": role}

Expand All @@ -648,15 +725,17 @@ def _extract_openinference_attributes(
events.append(event)

# Process output messages
output_messages = {}
for key, value in attributes.items():
match = self._openinference_output_msg_pattern.match(key)
if not match:
continue

index = match.group(1)
role_key = f"llm.output_messages.{index}.message.role"
role = attributes.get(role_key, ROLE_ASSISTANT) # Default to assistant if role not specified

if match:
index = match.group(1)
role_key = f"llm.output_messages.{index}.message.role"
role = attributes.get(role_key, ROLE_ASSISTANT) # Default to assistant if role not specified
output_messages[index] = (key, value, role)

# Create events for output messages
for index, (key, value, role) in output_messages.items():
event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key}
body = {"content": value, "role": role}

Expand Down