Skip to content

Commit 13e674d

Browse files
committed
emit llo span data to logs pipeline
1 parent 28f493d commit 13e674d

File tree

1 file changed

+41
-56
lines changed

1 file changed

+41
-56
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py

Lines changed: 41 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,24 @@
33
import logging
44
from typing import Dict, Optional, Sequence
55

6+
import os
67
import re
78
import requests
89

910
from amazon.opentelemetry.distro._utils import is_installed
1011
from amazon.opentelemetry.distro.llo_sender_client import LLOSenderClient
12+
from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter
1113
from opentelemetry.attributes import BoundedAttributes
1214
from opentelemetry.exporter.otlp.proto.http import Compression
1315
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14-
from opentelemetry.sdk.trace import ReadableSpan, Event
16+
from opentelemetry.sdk.trace import ReadableSpan
1517
from opentelemetry.sdk.trace.export import SpanExportResult
18+
from opentelemetry.sdk._logs import LoggerProvider, LogRecord
19+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
20+
from opentelemetry._logs import get_logger
21+
from opentelemetry._logs.severity import SeverityNumber
22+
from opentelemetry.trace import TraceFlags
23+
1624

1725
AWS_SERVICE = "xray"
1826
_logger = logging.getLogger(__name__)
@@ -42,6 +50,11 @@ def __init__(
4250
self._aws_region = None
4351
self._has_required_dependencies = False
4452
self._llo_sender_client = LLOSenderClient()
53+
# set up logger for LLO data
54+
self._logs_exporter = OTLPAwsLogExporter(endpoint=os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"))
55+
self._logger_provider = LoggerProvider()
56+
self._logger_provider.add_log_record_processor(BatchLogRecordProcessor(self._logs_exporter))
57+
self._logger = get_logger("llo_logger", logger_provider=self._logger_provider)
4558
# Requires botocore to be installed to sign the headers. However,
4659
# some users might not need to use this exporter. In order not conflict
4760
# with existing behavior, we check for botocore before initializing this exporter.
@@ -94,11 +107,30 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
94107
"span_name": span.name
95108
}
96109

97-
# Get S3 pointer from LLOSenderClient
98-
s3_pointer = self._llo_sender_client.upload(value, metadata)
110+
# Emit a log record for LLO data
111+
llo_attributes = {
112+
"llo.attribute_name": metadata.get('attribute_name', 'unknown'),
113+
"llo.content": value,
114+
"llo.span_name": metadata.get('span_name', ''),
115+
"service.name": "llm-app",
116+
117+
}
118+
119+
self._logger.emit(
120+
LogRecord(
121+
timestamp=span.start_time,
122+
observed_timestamp=span.start_time,
123+
trace_id=span.context.trace_id,
124+
span_id=span.context.span_id,
125+
trace_flags=TraceFlags(0x01),
126+
severity_text="INFO",
127+
severity_number=SeverityNumber.INFO,
128+
body=f"LLO attribute: {metadata.get('attribute_name')}",
129+
attributes=llo_attributes
130+
)
131+
)
99132

100-
# Store the S3 pointer instead of original value to trim span
101-
updated_attributes[key] = s3_pointer
133+
# We drop the LLO attribute (don't add to updated_attributes)
102134
else:
103135
# Keep original value if it is not LLO
104136
updated_attributes[key] = value
@@ -114,62 +146,15 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
114146
else:
115147
span._attributes = updated_attributes
116148

117-
# Process span events
118-
if span.events:
119-
updated_events = []
120-
121-
for event in span.events:
122-
# Check if this event has any attributes to process
123-
if not event.attributes:
124-
updated_events.append(event) # Keep the original event
125-
continue
126-
127-
# Process event attributes for LLO content
128-
updated_event_attributes = {}
129-
need_to_update = False
130-
131-
for key, value in event.attributes.items():
132-
if self._should_offload(key):
133-
metadata = {
134-
"trace_id": format(span.context.trace_id, 'x'),
135-
"span_id": format(span.context.span_id, 'x'),
136-
"attribute_name": key,
137-
"event_name": event.name,
138-
"event_time": str(event.timestamp)
139-
}
140-
141-
s3_pointer = self._llo_sender_client.upload(value, metadata)
142-
updated_event_attributes[key] = s3_pointer
143-
need_to_update = True
144-
else:
145-
updated_event_attributes[key] = value
146-
147-
if need_to_update:
148-
# Create new Event with the updated attributes
149-
limit = None
150-
if isinstance(event.attributes, BoundedAttributes):
151-
limit = event.attributes.maxlen
152-
153-
updated_event = Event(
154-
name=event.name,
155-
attributes=updated_event_attributes,
156-
timestamp=event.timestamp,
157-
limit=limit
158-
)
159-
160-
updated_events.append(updated_event)
161-
else:
162-
# Keep the original event
163-
updated_events.append(event)
164-
165-
# Update the span's events with processed events
166-
span._events = updated_events
167-
149+
# Add modified span to list
168150
modified_spans.append(span)
169151

152+
# TODO: Process span events
153+
170154
# Export the modified spans
171155
return super().export(modified_spans)
172156

157+
173158
def _should_offload(self, key):
174159
"""Determine if LLO based on the attribute key. Strict matching is enforced as to not introduce unintended behavior."""
175160
exact_match_patterns = [

0 commit comments

Comments
 (0)