
Commit f9e6af2

abstract llo-related logic to utility class
1 parent 99ef646 commit f9e6af2

File tree

2 files changed: +337 −78 lines

Lines changed: 325 additions & 0 deletions
@@ -0,0 +1,325 @@
import logging
import os
import re
import json
from typing import Dict, Optional, Any

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk._logs import LogRecord
from opentelemetry._logs.severity import SeverityNumber
from opentelemetry.trace import TraceFlags
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry._logs import get_logger
from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter


class LLOHandler:
    """
    Handler for Large Language Object (LLO) data in spans.

    Processes LLO attributes from various instrumentation libraries and
    converts them to standardized log events.
    """

    # Class variable to store the configured logger
    _logger = None

    @classmethod
    def configure_logger(cls, logs_endpoint: str):
        """
        Configure the LLO logger with the specified logs endpoint.
        """
        try:
            logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
            logger_provider = LoggerProvider()
            logger_provider.add_log_record_processor(BatchLogRecordProcessor(logs_exporter))
            cls._logger = get_logger("llo_logger", logger_provider=logger_provider)
            return True
        except Exception as e:
            logging.error(f"Failed to configure LLO logger: {e}")
            return False

    @classmethod
    def process_span_attributes(cls, span: ReadableSpan) -> Dict[str, Any]:
        """
        Process a span's attributes to handle LLO data.
        """
        updated_attributes = {}

        # Copy all original attributes and handle LLO data
        for key, value in span.attributes.items():
            if cls.should_offload(key):
                log_record = cls.create_log_record_for_llo(span, key, value)

                if log_record and cls._logger:
                    try:
                        cls._logger.emit(log_record)
                    except Exception as e:
                        logging.warning(f"Failed to emit LLO log record: {e}")
            else:
                updated_attributes[key] = value

        return updated_attributes

    @staticmethod
    def should_offload(key: str) -> bool:
        """
        Determine if an attribute key represents LLO data that should be offloaded.
        """
        openinference_patterns = [
            "input.value",
            "output.value",
        ]
        openinference_regex = [
            r"^llm\.input_messages\.\d+\.message\.content$",
            r"^llm\.output_messages\.\d+\.message\.content$",
        ]

        # Traceloop patterns
        traceloop_patterns = [
            "traceloop.entity.input",
            "traceloop.entity.output",
        ]
        traceloop_regex = [
            r"^gen_ai\.prompt\.\d+\.content$",
            r"^gen_ai\.completion\.\d+\.content$",
        ]

        # Generic OTel patterns (relevant for multiple libraries)
        gen_ai_patterns = [
            "message.content",
            "gen_ai.prompt",
            "gen_ai.completion",
            "gen_ai.content.revised_prompt",
        ]

        # Combine all patterns
        exact_match_patterns = openinference_patterns + traceloop_patterns + gen_ai_patterns
        regex_match_patterns = openinference_regex + traceloop_regex

        return (
            any(pattern == key for pattern in exact_match_patterns) or
            any(re.match(pattern, key) for pattern in regex_match_patterns)
        )

    @staticmethod
    def identify_instrumentation_library(span: ReadableSpan) -> str:
        """
        Identify which instrumentation library generated this span.
        """
        if hasattr(span, "instrumentation_scope") and span.instrumentation_scope:
            scope_name = span.instrumentation_scope.name
            if 'openinference' in scope_name:
                return 'openinference'
            if 'traceloop' in scope_name:
                return 'traceloop'

        return 'unknown'

    @staticmethod
    def create_log_record_for_llo(span: ReadableSpan, key: str, value: Any) -> Optional[LogRecord]:
        """
        Create a log record for an LLO attribute if it should be offloaded.
        """
        if not LLOHandler.should_offload(key):
            return None

        # Identify the instrumentation library
        library = LLOHandler.identify_instrumentation_library(span)

        # Get event information based on the library
        event_info = LLOHandler.get_event_info(key, value, span, library)

        if not event_info:
            return None

        # Extract event details
        event_name = event_info["event_name"]
        body = event_info["body"]

        gen_ai_system = LLOHandler.determine_gen_ai_system(span, library)

        # Create and return log record
        log_attributes = {
            "event.name": event_name,
            "gen_ai.system": gen_ai_system
        }

        return LogRecord(
            timestamp=span.start_time,
            observed_timestamp=span.start_time,
            trace_id=span.context.trace_id,
            span_id=span.context.span_id,
            trace_flags=span.context.trace_flags if hasattr(span.context, 'trace_flags') else TraceFlags(0x01),
            severity_text="INFO",
            severity_number=SeverityNumber.INFO,
            body=body,
            attributes=log_attributes
        )

    @staticmethod
    def get_event_info(key: str, value: Any, span: ReadableSpan, library: str) -> Optional[Dict]:
        """
        Get event information based on the attribute key and library.
        """
        if library == 'openinference':
            return LLOHandler.get_openinference_event_info(key, value, span)
        elif library == 'traceloop':
            return LLOHandler.get_traceloop_event_info(key, value, span)
        else:
            # Generic handling for unknown libraries
            return LLOHandler.get_generic_event_info(key, value, span)

    @staticmethod
    def get_openinference_event_info(key: str, value: Any, span: ReadableSpan) -> Optional[Dict]:
        """
        Extract event info for OpenInference attributes.
        """
        # Input message patterns
        if re.match(r"^llm\.input_messages\.\d+\.message\.content$", key) or key == "input.value":
            return {
                "event_name": "gen_ai.user.message",
                "body": {"content": value}
            }
        # Output message patterns
        elif re.match(r"^llm\.output_messages\.\d+\.message\.content$", key) or key == "output.value":
            # Try to extract finish reason and index if available
            finish_reason = "stop"  # Default value
            index = 0  # Default value

            for attr_key, attr_val in span.attributes.items():
                if "finish_reason" in attr_key:
                    finish_reason = attr_val
                    break

            return {
                "event_name": "gen_ai.choice",
                "body": {
                    "message": {
                        "content": value,
                        "role": "assistant"
                    },
                    "index": index,
                    "finish_reason": finish_reason
                }
            }

        return None  # Return None for attributes that don't match

    @staticmethod
    def get_traceloop_event_info(key: str, value: Any, span: ReadableSpan) -> Optional[Dict]:
        """Extract event info for Traceloop attributes."""
        # Handle structured input/output JSON data
        if key == "traceloop.entity.input" or key == "traceloop.entity.output":
            try:
                # Attempt to parse as JSON
                data = json.loads(value)

                # Input handling
                if key == "traceloop.entity.input":
                    # Extract content from inputs if available
                    content = None
                    if "inputs" in data and "input" in data["inputs"]:
                        content = data["inputs"]["input"]

                    if content:
                        return {
                            "event_name": "gen_ai.user.message",
                            "body": {"content": content}
                        }

                # Output handling
                elif key == "traceloop.entity.output":
                    # Extract content from outputs if available
                    content = None
                    if "outputs" in data and "text" in data["outputs"]:
                        content = data["outputs"]["text"]

                    if content:
                        return {
                            "event_name": "gen_ai.choice",
                            "body": {
                                "message": {
                                    "content": content,
                                    "role": "assistant"
                                },
                                "index": 0,
                                "finish_reason": "stop"
                            }
                        }
            except (json.JSONDecodeError, TypeError):
                # If JSON parsing fails, treat as raw text
                pass

        # Handle direct gen_ai attributes
        elif re.match(r"^gen_ai\.prompt\.\d+\.content$", key):
            return {
                "event_name": "gen_ai.user.message",
                "body": {"content": value}
            }
        elif re.match(r"^gen_ai\.completion\.\d+\.content$", key):
            return {
                "event_name": "gen_ai.choice",
                "body": {
                    "message": {
                        "content": value,
                        "role": "assistant"
                    },
                    "index": 0,
                    "finish_reason": "stop"
                }
            }

        return None

    @staticmethod
    def get_generic_event_info(key: str, value: Any, span: ReadableSpan) -> Optional[Dict]:
        """Extract event info for generic LLO attributes."""
        # Basic pattern detection - input/prompt vs output/completion
        if any(pattern in key for pattern in ["input", "prompt"]):
            return {
                "event_name": "gen_ai.user.message",
                "body": {"content": value}
            }
        elif any(pattern in key for pattern in ["output", "completion"]):
            return {
                "event_name": "gen_ai.choice",
                "body": {
                    "message": {
                        "content": value,
                        "role": "assistant"
                    },
                    "index": 0,
                    "finish_reason": "stop"
                }
            }

        return None

    @staticmethod
    def determine_gen_ai_system(span: ReadableSpan, library: str) -> str:
        """Determine which gen_ai system is being used."""
        # Check for direct attribute
        if "gen_ai.system" in span.attributes:
            system = span.attributes["gen_ai.system"]
            if isinstance(system, str):
                return system.lower()

        # Check for model name patterns
        for key, value in span.attributes.items():
            if "model" in key and isinstance(value, str):
                if any(model in value.lower() for model in ["gpt", "openai"]):
                    return "openai"
                if any(model in value.lower() for model in ["claude", "anthropic"]):
                    return "anthropic"
                if any(model in value.lower() for model in ["llama", "meta"]):
                    return "meta"

        # Library-specific defaults
        if library == "traceloop":
            # Check for provider in traceloop properties
            for key, value in span.attributes.items():
                if "provider" in key and isinstance(value, str):
                    if "openai" in value.lower():
                        return "openai"

        # Default fallback
        return "openai"  # Default to openai as it's the most common

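The second changed file in this commit is not shown here, so the actual integration point is unknown. The following is a hedged sketch of how a span processor might wire configure_logger and process_span_attributes together; the class name LLOSpanProcessor and the logs_endpoint parameter are assumptions for illustration only.

from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor


class LLOSpanProcessor(SpanProcessor):
    """Hypothetical processor that offloads LLO attributes to logs when a span ends."""

    def __init__(self, logs_endpoint: str):
        # One-time logger setup; if it fails, spans pass through untouched.
        self._configured = LLOHandler.configure_logger(logs_endpoint)

    def on_end(self, span: ReadableSpan) -> None:
        if not self._configured:
            return
        # Emits a log record per LLO attribute and returns the attributes to keep.
        # Note: a ReadableSpan is immutable at on_end, so a real integration would
        # have to rewrite the span data before it reaches the exporter.
        remaining_attributes = LLOHandler.process_span_attributes(span)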