diff --git a/aws-opentelemetry-distro/pyproject.toml b/aws-opentelemetry-distro/pyproject.toml index d4f9ca204..cc738c435 100644 --- a/aws-opentelemetry-distro/pyproject.toml +++ b/aws-opentelemetry-distro/pyproject.toml @@ -25,61 +25,61 @@ classifiers = [ ] dependencies = [ - "opentelemetry-api == 1.27.0", - "opentelemetry-sdk == 1.27.0", - "opentelemetry-exporter-otlp-proto-grpc == 1.27.0", - "opentelemetry-exporter-otlp-proto-http == 1.27.0", - "opentelemetry-propagator-b3 == 1.27.0", - "opentelemetry-propagator-jaeger == 1.27.0", - "opentelemetry-exporter-otlp-proto-common == 1.27.0", - "opentelemetry-sdk-extension-aws == 2.0.2", - "opentelemetry-propagator-aws-xray == 1.0.1", - "opentelemetry-distro == 0.48b0", - "opentelemetry-propagator-ot-trace == 0.48b0", - "opentelemetry-instrumentation == 0.48b0", - "opentelemetry-instrumentation-aws-lambda == 0.48b0", - "opentelemetry-instrumentation-aio-pika == 0.48b0", - "opentelemetry-instrumentation-aiohttp-client == 0.48b0", - "opentelemetry-instrumentation-aiopg == 0.48b0", - "opentelemetry-instrumentation-asgi == 0.48b0", - "opentelemetry-instrumentation-asyncpg == 0.48b0", - "opentelemetry-instrumentation-boto == 0.48b0", - "opentelemetry-instrumentation-boto3sqs == 0.48b0", - "opentelemetry-instrumentation-botocore == 0.48b0", - "opentelemetry-instrumentation-celery == 0.48b0", - "opentelemetry-instrumentation-confluent-kafka == 0.48b0", - "opentelemetry-instrumentation-dbapi == 0.48b0", - "opentelemetry-instrumentation-django == 0.48b0", - "opentelemetry-instrumentation-elasticsearch == 0.48b0", - "opentelemetry-instrumentation-falcon == 0.48b0", - "opentelemetry-instrumentation-fastapi == 0.48b0", - "opentelemetry-instrumentation-flask == 0.48b0", - "opentelemetry-instrumentation-grpc == 0.48b0", - "opentelemetry-instrumentation-httpx == 0.48b0", - "opentelemetry-instrumentation-jinja2 == 0.48b0", - "opentelemetry-instrumentation-kafka-python == 0.48b0", - "opentelemetry-instrumentation-logging == 0.48b0", - "opentelemetry-instrumentation-mysql == 0.48b0", - "opentelemetry-instrumentation-mysqlclient == 0.48b0", - "opentelemetry-instrumentation-pika == 0.48b0", - "opentelemetry-instrumentation-psycopg2 == 0.48b0", - "opentelemetry-instrumentation-pymemcache == 0.48b0", - "opentelemetry-instrumentation-pymongo == 0.48b0", - "opentelemetry-instrumentation-pymysql == 0.48b0", - "opentelemetry-instrumentation-pyramid == 0.48b0", - "opentelemetry-instrumentation-redis == 0.48b0", - "opentelemetry-instrumentation-remoulade == 0.48b0", - "opentelemetry-instrumentation-requests == 0.48b0", - "opentelemetry-instrumentation-sqlalchemy == 0.48b0", - "opentelemetry-instrumentation-sqlite3 == 0.48b0", - "opentelemetry-instrumentation-starlette == 0.48b0", - "opentelemetry-instrumentation-system-metrics == 0.48b0", - "opentelemetry-instrumentation-tornado == 0.48b0", - "opentelemetry-instrumentation-tortoiseorm == 0.48b0", - "opentelemetry-instrumentation-urllib == 0.48b0", - "opentelemetry-instrumentation-urllib3 == 0.48b0", - "opentelemetry-instrumentation-wsgi == 0.48b0", - "opentelemetry-instrumentation-cassandra == 0.48b0", + "opentelemetry-api >= 1.29.0", + "opentelemetry-sdk >= 1.29.0", + "opentelemetry-exporter-otlp-proto-grpc >= 1.29.0", + "opentelemetry-exporter-otlp-proto-http >= 1.29.0", + "opentelemetry-propagator-b3 >= 1.29.0", + "opentelemetry-propagator-jaeger >= 1.29.0", + "opentelemetry-exporter-otlp-proto-common >= 1.29.0", + "opentelemetry-sdk-extension-aws >= 2.0.2", + "opentelemetry-propagator-aws-xray >= 1.0.1", + 
"opentelemetry-distro >= 0.50b0", + "opentelemetry-propagator-ot-trace >= 0.50b0", + "opentelemetry-instrumentation >= 0.50b0", + "opentelemetry-instrumentation-aws-lambda >= 0.50b0", + "opentelemetry-instrumentation-aio-pika >= 0.50b0", + "opentelemetry-instrumentation-aiohttp-client >= 0.50b0", + "opentelemetry-instrumentation-aiopg >= 0.50b0", + "opentelemetry-instrumentation-asgi >= 0.50b0", + "opentelemetry-instrumentation-asyncpg >= 0.50b0", + "opentelemetry-instrumentation-boto >= 0.50b0", + "opentelemetry-instrumentation-boto3sqs >= 0.50b0", + "opentelemetry-instrumentation-botocore >= 0.50b0", + "opentelemetry-instrumentation-celery >= 0.50b0", + "opentelemetry-instrumentation-confluent-kafka >= 0.50b0", + "opentelemetry-instrumentation-dbapi >= 0.50b0", + "opentelemetry-instrumentation-django >= 0.50b0", + "opentelemetry-instrumentation-elasticsearch >= 0.50b0", + "opentelemetry-instrumentation-falcon >= 0.50b0", + "opentelemetry-instrumentation-fastapi >= 0.50b0", + "opentelemetry-instrumentation-flask >= 0.50b0", + "opentelemetry-instrumentation-grpc >= 0.50b0", + "opentelemetry-instrumentation-httpx >= 0.50b0", + "opentelemetry-instrumentation-jinja2 >= 0.50b0", + "opentelemetry-instrumentation-kafka-python >= 0.50b0", + "opentelemetry-instrumentation-logging >= 0.50b0", + "opentelemetry-instrumentation-mysql >= 0.50b0", + "opentelemetry-instrumentation-mysqlclient >= 0.50b0", + "opentelemetry-instrumentation-pika >= 0.50b0", + "opentelemetry-instrumentation-psycopg2 >= 0.50b0", + "opentelemetry-instrumentation-pymemcache >= 0.50b0", + "opentelemetry-instrumentation-pymongo >= 0.50b0", + "opentelemetry-instrumentation-pymysql >= 0.50b0", + "opentelemetry-instrumentation-pyramid >= 0.50b0", + "opentelemetry-instrumentation-redis >= 0.50b0", + "opentelemetry-instrumentation-remoulade >= 0.50b0", + "opentelemetry-instrumentation-requests >= 0.50b0", + "opentelemetry-instrumentation-sqlalchemy >= 0.50b0", + "opentelemetry-instrumentation-sqlite3 >= 0.50b0", + "opentelemetry-instrumentation-starlette >= 0.50b0", + "opentelemetry-instrumentation-system-metrics >= 0.50b0", + "opentelemetry-instrumentation-tornado >= 0.50b0", + "opentelemetry-instrumentation-tortoiseorm >= 0.50b0", + "opentelemetry-instrumentation-urllib >= 0.50b0", + "opentelemetry-instrumentation-urllib3 >= 0.50b0", + "opentelemetry-instrumentation-wsgi >= 0.50b0", + "opentelemetry-instrumentation-cassandra >= 0.50b0", ] [project.optional-dependencies] diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 16820721c..019bdad99 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -3,7 +3,7 @@ # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. 
import os import re -from logging import Logger, getLogger +from logging import NOTSET, Logger, getLogger from typing import ClassVar, Dict, List, Type, Union from importlib_metadata import version @@ -21,11 +21,13 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder +from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter from amazon.opentelemetry.distro.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView +from opentelemetry._logs import set_logger_provider from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.metrics import set_meter_provider @@ -36,9 +38,10 @@ _import_exporters, _import_id_generator, _import_sampler, - _init_logging, _OTelSDKConfigurator, ) +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter from opentelemetry.sdk.environment_variables import ( _OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, OTEL_EXPORTER_OTLP_METRICS_PROTOCOL, @@ -84,6 +87,8 @@ OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED" SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics" OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" +OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" +AGENT_OBSERVABILITY_ENABLED = os.environ.get("AGENT_OBSERVABILITY_ENABLED", "false") XRAY_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$" # UDP package size is not larger than 64KB LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10 @@ -160,6 +165,27 @@ def _initialize_components(): _init_logging(log_exporters, resource) +def _init_logging( + exporters: Dict[str, Type[LogExporter]], + resource: Resource = None, +): + provider = LoggerProvider(resource=resource) + set_logger_provider(provider) + + for _, exporter_class in exporters.items(): + exporter_args = {} + log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource) + provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) + + handler = LoggingHandler(level=NOTSET, logger_provider=provider) + + getLogger().addHandler(handler) + + +def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter: + return OTLPAwsLogExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)) + + def _init_tracing( exporters: Dict[str, Type[SpanExporter]], id_generator: IdGenerator = None, @@ -324,8 +350,15 @@ def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> Span _logger.info("Detected using AWS OTLP XRay Endpoint.") if isinstance(span_exporter, OTLPSpanExporter): - span_exporter = OTLPAwsSpanExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)) - + if AGENT_OBSERVABILITY_ENABLED == "true": + logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) + logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint) + span_exporter = OTLPAwsSpanExporter( +
endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT), + logs_exporter=logs_exporter + ) + else: + span_exporter = OTLPAwsSpanExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)) else: _logger.warning( "Improper configuration see: please export/set " diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py new file mode 100644 index 000000000..8d0b334c0 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py @@ -0,0 +1,306 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +import logging +import re +from typing import Dict, Any, List, Sequence + +from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry._events import Event +from opentelemetry.sdk._events import EventLoggerProvider +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes as GenAIAttributes + +from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter + +_logger = logging.getLogger(__name__) + + +class LLOHandler: + """ + Utility class for handling Large Language Model Output (LLO) attributes. + This class identifies LLO attributes, emits them as log records, + and filters them out from telemetry data. + """ + + def __init__(self, logs_exporter: OTLPAwsLogExporter): + self._exact_match_patterns = [ + "traceloop.entity.input", + "traceloop.entity.output", + "message.content", + "input.value", + "output.value", + "gen_ai.prompt", + "gen_ai.completion", + "gen_ai.content.revised_prompt", + ] + + self._regex_match_patterns = [ + r"^gen_ai\.prompt\.\d+\.content$", + r"^gen_ai\.completion\.\d+\.content$", + r"^llm.input_messages\.\d+\.message.content$", + r"^llm.output_messages\.\d+\.message.content$", + ] + + self._logs_exporter = logs_exporter + self._logger_provider = LoggerProvider() + self._logger_provider.add_log_record_processor( + BatchLogRecordProcessor(self._logs_exporter) + ) + + + self._event_logger_provider = EventLoggerProvider(logger_provider=self._logger_provider) + self._event_logger = self._event_logger_provider.get_event_logger("gen_ai.events") + + def is_llo_attribute(self, key: str) -> bool: + """ + Determine if an attribute is LLO based on its key. + Strict matching is enforced to avoid unintended behavior. + + Args: + key: The attribute key to check + + Returns: + True if the key represents an LLO attribute, False otherwise + """ + return ( + any(pattern == key for pattern in self._exact_match_patterns) or + any(re.match(pattern, key) for pattern in self._regex_match_patterns) + ) + + def filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]: + """ + Filter out LLO attributes from a dictionary of attributes. + + Args: + attributes: Dictionary of attribute key-value pairs + + Returns: + A new dictionary with LLO attributes removed + """ + filtered_attributes = {} + for key, value in attributes.items(): + if not self.is_llo_attribute(key): + filtered_attributes[key] = value + return filtered_attributes + + + def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]: + """ + Extract gen_ai prompt events from attributes. 
+ + Returns: + A list of Event objects + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("gen_ai.system", "unknown") + + prompt_timestamp = span.start_time + + prompt_content_pattern = re.compile(r"^gen_ai\.prompt\.(\d+)\.content$") + + for key, value in attributes.items(): + match = prompt_content_pattern.match(key) + if not match: + continue + + index = match.group(1) + role_key = f"gen_ai.prompt.{index}.role" + role = attributes.get(role_key, "user") + + event = Event( + name=f"gen_ai.{role}.message", + attributes={ + GenAIAttributes.GEN_AI_SYSTEM: gen_ai_system, + "event.name": f"gen_ai.{role}.message", + "original_attribute.name": role_key + }, + body={ + "content": value + }, + timestamp=prompt_timestamp, + trace_id=span_ctx.trace_id, + span_id=span_ctx.span_id, + trace_flags=span_ctx.trace_flags + ) + events.append(event) + + return events + + + def _extract_gen_ai_completion_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]: + """ + Extract gen_ai completion events from attributes. + + Returns: + A list of Event objects + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("gen_ai.system", "unknown") + + completion_timestamp = span.end_time + + completion_content_pattern = re.compile(r"^gen_ai\.completion\.(\d+)\.content$") + + for key, value in attributes.items(): + match = completion_content_pattern.match(key) + if not match: + continue + + index = match.group(1) + role_key = f"gen_ai.completion.{index}.role" + role = attributes.get(role_key, "assistant") + + event = Event( + name="gen_ai.choice", + attributes={ + GenAIAttributes.GEN_AI_SYSTEM: gen_ai_system, + "event.name": "gen_ai.choice", + "original_attribute.name": role_key + }, + body={ + "index": int(index), + "finish_reason": attributes.get("gen_ai.finish_reason", "stop"), + "message": { + "role": role, + "content": value + } + }, + timestamp=completion_timestamp, + trace_id=span_ctx.trace_id, + span_id=span_ctx.span_id, + trace_flags=span_ctx.trace_flags + ) + events.append(event) + + return events + + + def _extract_traceloop_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]: + """ + Extract events from traceloop specific attributes. 
+ + Returns: + A list of Event objects + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("gen_ai.system", "unknown") + + start_timestamp = span.start_time + end_timestamp = span.end_time + + if "traceloop.entity.input" in attributes: + input_content = attributes["traceloop.entity.input"] + + event = Event( + name="gen_ai.framework.event", # Use generic framework event name for now + attributes={ + GenAIAttributes.GEN_AI_SYSTEM: gen_ai_system, + "framework.name": "traceloop", + "framework.event.type": "input", + "original_attribute.name": "traceloop.entity.input" + }, + body={ + "framework.traceloop.entity.input": input_content + }, + timestamp=start_timestamp, + trace_id=span_ctx.trace_id, + span_id=span_ctx.span_id, + trace_flags=span_ctx.trace_flags + ) + events.append(event) + + if "traceloop.entity.output" in attributes: + output_content = attributes["traceloop.entity.output"] + + event = Event( + name="gen_ai.framework.event", + attributes={ + GenAIAttributes.GEN_AI_SYSTEM: gen_ai_system, + "framework.name": "traceloop", + "framework.event.type": "output", + "original_attribute.name": "traceloop.entity.output" + }, + body={ + "framework.traceloop.entity.output": output_content + }, + timestamp=end_timestamp, + trace_id=span_ctx.trace_id, + span_id=span_ctx.span_id, + trace_flags=span_ctx.trace_flags + ) + events.append(event) + + return events + + + def emit_llo_attributes(self, span: ReadableSpan, attributes: Dict[str, Any]) -> None: + """ + Extract, transform, and emit LLO attributes as OpenTelemetry events. + + Args: + span: The span containing the LLO attributes + attributes: Dictionary of attributes to check for LLO attributes + """ + if not self._event_logger: + return + + try: + all_events = [] + all_events.extend(self._extract_gen_ai_prompt_events(span, attributes)) + all_events.extend(self._extract_gen_ai_completion_events(span, attributes)) + all_events.extend(self._extract_traceloop_events(span, attributes)) + + for event in all_events: + self._event_logger.emit(event) + _logger.debug(f"Emitted GenAI event: {event.name}") + except Exception as e: + _logger.error(f"Error emitting GenAI events: {e}", exc_info=True) + + + def update_span_attributes(self, span: ReadableSpan) -> None: + """ + Update span attributes by: + 1. Emitting LLO attributes as log records (if logger is configured) + 2. Filtering out LLO attributes from the span + + Args: + span: The span to update + """ + self.emit_llo_attributes(span, span.attributes) + updated_attributes = self.filter_attributes(span.attributes) + + if isinstance(span.attributes, BoundedAttributes): + span._attributes = BoundedAttributes( + maxlen=span.attributes.maxlen, + attributes=updated_attributes, + immutable=span.attributes._immutable, + max_value_len=span.attributes.max_value_len + ) + else: + span._attributes = updated_attributes + + + def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]: + """ + Process a list of spans by: + 1. Emitting LLO attributes as log records (if logger is configured) + 2. 
Filtering out LLO attributes from both span attributes and event attributes + + Args: + spans: List of spans to process + + Returns: + List of processed spans with LLO attributes removed + """ + modified_spans = [] + + for span in spans: + self.update_span_attributes(span) + modified_spans.append(span) + + return modified_spans diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_logs_exporter.py new file mode 100644 index 000000000..2b39034b5 --- /dev/null +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_logs_exporter.py @@ -0,0 +1,100 @@ +import logging +import os +from typing import Dict, Optional, Sequence + +import requests + +from amazon.opentelemetry.distro._utils import is_installed +from opentelemetry.exporter.otlp.proto.http import Compression +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter +from opentelemetry.sdk._logs import LogRecord +from opentelemetry.sdk._logs.export import LogExportResult + +# For CloudWatch Logs, the service name is 'logs' not 'xray' +AWS_SERVICE = "logs" +AWS_CLOUDWATCH_LOG_GROUP_ENV = "AWS_CLOUDWATCH_LOG_GROUP" +AWS_CLOUDWATCH_LOG_STREAM_ENV = "AWS_CLOUDWATCH_LOG_STREAM" +# Set up more verbose logging +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.DEBUG) +# Add a console handler if not already present +if not _logger.handlers: + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + _logger.addHandler(console_handler) + +class OTLPAwsLogExporter(OTLPLogExporter): + def __init__( + self, + endpoint: Optional[str] = None, + certificate_file: Optional[str] = None, + client_key_file: Optional[str] = None, + client_certificate_file: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, + rsession: Optional[requests.Session] = None, + ): + self._aws_region = None + self._has_required_dependencies = False + + if endpoint and is_installed("botocore"): + # pylint: disable=import-outside-toplevel + from botocore import auth, awsrequest, session + self.boto_auth = auth + self.boto_aws_request = awsrequest + self.boto_session = session.Session() + + # For logs endpoint https://logs.[region].amazonaws.com/v1/logs + self._aws_region = endpoint.split(".")[1] + self._has_required_dependencies = True + else: + _logger.error( + "botocore is required to export logs to %s. 
Please install it using `pip install botocore`", + endpoint, + ) + + super().__init__( + endpoint=endpoint, + certificate_file=certificate_file, + client_key_file=client_key_file, + client_certificate_file=client_certificate_file, + headers=headers, + timeout=timeout, + compression=compression, + session=rsession, + ) + + def _export(self, serialized_data: bytes): + try: + if self._has_required_dependencies: + request = self.boto_aws_request.AWSRequest( + method="POST", + url=self._endpoint, + data=serialized_data, + headers={"Content-Type": "application/x-protobuf"}, + ) + + credentials = self.boto_session.get_credentials() + + if credentials is not None: + signer = self.boto_auth.SigV4Auth(credentials, AWS_SERVICE, self._aws_region) + + try: + signer.add_auth(request) + self._session.headers.update(dict(request.headers)) + except Exception as signing_error: # pylint: disable=broad-except + _logger.error(f"Failed to sign request: {signing_error}") + else: + _logger.error("Failed to obtain AWS credentials for SigV4 signing") + else: + _logger.warning(f"SigV4 authentication not available for {self._endpoint}. Falling back to unsigned request.") + + result = super()._export(serialized_data) + return result + except Exception as e: + _logger.exception(f"Exception in _export: {str(e)}") + # Still try to call the parent method in case it can handle the error + return super()._export(serialized_data) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py index 7c00b51a8..5ff3df025 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_aws_span_exporter.py @@ -1,15 +1,21 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import logging -from typing import Dict, Optional +import os +from typing import Dict, Optional, Sequence import requests from amazon.opentelemetry.distro._utils import is_installed +from amazon.opentelemetry.distro.llo_handler import LLOHandler +from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter from opentelemetry.exporter.otlp.proto.http import Compression from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExportResult AWS_SERVICE = "xray" +AGENT_OBSERVABILITY_ENABLED = os.environ.get("AGENT_OBSERVABILITY_ENABLED", "false") _logger = logging.getLogger(__name__) @@ -32,10 +38,12 @@ def __init__( timeout: Optional[int] = None, compression: Optional[Compression] = None, rsession: Optional[requests.Session] = None, + logs_exporter: Optional[OTLPAwsLogExporter] = None, ): self._aws_region = None self._has_required_dependencies = False + self._llo_handler = LLOHandler(logs_exporter) # Requires botocore to be installed to sign the headers. However, # some users might not need to use this exporter. In order not conflict # with existing behavior, we check for botocore before initializing this exporter. 
@@ -71,6 +79,16 @@ def __init__( session=rsession, ) + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + # Process spans to handle LLO attributes + if AGENT_OBSERVABILITY_ENABLED == "true": + spans_to_export = self._llo_handler.process_spans(spans) + else: + spans_to_export = spans + + # Export the modified spans + return super().export(spans_to_export) + # Overrides upstream's private implementation of _export. All behaviors are # the same except if the endpoint is an XRay OTLP endpoint, we will sign the request # with SigV4 in headers before sending it to the endpoint. Otherwise, we will skip signing.
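For reference, below is a minimal usage sketch (not part of the patch) of how the new pieces fit together when the AGENT_OBSERVABILITY_ENABLED environment variable is "true". It assumes the package is installed and that the two OTLP endpoint environment variables are set; the commented example URL only illustrates the https://logs.[region].amazonaws.com/v1/logs shape mentioned in the logs exporter.

import os

from amazon.opentelemetry.distro.llo_handler import LLOHandler
from amazon.opentelemetry.distro.otlp_aws_logs_exporter import OTLPAwsLogExporter
from amazon.opentelemetry.distro.otlp_aws_span_exporter import OTLPAwsSpanExporter

# The configurator reads both endpoints from the standard OTLP env vars, e.g.
# OTEL_EXPORTER_OTLP_LOGS_ENDPOINT=https://logs.us-east-1.amazonaws.com/v1/logs
logs_endpoint = os.getenv("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT")
traces_endpoint = os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")

# SigV4-signing logs exporter, handed to the span exporter so spans and
# LLO log records share one exporter configuration.
logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logs_exporter=logs_exporter)

# LLOHandler strips prompt/completion content off span attributes and re-emits
# it as gen_ai events; non-LLO attributes pass through untouched.
handler = LLOHandler(logs_exporter)
assert handler.is_llo_attribute("gen_ai.prompt.0.content")
print(handler.filter_attributes({"gen_ai.prompt.0.content": "hi", "http.method": "GET"}))
# -> {'http.method': 'GET'}

The span exporter only routes spans through the LLOHandler when AGENT_OBSERVABILITY_ENABLED is "true"; otherwise export() falls straight through to the upstream OTLP HTTP behavior.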