11# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22# SPDX-License-Identifier: Apache-2.0
3+ import logging
4+ import os
35import re
46from typing import Dict , Any , List , Optional , Sequence
57
68from opentelemetry .attributes import BoundedAttributes
79from opentelemetry .sdk .trace import ReadableSpan , Event
10+ from opentelemetry .sdk ._logs import LoggerProvider , LogRecord
11+ from opentelemetry .sdk ._logs .export import BatchLogRecordProcessor
12+ from opentelemetry ._logs import get_logger
13+ from opentelemetry ._logs .severity import SeverityNumber
14+ from opentelemetry .trace import TraceFlags
15+
16+ from amazon .opentelemetry .distro .otlp_aws_logs_exporter import OTLPAwsLogExporter
17+
18+ _logger = logging .getLogger (__name__ )
819
920
1021class LLOHandler :
1122 """
1223 Utility class for handling Large Language Model Output (LLO) attributes.
13- This class identifies LLO attributes and determines whether they should be
14- processed or filtered out from telemetry data.
24+ This class identifies LLO attributes, emits them as log records,
25+ and filters them out from telemetry data.
1526 """
1627
1728 def __init__ (self ):
18- # List of exact attribute keys that should be considered LLO attributes
1929 self ._exact_match_patterns = [
2030 "traceloop.entity.input" ,
2131 "traceloop.entity.output" ,
@@ -27,14 +37,32 @@ def __init__(self):
2737 "gen_ai.content.revised_prompt" ,
2838 ]
2939
30- # List of regex patterns that should be considered LLO attributes
3140 self ._regex_match_patterns = [
3241 r"^gen_ai\.prompt\.\d+\.content$" ,
3342 r"^gen_ai\.completion\.\d+\.content$" ,
3443 r"^llm.input_messages\.\d+\.message.content$" ,
3544 r"^llm.output_messages\.\d+\.message.content$" ,
3645 ]
3746
47+ self ._setup_logger ()
48+
49+ def _setup_logger (self ):
50+ """
51+ Set up the logger with OTLP AWS Logs Exporter
52+ """
53+ logs_endpoint = os .getenv ("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" )
54+ if logs_endpoint :
55+ self ._logs_exporter = OTLPAwsLogExporter (endpoint = logs_endpoint )
56+ self ._logger_provider = LoggerProvider ()
57+ self ._logger_provider .add_log_record_processor (
58+ BatchLogRecordProcessor (self ._logs_exporter )
59+ )
60+ self ._logger = get_logger ("llo_logger" , logger_provider = self ._logger_provider )
61+ _logger .debug (f"Initialized LLO logger with AWS OTLP Logs exporter at { logs_endpoint } " )
62+ else :
63+ self ._logger = None
64+ _logger .warning ("No OTEL_EXPORTER_OTLP_LOGS_ENDPOINT specified, LLO attributes will be filtered but not emitted as logs" )
65+
3866 def is_llo_attribute (self , key : str ) -> bool :
3967 """
4068 Determine if an attribute is LLO based on its key.
@@ -67,17 +95,76 @@ def filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
6795 filtered_attributes [key ] = value
6896 return filtered_attributes
6997
98+ def emit_llo_attributes (self , span : ReadableSpan , attributes : Dict [str , Any ],
99+ event_name : Optional [str ] = None , event_timestamp : Optional [int ] = None ) -> None :
100+ """
101+ Extract and emit LLO attributes as log records.
102+
103+ Args:
104+ span: The span containing the LLO attributes
105+ attributes: Dictionary of attributes to check for LLO attributes
106+ event_name: Optional name of the event (if attributes are from an event)
107+ event_timestamp: Optional timestamp for events (span.start_time used for span attributes)
108+ """
109+ if not self ._logger :
110+ return
111+
112+ try :
113+ timestamp = event_timestamp or span .start_time
114+
115+ for key , value in attributes .items ():
116+ if not self .is_llo_attribute (key ):
117+ continue
118+
119+ body = {
120+ "attribute_key" : key ,
121+ "content" : value
122+ }
123+
124+ if event_name :
125+ body ["event_name" ] = event_name
126+
127+ body ["span_name" ] = span .name
128+
129+ log_attributes = {
130+ "event.name" : f"llo.attribute.{ key .split ('.' )[- 1 ]} " ,
131+ }
132+
133+ for context_key in ["gen_ai.system" , "gen_ai.operation.name" , "gen_ai.request.model" ]:
134+ if context_key in span .attributes :
135+ log_attributes [context_key ] = span .attributes [context_key ]
136+
137+ self ._logger .emit (
138+ LogRecord (
139+ timestamp = timestamp ,
140+ observed_timestamp = timestamp ,
141+ trace_id = span .context .trace_id ,
142+ span_id = span .context .span_id ,
143+ trace_flags = TraceFlags (0x01 ),
144+ severity_number = SeverityNumber .INFO ,
145+ severity_text = None ,
146+ body = body ,
147+ attributes = log_attributes
148+ )
149+ )
150+
151+ _logger .debug (f"Emitted LLO log record for attribute: { key } " )
152+
153+ except Exception as e :
154+ _logger .error (f"Error emitting LLO log records: { e } " , exc_info = True )
155+
70156 def update_span_attributes (self , span : ReadableSpan ) -> None :
71157 """
72- Update span attributes by filtering out LLO attributes.
158+ Update span attributes by:
159+ 1. Emitting LLO attributes as log records (if logger is configured)
160+ 2. Filtering out LLO attributes from the span
73161
74162 Args:
75163 span: The span to update
76164 """
77- # Filter out LLO attributes
165+ self . emit_llo_attributes ( span , span . attributes )
78166 updated_attributes = self .filter_attributes (span .attributes )
79167
80- # Update span attributes
81168 if isinstance (span .attributes , BoundedAttributes ):
82169 span ._attributes = BoundedAttributes (
83170 maxlen = span .attributes .maxlen ,
@@ -90,7 +177,9 @@ def update_span_attributes(self, span: ReadableSpan) -> None:
90177
91178 def process_span_events (self , span : ReadableSpan ) -> None :
92179 """
93- Process events within a span by filtering out LLO attributes from event attributes.
180+ Process events within a span by:
181+ 1. Emitting LLO attributes as log records (if logger is configured)
182+ 2. Filtering out LLO attributes from event attributes
94183
95184 Args:
96185 span: The span containing events to process
@@ -101,19 +190,22 @@ def process_span_events(self, span: ReadableSpan) -> None:
101190 updated_events = []
102191
103192 for event in span .events :
104- # Check if this event has any attributes to process
105193 if not event .attributes :
106- updated_events .append (event ) # Keep the original event
194+ updated_events .append (event )
107195 continue
108196
109- # Filter out LLO attributes from event
197+ self .emit_llo_attributes (
198+ span ,
199+ event .attributes ,
200+ event_name = event .name ,
201+ event_timestamp = event .timestamp
202+ )
203+
110204 updated_event_attributes = self .filter_attributes (event .attributes )
111205
112- # Check if attributes were changed
113206 need_to_update = len (updated_event_attributes ) != len (event .attributes )
114207
115208 if need_to_update :
116- # Create new Event with the updated attributes
117209 limit = None
118210 if isinstance (event .attributes , BoundedAttributes ):
119211 limit = event .attributes .maxlen
@@ -127,16 +219,15 @@ def process_span_events(self, span: ReadableSpan) -> None:
127219
128220 updated_events .append (updated_event )
129221 else :
130- # Keep the original event
131222 updated_events .append (event )
132223
133- # Update the span's events with processed events
134224 span ._events = updated_events
135225
136226 def process_spans (self , spans : Sequence [ReadableSpan ]) -> List [ReadableSpan ]:
137227 """
138- Process a list of spans by filtering out LLO attributes from both
139- span attributes and event attributes.
228+ Process a list of spans by:
229+ 1. Emitting LLO attributes as log records (if logger is configured)
230+ 2. Filtering out LLO attributes from both span attributes and event attributes
140231
141232 Args:
142233 spans: List of spans to process
@@ -147,13 +238,8 @@ def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
147238 modified_spans = []
148239
149240 for span in spans :
150- # Update span attributes
151241 self .update_span_attributes (span )
152-
153- # Process span events
154242 self .process_span_events (span )
155-
156- # Add the modified span to the result list
157243 modified_spans .append (span )
158244
159245 return modified_spans
0 commit comments