|
3 | 3 | import logging |
4 | 4 | from typing import Dict, Optional, Sequence |
5 | 5 |
|
6 | | -import re |
7 | 6 | import requests |
8 | 7 |
|
9 | 8 | from amazon.opentelemetry.distro._utils import is_installed |
10 | | -from amazon.opentelemetry.distro.llo_sender_client import LLOSenderClient |
11 | | -from opentelemetry.attributes import BoundedAttributes |
| 9 | +from amazon.opentelemetry.distro.llo_handler import LLOHandler |
12 | 10 | from opentelemetry.exporter.otlp.proto.http import Compression |
13 | 11 | from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter |
14 | | -from opentelemetry.sdk.trace import ReadableSpan, Event |
| 12 | +from opentelemetry.sdk.trace import ReadableSpan |
15 | 13 | from opentelemetry.sdk.trace.export import SpanExportResult |
16 | 14 |
|
17 | 15 | AWS_SERVICE = "xray" |
@@ -41,7 +39,7 @@ def __init__( |
41 | 39 |
|
42 | 40 | self._aws_region = None |
43 | 41 | self._has_required_dependencies = False |
44 | | - self._llo_sender_client = LLOSenderClient() |
| 42 | + self._llo_handler = LLOHandler() |
45 | 43 | # Requires botocore to be installed to sign the headers. However, |
46 | 44 | # some users might not need to use this exporter. In order not conflict |
47 | 45 | # with existing behavior, we check for botocore before initializing this exporter. |
@@ -78,123 +76,12 @@ def __init__( |
78 | 76 | ) |
79 | 77 |
|
80 | 78 | def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: |
81 | | - modified_spans = [] |
82 | | - |
83 | | - for span in spans: |
84 | | - # Process span attributes |
85 | | - updated_attributes = {} |
86 | | - |
87 | | - # Copy all original attributes and handle LLO data |
88 | | - for key, value in span.attributes.items(): |
89 | | - if self._should_offload(key): |
90 | | - metadata = { |
91 | | - "trace_id": format(span.context.trace_id, 'x'), |
92 | | - "span_id": format(span.context.span_id, 'x'), |
93 | | - "attribute_name": key, |
94 | | - "span_name": span.name |
95 | | - } |
96 | | - |
97 | | - # Get S3 pointer from LLOSenderClient |
98 | | - s3_pointer = self._llo_sender_client.upload(value, metadata) |
99 | | - |
100 | | - # Store the S3 pointer instead of original value to trim span |
101 | | - updated_attributes[key] = s3_pointer |
102 | | - else: |
103 | | - # Keep original value if it is not LLO |
104 | | - updated_attributes[key] = value |
105 | | - |
106 | | - # Update span attributes |
107 | | - if isinstance(span.attributes, BoundedAttributes): |
108 | | - span._attributes = BoundedAttributes( |
109 | | - maxlen=span.attributes.maxlen, |
110 | | - attributes=updated_attributes, |
111 | | - immutable=span.attributes._immutable, |
112 | | - max_value_len=span.attributes.max_value_len |
113 | | - ) |
114 | | - else: |
115 | | - span._attributes = updated_attributes |
116 | | - |
117 | | - # Process span events |
118 | | - if span.events: |
119 | | - updated_events = [] |
120 | | - |
121 | | - for event in span.events: |
122 | | - # Check if this event has any attributes to process |
123 | | - if not event.attributes: |
124 | | - updated_events.append(event) # Keep the original event |
125 | | - continue |
126 | | - |
127 | | - # Process event attributes for LLO content |
128 | | - updated_event_attributes = {} |
129 | | - need_to_update = False |
130 | | - |
131 | | - for key, value in event.attributes.items(): |
132 | | - if self._should_offload(key): |
133 | | - metadata = { |
134 | | - "trace_id": format(span.context.trace_id, 'x'), |
135 | | - "span_id": format(span.context.span_id, 'x'), |
136 | | - "attribute_name": key, |
137 | | - "event_name": event.name, |
138 | | - "event_time": str(event.timestamp) |
139 | | - } |
140 | | - |
141 | | - s3_pointer = self._llo_sender_client.upload(value, metadata) |
142 | | - updated_event_attributes[key] = s3_pointer |
143 | | - need_to_update = True |
144 | | - else: |
145 | | - updated_event_attributes[key] = value |
146 | | - |
147 | | - if need_to_update: |
148 | | - # Create new Event with the updated attributes |
149 | | - limit = None |
150 | | - if isinstance(event.attributes, BoundedAttributes): |
151 | | - limit = event.attributes.maxlen |
152 | | - |
153 | | - updated_event = Event( |
154 | | - name=event.name, |
155 | | - attributes=updated_event_attributes, |
156 | | - timestamp=event.timestamp, |
157 | | - limit=limit |
158 | | - ) |
159 | | - |
160 | | - updated_events.append(updated_event) |
161 | | - else: |
162 | | - # Keep the original event |
163 | | - updated_events.append(event) |
164 | | - |
165 | | - # Update the span's events with processed events |
166 | | - span._events = updated_events |
167 | | - |
168 | | - modified_spans.append(span) |
| 79 | + # Process spans to handle LLO attributes |
| 80 | + modified_spans = self._llo_handler.process_spans(spans) |
169 | 81 |
|
170 | 82 | # Export the modified spans |
171 | 83 | return super().export(modified_spans) |
172 | 84 |
|
173 | | - def _should_offload(self, key): |
174 | | - """Determine if LLO based on the attribute key. Strict matching is enforced as to not introduce unintended behavior.""" |
175 | | - exact_match_patterns = [ |
176 | | - "traceloop.entity.input", |
177 | | - "traceloop.entity.output", |
178 | | - "message.content", |
179 | | - "input.value", |
180 | | - "output.value", |
181 | | - "gen_ai.prompt", |
182 | | - "gen_ai.completion", |
183 | | - "gen_ai.content.revised_prompt", |
184 | | - ] |
185 | | - |
186 | | - regex_match_patterns = [ |
187 | | - r"^gen_ai\.prompt\.\d+\.content$", |
188 | | - r"^gen_ai\.completion\.\d+\.content$", |
189 | | - r"^llm.input_messages\.\d+\.message.content$", |
190 | | - r"^llm.output_messages\.\d+\.message.content$", |
191 | | - ] |
192 | | - |
193 | | - return ( |
194 | | - any(pattern == key for pattern in exact_match_patterns) or |
195 | | - any(re.match(pattern, key) for pattern in regex_match_patterns) |
196 | | - ) |
197 | | - |
198 | 85 | # Overrides upstream's private implementation of _export. All behaviors are |
199 | 86 | # the same except if the endpoint is an XRay OTLP endpoint, we will sign the request |
200 | 87 | # with SigV4 in headers before sending it to the endpoint. Otherwise, we will skip signing. |
|
0 commit comments