110 changes: 55 additions & 55 deletions aws-opentelemetry-distro/pyproject.toml
@@ -25,61 +25,61 @@ classifiers = [
]

dependencies = [
"opentelemetry-api == 1.27.0",
"opentelemetry-sdk == 1.27.0",
"opentelemetry-exporter-otlp-proto-grpc == 1.27.0",
"opentelemetry-exporter-otlp-proto-http == 1.27.0",
"opentelemetry-propagator-b3 == 1.27.0",
"opentelemetry-propagator-jaeger == 1.27.0",
"opentelemetry-exporter-otlp-proto-common == 1.27.0",
"opentelemetry-sdk-extension-aws == 2.0.2",
"opentelemetry-propagator-aws-xray == 1.0.1",
"opentelemetry-distro == 0.48b0",
"opentelemetry-propagator-ot-trace == 0.48b0",
"opentelemetry-instrumentation == 0.48b0",
"opentelemetry-instrumentation-aws-lambda == 0.48b0",
"opentelemetry-instrumentation-aio-pika == 0.48b0",
"opentelemetry-instrumentation-aiohttp-client == 0.48b0",
"opentelemetry-instrumentation-aiopg == 0.48b0",
"opentelemetry-instrumentation-asgi == 0.48b0",
"opentelemetry-instrumentation-asyncpg == 0.48b0",
"opentelemetry-instrumentation-boto == 0.48b0",
"opentelemetry-instrumentation-boto3sqs == 0.48b0",
"opentelemetry-instrumentation-botocore == 0.48b0",
"opentelemetry-instrumentation-celery == 0.48b0",
"opentelemetry-instrumentation-confluent-kafka == 0.48b0",
"opentelemetry-instrumentation-dbapi == 0.48b0",
"opentelemetry-instrumentation-django == 0.48b0",
"opentelemetry-instrumentation-elasticsearch == 0.48b0",
"opentelemetry-instrumentation-falcon == 0.48b0",
"opentelemetry-instrumentation-fastapi == 0.48b0",
"opentelemetry-instrumentation-flask == 0.48b0",
"opentelemetry-instrumentation-grpc == 0.48b0",
"opentelemetry-instrumentation-httpx == 0.48b0",
"opentelemetry-instrumentation-jinja2 == 0.48b0",
"opentelemetry-instrumentation-kafka-python == 0.48b0",
"opentelemetry-instrumentation-logging == 0.48b0",
"opentelemetry-instrumentation-mysql == 0.48b0",
"opentelemetry-instrumentation-mysqlclient == 0.48b0",
"opentelemetry-instrumentation-pika == 0.48b0",
"opentelemetry-instrumentation-psycopg2 == 0.48b0",
"opentelemetry-instrumentation-pymemcache == 0.48b0",
"opentelemetry-instrumentation-pymongo == 0.48b0",
"opentelemetry-instrumentation-pymysql == 0.48b0",
"opentelemetry-instrumentation-pyramid == 0.48b0",
"opentelemetry-instrumentation-redis == 0.48b0",
"opentelemetry-instrumentation-remoulade == 0.48b0",
"opentelemetry-instrumentation-requests == 0.48b0",
"opentelemetry-instrumentation-sqlalchemy == 0.48b0",
"opentelemetry-instrumentation-sqlite3 == 0.48b0",
"opentelemetry-instrumentation-starlette == 0.48b0",
"opentelemetry-instrumentation-system-metrics == 0.48b0",
"opentelemetry-instrumentation-tornado == 0.48b0",
"opentelemetry-instrumentation-tortoiseorm == 0.48b0",
"opentelemetry-instrumentation-urllib == 0.48b0",
"opentelemetry-instrumentation-urllib3 == 0.48b0",
"opentelemetry-instrumentation-wsgi == 0.48b0",
"opentelemetry-instrumentation-cassandra == 0.48b0",
"opentelemetry-api >= 1.29.0",
"opentelemetry-sdk >= 1.29.0",
"opentelemetry-exporter-otlp-proto-grpc >= 1.29.0",
"opentelemetry-exporter-otlp-proto-http >= 1.29.0",
"opentelemetry-propagator-b3 >= 1.29.0",
"opentelemetry-propagator-jaeger >= 1.29.0",
"opentelemetry-exporter-otlp-proto-common >= 1.29.0",
"opentelemetry-sdk-extension-aws >= 2.0.2",
"opentelemetry-propagator-aws-xray >= 1.0.1",
"opentelemetry-distro >= 0.50b0",
"opentelemetry-propagator-ot-trace >= 0.50b0",
"opentelemetry-instrumentation >= 0.50b0",
"opentelemetry-instrumentation-aws-lambda >= 0.50b0",
"opentelemetry-instrumentation-aio-pika >= 0.50b0",
"opentelemetry-instrumentation-aiohttp-client >= 0.50b0",
"opentelemetry-instrumentation-aiopg >= 0.50b0",
"opentelemetry-instrumentation-asgi >= 0.50b0",
"opentelemetry-instrumentation-asyncpg >= 0.50b0",
"opentelemetry-instrumentation-boto >= 0.50b0",
"opentelemetry-instrumentation-boto3sqs >= 0.50b0",
"opentelemetry-instrumentation-botocore >= 0.50b0",
"opentelemetry-instrumentation-celery >= 0.50b0",
"opentelemetry-instrumentation-confluent-kafka >= 0.50b0",
"opentelemetry-instrumentation-dbapi >= 0.50b0",
"opentelemetry-instrumentation-django >= 0.50b0",
"opentelemetry-instrumentation-elasticsearch >= 0.50b0",
"opentelemetry-instrumentation-falcon >= 0.50b0",
"opentelemetry-instrumentation-fastapi >= 0.50b0",
"opentelemetry-instrumentation-flask >= 0.50b0",
"opentelemetry-instrumentation-grpc >= 0.50b0",
"opentelemetry-instrumentation-httpx >= 0.50b0",
"opentelemetry-instrumentation-jinja2 >= 0.50b0",
"opentelemetry-instrumentation-kafka-python >= 0.50b0",
"opentelemetry-instrumentation-logging >= 0.50b0",
"opentelemetry-instrumentation-mysql >= 0.50b0",
"opentelemetry-instrumentation-mysqlclient >= 0.50b0",
"opentelemetry-instrumentation-pika >= 0.50b0",
"opentelemetry-instrumentation-psycopg2 >= 0.50b0",
"opentelemetry-instrumentation-pymemcache >= 0.50b0",
"opentelemetry-instrumentation-pymongo >= 0.50b0",
"opentelemetry-instrumentation-pymysql >= 0.50b0",
"opentelemetry-instrumentation-pyramid >= 0.50b0",
"opentelemetry-instrumentation-redis >= 0.50b0",
"opentelemetry-instrumentation-remoulade >= 0.50b0",
"opentelemetry-instrumentation-requests >= 0.50b0",
"opentelemetry-instrumentation-sqlalchemy >= 0.50b0",
"opentelemetry-instrumentation-sqlite3 >= 0.50b0",
"opentelemetry-instrumentation-starlette >= 0.50b0",
"opentelemetry-instrumentation-system-metrics >= 0.50b0",
"opentelemetry-instrumentation-tornado >= 0.50b0",
"opentelemetry-instrumentation-tortoiseorm >= 0.50b0",
"opentelemetry-instrumentation-urllib >= 0.50b0",
"opentelemetry-instrumentation-urllib3 >= 0.50b0",
"opentelemetry-instrumentation-wsgi >= 0.50b0",
"opentelemetry-instrumentation-cassandra >= 0.50b0",
]
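The dependency constraints above move from exact pins (== 1.27.0 / 0.48b0) to lower bounds (>= 1.29.0 / 0.50b0), so pip is now free to resolve newer compatible releases. Below is a minimal sketch for sanity-checking that an environment resolved versions at or above the new floors; the package selection is an illustrative subset of the list above, not exhaustive, and it assumes the `packaging` library is available.

import importlib.metadata as metadata
from packaging.version import Version

# Illustrative subset of the new minimum versions declared above.
FLOORS = {
    "opentelemetry-api": "1.29.0",
    "opentelemetry-sdk": "1.29.0",
    "opentelemetry-distro": "0.50b0",
    "opentelemetry-instrumentation-botocore": "0.50b0",
}

for package, floor in FLOORS.items():
    installed = Version(metadata.version(package))
    # With ">=" constraints the resolver may pick anything at or above the floor.
    assert installed >= Version(floor), f"{package} {installed} is below the floor {floor}"
    print(f"{package}: {installed} >= {floor}")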

[project.optional-dependencies]
New file (imported below as amazon.opentelemetry.distro.llo_sender_client):
@@ -0,0 +1,26 @@
import logging
from typing import Dict


class LLOSenderClient:
    """Skeleton client for handling Large Language Objects (LLO)."""

    def __init__(self):
        self._bucket_name = "genai-llo-bucket"
        self._logger = logging.getLogger(__name__)
        self._logger.info("Initialized mock LLO sender client")

    def upload(self, llo_content: str, metadata: Dict[str, str]) -> str:
        """Mock upload that returns a dummy S3 pointer.

        Args:
            llo_content: For now, assumed to be a str containing the LLM input/output.
            metadata: Metadata associated with the LLO content, such as trace_id and span_id.

        Returns:
            str: S3 pointer to the uploaded LLO content.
        """
        attribute_name = metadata.get("attribute_name", "unknown")
        self._logger.debug(f"LLO content: {llo_content}")
        self._logger.debug(f"Mock upload of LLO attribute: {attribute_name}")
        return f"s3://{self._bucket_name}/{metadata.get('trace_id', 'trace')}/{metadata.get('span_id', 'span')}/{attribute_name}"
Modified file — the AWS X-Ray OTLP span exporter:
@@ -1,13 +1,18 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Dict, Optional
from typing import Dict, Optional, Sequence

import re
import requests

from amazon.opentelemetry.distro._utils import is_installed
from amazon.opentelemetry.distro.llo_sender_client import LLOSenderClient
from opentelemetry.attributes import BoundedAttributes
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExportResult

AWS_SERVICE = "xray"
_logger = logging.getLogger(__name__)
@@ -36,6 +41,7 @@ def __init__(

        self._aws_region = None
        self._has_required_dependencies = False
        self._llo_sender_client = LLOSenderClient()
        # Requires botocore to be installed to sign the headers. However,
        # some users might not need to use this exporter. In order not to conflict
        # with existing behavior, we check for botocore before initializing this exporter.
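The gate described in the comment above sits in the collapsed part of __init__ and is not shown in this hunk. As rough context only, such a check could look like the sketch below, using the is_installed helper imported at the top of the module; this is an assumption about shape, not the PR's actual code.

        # Hypothetical sketch of the dependency gate; not the PR's exact code.
        if is_installed("botocore"):
            self._has_required_dependencies = True
        else:
            _logger.warning(
                "botocore is required to sign requests to the X-Ray OTLP endpoint; "
                "SigV4 signing will be unavailable."
            )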
@@ -71,6 +77,69 @@ def __init__(
            session=rsession,
        )

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        modified_spans = []

        for span in spans:
            updated_attributes = {}

            # Copy all original attributes and handle LLO data
            for key, value in span.attributes.items():
                if self._should_offload(key):
                    metadata = {
                        "trace_id": format(span.context.trace_id, 'x'),
                        "span_id": format(span.context.span_id, 'x'),
                        "attribute_name": key,
                        "span_name": span.name
                    }

                    # Get S3 pointer from LLOSenderClient
                    s3_pointer = self._llo_sender_client.upload(value, metadata)

                    # Store the S3 pointer instead of original value to trim span
                    updated_attributes[key] = s3_pointer
Contributor Author: What should we do if LLO handling fails?
                else:
                    # Keep original value if it is not LLO
                    updated_attributes[key] = value

            # Create a new span with updated attributes
            if isinstance(span.attributes, BoundedAttributes):
                span._attributes = BoundedAttributes(
                    maxlen=span.attributes.maxlen,
                    attributes=updated_attributes,
                    immutable=span.attributes._immutable,
                    max_value_len=span.attributes.max_value_len
                )
            else:
                span._attributes = updated_attributes

            modified_spans.append(span)

        # Export the modified spans
        return super().export(modified_spans)

    def _should_offload(self, key):
        """Determine whether an attribute holds LLO content based on its key.

        Strict matching is enforced so as not to introduce unintended behavior.
        """
        exact_match_patterns = [
            "traceloop.entity.input",
            "traceloop.entity.output",
            "message.content",
            "input.value",
            "output.value",
        ]

        regex_match_patterns = [
            r"^gen_ai\.prompt\.\d+\.content$",
            r"^gen_ai\.completion\.\d+\.content$",
            r"^llm\.input_messages\.\d+\.message\.content$",
            r"^llm\.output_messages\.\d+\.message\.content$",
        ]

        return (
            any(pattern == key for pattern in exact_match_patterns) or
            any(re.match(pattern, key) for pattern in regex_match_patterns)
        )
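As a quick illustration of the strict matching above — the keys are examples, and `exporter` is assumed to be an instance of this exporter class:

# Illustrative only; not an exhaustive list of matching keys.
assert exporter._should_offload("gen_ai.prompt.0.content")      # indexed prompt content -> offloaded
assert exporter._should_offload("traceloop.entity.input")       # exact match -> offloaded
assert not exporter._should_offload("gen_ai.prompt.content")    # no index and no exact match -> kept on the span
assert not exporter._should_offload("http.method")              # unrelated attribute -> kept on the span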

    # Overrides upstream's private implementation of _export. All behaviors are
    # the same except that, if the endpoint is an X-Ray OTLP endpoint, we sign the
    # request headers with SigV4 before sending it to the endpoint; otherwise, we skip signing.
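The overridden _export body is collapsed in this diff. For context only, SigV4-signing an OTLP/HTTP request with botocore generally looks like the sketch below; endpoint, serialized_data, and region are placeholder names, and this is not necessarily the exact code used in this PR.

# Hypothetical, self-contained sketch of SigV4-signing an OTLP/HTTP export request.
import botocore.session
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest


def sign_otlp_request(endpoint: str, serialized_data: bytes, region: str) -> dict:
    credentials = botocore.session.Session().get_credentials()
    request = AWSRequest(
        method="POST",
        url=endpoint,
        data=serialized_data,
        headers={"Content-Type": "application/x-protobuf"},
    )
    # "xray" mirrors the AWS_SERVICE constant defined at the top of this module.
    SigV4Auth(credentials, "xray", region).add_auth(request)
    return dict(request.headers)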