Skip to content

Commit 5f30abf

Browse files
committed
emit llo to log pipeline
1 parent 68f031e commit 5f30abf

File tree

1 file changed

+121
-6
lines changed
  • aws-opentelemetry-distro/src/amazon/opentelemetry/distro

1 file changed

+121
-6
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py

Lines changed: 121 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
import logging
4+
import os
35
import re
46
from typing import Dict, Any, List, Optional, Sequence
57

68
from opentelemetry.attributes import BoundedAttributes
79
from opentelemetry.sdk.trace import ReadableSpan, Event
10+
from opentelemetry.sdk._logs import LoggerProvider, LogRecord
11+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
12+
from opentelemetry._logs import get_logger
13+
from opentelemetry._logs.severity import SeverityNumber
14+
from opentelemetry.trace import TraceFlags
15+
16+
from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter
17+
18+
_logger = logging.getLogger(__name__)
819

920

1021
class LLOHandler:
1122
"""
1223
Utility class for handling Large Language Model Output (LLO) attributes.
13-
This class identifies LLO attributes and determines whether they should be
14-
processed or filtered out from telemetry data.
24+
This class identifies LLO attributes, emits them as log records,
25+
and filters them out from telemetry data.
1526
"""
1627

1728
def __init__(self):
@@ -35,6 +46,27 @@ def __init__(self):
3546
r"^llm.output_messages\.\d+\.message.content$",
3647
]
3748

49+
# Set up logger for LLO data
50+
self._setup_logger()
51+
52+
def _setup_logger(self):
    """
    Set up the logger used to emit LLO data through the OTLP AWS Logs Exporter.

    Reads the ``OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`` environment variable. When it
    is set, an ``OTLPAwsLogExporter`` for that endpoint is wrapped in a
    ``BatchLogRecordProcessor``, attached to a new ``LoggerProvider``, and a
    logger named ``"llo_logger"`` is obtained from that provider. When it is not
    set, ``self._logger`` is ``None`` and a warning is logged.
    """
    # Always define every attribute so the instance has a consistent shape
    # regardless of which branch below runs.
    self._logs_exporter = None
    self._logger_provider = None
    self._logger = None

    logs_endpoint = os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT")
    if not logs_endpoint:
        _logger.warning(
            "No OTEL_EXPORTER_OTLP_LOGS_ENDPOINT specified, "
            "LLO attributes will be filtered but not emitted as logs"
        )
        return

    self._logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
    self._logger_provider = LoggerProvider()
    self._logger_provider.add_log_record_processor(
        BatchLogRecordProcessor(self._logs_exporter)
    )
    self._logger = get_logger("llo_logger", logger_provider=self._logger_provider)
    # Lazy %-style arguments: the message is only formatted if DEBUG is enabled.
    _logger.debug("Initialized LLO logger with AWS OTLP Logs exporter at %s", logs_endpoint)
69+
3870
def is_llo_attribute(self, key: str) -> bool:
3971
"""
4072
Determine if an attribute is LLO based on its key.
@@ -67,13 +99,85 @@ def filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
6799
filtered_attributes[key] = value
68100
return filtered_attributes
69101

102+
def emit_llo_attributes(self, span: ReadableSpan, attributes: Dict[str, Any],
                        event_name: Optional[str] = None, timestamp: Optional[int] = None) -> None:
    """
    Extract LLO attributes and emit each one as a log record.

    Does nothing when no LLO logger is configured or when ``attributes`` is
    empty or ``None``. Any exception raised while building or emitting records
    is logged and swallowed, so emission stays best-effort.

    Args:
        span: The span containing the LLO attributes
        attributes: Dictionary of attributes to check for LLO attributes
        event_name: Optional name of the event (if attributes are from an event)
        timestamp: Optional timestamp to use (default: span start time)
    """
    if not self._logger or not attributes:
        return

    try:
        # Compare against None explicitly so an explicit timestamp of 0 is
        # honoured rather than silently replaced by the span start time.
        ts = timestamp if timestamp is not None else span.start_time

        # The span-level context is identical for every record from this span,
        # so collect it once. Tolerate a span whose attributes are None.
        span_attributes = span.attributes or {}
        context_attributes = {
            context_key: span_attributes[context_key]
            for context_key in ("gen_ai.system", "gen_ai.operation.name", "gen_ai.request.model")
            if context_key in span_attributes
        }

        for key, value in attributes.items():
            # Only process LLO attributes
            if not self.is_llo_attribute(key):
                continue

            # Structured body: attribute key and content, plus the event name
            # (when the attributes came from an event) and the span name.
            body = {
                "attribute_key": key,
                "content": value,
            }
            if event_name:
                body["event_name"] = event_name
            body["span_name"] = span.name

            # "event.name" is derived from the last dot-separated part of the key.
            log_attributes = {
                "event.name": f"llo.attribute.{key.split('.')[-1]}",
                **context_attributes,
            }

            # NOTE(review): trace_flags is hard-coded to 0x01 rather than taken
            # from span.context; confirm that this is intended.
            self._logger.emit(
                LogRecord(
                    timestamp=ts,
                    observed_timestamp=ts,
                    trace_id=span.context.trace_id,
                    span_id=span.context.span_id,
                    trace_flags=TraceFlags(0x01),
                    severity_number=SeverityNumber.INFO,
                    severity_text=None,
                    body=body,
                    attributes=log_attributes,
                )
            )

            _logger.debug("Emitted LLO log record for attribute: %s", key)

    except Exception as e:
        # Best-effort boundary: a failure to emit must not propagate to the caller.
        _logger.error("Error emitting LLO log records: %s", e, exc_info=True)
168+
70169
def update_span_attributes(self, span: ReadableSpan) -> None:
71170
"""
72-
Update span attributes by filtering out LLO attributes.
171+
Update span attributes by:
172+
1. Emitting LLO attributes as log records (if logger is configured)
173+
2. Filtering out LLO attributes from the span
73174
74175
Args:
75176
span: The span to update
76177
"""
178+
# Emit LLO attributes as log records
179+
self.emit_llo_attributes(span, span.attributes)
180+
77181
# Filter out LLO attributes
78182
updated_attributes = self.filter_attributes(span.attributes)
79183

@@ -90,7 +194,9 @@ def update_span_attributes(self, span: ReadableSpan) -> None:
90194

91195
def process_span_events(self, span: ReadableSpan) -> None:
92196
"""
93-
Process events within a span by filtering out LLO attributes from event attributes.
197+
Process events within a span by:
198+
1. Emitting LLO attributes as log records (if logger is configured)
199+
2. Filtering out LLO attributes from event attributes
94200
95201
Args:
96202
span: The span containing events to process
@@ -106,6 +212,14 @@ def process_span_events(self, span: ReadableSpan) -> None:
106212
updated_events.append(event) # Keep the original event
107213
continue
108214

215+
# Emit LLO attributes as log records
216+
self.emit_llo_attributes(
217+
span,
218+
event.attributes,
219+
event_name=event.name,
220+
timestamp=event.timestamp
221+
)
222+
109223
# Filter out LLO attributes from event
110224
updated_event_attributes = self.filter_attributes(event.attributes)
111225

@@ -135,8 +249,9 @@ def process_span_events(self, span: ReadableSpan) -> None:
135249

136250
def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
137251
"""
138-
Process a list of spans by filtering out LLO attributes from both
139-
span attributes and event attributes.
252+
Process a list of spans by:
253+
1. Emitting LLO attributes as log records (if logger is configured)
254+
2. Filtering out LLO attributes from both span attributes and event attributes
140255
141256
Args:
142257
spans: List of spans to process

0 commit comments

Comments
 (0)