Skip to content

Commit b9b6edd

Browse files
committed
add support for openlit and llo span events
1 parent b34f569 commit b9b6edd

File tree

2 files changed

+314
-192
lines changed

2 files changed

+314
-192
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py

Lines changed: 128 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
import re
33

4-
from typing import Any, Dict, List, Sequence
4+
from typing import Any, Dict, List, Optional, Sequence
55

66
from opentelemetry.attributes import BoundedAttributes
77
from opentelemetry._events import Event
88
from opentelemetry.sdk._logs import LoggerProvider
99
from opentelemetry.sdk._events import EventLoggerProvider
10-
from opentelemetry.sdk.trace import ReadableSpan
10+
from opentelemetry.sdk.trace import ReadableSpan, Event as SpanEvent
1111

1212
GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
1313
GEN_AI_USER_MESSAGE = "gen_ai.user.message"
@@ -44,6 +44,9 @@ def __init__(self, logger_provider: LoggerProvider):
4444
self._exact_match_patterns = [
4545
"traceloop.entity.input",
4646
"traceloop.entity.output",
47+
"gen_ai.prompt",
48+
"gen_ai.completion",
49+
"gen_ai.content.revised_prompt",
4750
]
4851
self._regex_match_patterns = [
4952
r"^gen_ai\.prompt\.\d+\.content$",
@@ -78,11 +81,50 @@ def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
7881
else:
7982
span._attributes = updated_attributes
8083

84+
self.process_span_events(span)
85+
8186
modified_spans.append(span)
8287

8388
return modified_spans
8489

85-
def _emit_llo_attributes(self, span: ReadableSpan, attributes: Dict[str, Any]) -> None:
90+
def process_span_events(self, span: ReadableSpan) -> None:
    """
    Emit Gen AI Events for the LLO attributes carried by a span's events and
    strip those attributes from the span events themselves.

    Span events without attributes are kept untouched. For every other span
    event, its attributes are handed to `_emit_llo_attributes` (using the span
    event's own timestamp) and then run through `_filter_attributes`. When
    filtering changed the attribute count, the span event is replaced by a new
    `SpanEvent` holding only the remaining attributes; otherwise the original
    object is reused. The resulting list is written back to `span._events`.

    Args:
        span: The span whose events are processed; it is modified in place

    Returns:
        None: Nothing is returned, and a span without events is left unmodified
    """
    source_events = span.events
    if not source_events:
        return

    rebuilt_events = []

    for span_event in source_events:
        event_attrs = span_event.attributes

        if event_attrs:
            self._emit_llo_attributes(span, event_attrs, event_timestamp=span_event.timestamp)

            remaining_attrs = self._filter_attributes(event_attrs)

            if len(remaining_attrs) == len(event_attrs):
                # Nothing was filtered out, so the original event can be reused.
                rebuilt_events.append(span_event)
            else:
                # Carry over the attribute limit when the source attributes were bounded.
                attr_limit = event_attrs.maxlen if isinstance(event_attrs, BoundedAttributes) else None
                rebuilt_events.append(
                    SpanEvent(
                        name=span_event.name,
                        attributes=remaining_attrs,
                        timestamp=span_event.timestamp,
                        limit=attr_limit,
                    )
                )
        else:
            rebuilt_events.append(span_event)

    span._events = rebuilt_events
124+
125+
def _emit_llo_attributes(
126+
self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
127+
) -> None:
86128
"""
87129
Collects the Gen AI Events for each LLO attribute in the span and emits them
88130
using the event logger.
@@ -95,9 +137,10 @@ def _emit_llo_attributes(self, span: ReadableSpan, attributes: Dict[str, Any]) -
95137
None: Events are emitted via the event logger
96138
"""
97139
all_events = []
98-
all_events.extend(self._extract_gen_ai_prompt_events(span, attributes))
99-
all_events.extend(self._extract_gen_ai_completion_events(span, attributes))
100-
all_events.extend(self._extract_traceloop_events(span, attributes))
140+
all_events.extend(self._extract_gen_ai_prompt_events(span, attributes, event_timestamp))
141+
all_events.extend(self._extract_gen_ai_completion_events(span, attributes, event_timestamp))
142+
all_events.extend(self._extract_traceloop_events(span, attributes, event_timestamp))
143+
all_events.extend(self._extract_openlit_span_event_attributes(span, attributes, event_timestamp))
101144

102145
for event in all_events:
103146
self._event_logger.emit(event)
@@ -141,7 +184,9 @@ def _is_llo_attribute(self, key: str) -> bool:
141184
re.match(pattern, key) for pattern in self._regex_match_patterns
142185
)
143186

144-
def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]:
187+
def _extract_gen_ai_prompt_events(
188+
self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
189+
) -> List[Event]:
145190
"""
146191
Extract gen_ai prompt events from attributes. Each item `gen_ai.prompt.{n}.content`
147192
maps has an associated `gen_ai.prompt.{n}.role` that determines the Event
@@ -165,7 +210,7 @@ def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str
165210
span_ctx = span.context
166211
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
167212

168-
prompt_timestamp = span.start_time
213+
prompt_timestamp = event_timestamp if event_timestamp is not None else span.start_time
169214
prompt_content_pattern = re.compile(r"^gen_ai\.prompt\.(\d+)\.content$")
170215

171216
for key, value in attributes.items():
@@ -220,12 +265,25 @@ def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str
220265

221266
return events
222267

223-
def _extract_gen_ai_completion_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]:
268+
def _extract_gen_ai_completion_events(
269+
self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
270+
) -> List[Event]:
271+
"""
272+
Extract gen_ai completion events from attributes.
273+
274+
Args:
275+
span: The source ReadableSpan that potentially contains LLO attributes
276+
attributes: Dictionary of span attributes to process
277+
event_timestamp: Optional timestamp to use instead of span.end_time
278+
279+
Returns:
280+
List[Event]: List of OpenTelemetry Events created from completion attributes
281+
"""
224282
events = []
225283
span_ctx = span.context
226284
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
227285

228-
completion_timestamp = span.end_time
286+
completion_timestamp = event_timestamp if event_timestamp is not None else span.end_time
229287

230288
completion_content_pattern = re.compile(r"^gen_ai\.completion\.(\d+)\.content$")
231289

@@ -265,12 +323,28 @@ def _extract_gen_ai_completion_events(self, span: ReadableSpan, attributes: Dict
265323

266324
return events
267325

268-
def _extract_traceloop_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]:
326+
def _extract_traceloop_events(
327+
self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
328+
) -> List[Event]:
329+
"""
330+
Extract traceloop events from attributes.
331+
332+
Args:
333+
span: The source ReadableSpan that potentially contains LLO attributes
334+
attributes: Dictionary of span attributes to process
335+
event_timestamp: Optional timestamp to use instead of span timestamps
336+
337+
Returns:
338+
List[Event]: List of OpenTelemetry Events created from traceloop attributes
339+
"""
269340
events = []
270341
span_ctx = span.context
271342
gen_ai_system = span.attributes.get("traceloop.entity.name", "unknown")
272343

273-
traceloop_attrs = [(TRACELOOP_ENTITY_INPUT, span.start_time), (TRACELOOP_ENTITY_OUTPUT, span.end_time)]
344+
input_timestamp = event_timestamp if event_timestamp is not None else span.start_time
345+
output_timestamp = event_timestamp if event_timestamp is not None else span.end_time
346+
347+
traceloop_attrs = [(TRACELOOP_ENTITY_INPUT, input_timestamp), (TRACELOOP_ENTITY_OUTPUT, output_timestamp)]
274348

275349
for attr_key, timestamp in traceloop_attrs:
276350
if attr_key in attributes:
@@ -285,6 +359,48 @@ def _extract_traceloop_events(self, span: ReadableSpan, attributes: Dict[str, An
285359

286360
return events
287361

362+
def _extract_openlit_span_event_attributes(
363+
self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
364+
) -> List[Event]:
365+
"""
366+
Extract LLO attributes specifically from OpenLit span events, which use direct key-value pairs
367+
like `gen_ai.prompt` or `gen_ai.completion` in event attributes.
368+
"""
369+
events = []
370+
span_ctx = span.context
371+
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
372+
373+
prompt_timestamp = event_timestamp if event_timestamp is not None else span.start_time
374+
completion_timestamp = event_timestamp if event_timestamp is not None else span.end_time
375+
376+
openlit_event_attrs = [
377+
("gen_ai.prompt", prompt_timestamp, "user"), # Assume user role for direct prompts
378+
("gen_ai.completion", completion_timestamp, "assistant"), # Assume assistant role for completions
379+
("gen_ai.content.revised_prompt", prompt_timestamp, "system"), # Assume system role for revised prompts
380+
]
381+
382+
for attr_key, timestamp, role in openlit_event_attrs:
383+
if attr_key in attributes:
384+
event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key}
385+
body = {"content": attributes[attr_key], "role": role}
386+
387+
if role == "user":
388+
event_name = GEN_AI_USER_MESSAGE
389+
elif role == "assistant":
390+
event_name = GEN_AI_ASSISTANT_MESSAGE
391+
elif role == "system":
392+
event_name = GEN_AI_SYSTEM_MESSAGE
393+
else:
394+
event_name = f"gen_ai.{gen_ai_system}.message"
395+
396+
event = self._get_gen_ai_event(
397+
name=event_name, span_ctx=span_ctx, timestamp=timestamp, attributes=event_attributes, body=body
398+
)
399+
400+
events.append(event)
401+
402+
return events
403+
288404
def _get_gen_ai_event(self, name, span_ctx, timestamp, attributes, body):
289405
"""
290406
Create and return a Gen AI Event with the provided parameters.

0 commit comments

Comments
 (0)