Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import os
import re
from logging import Logger, getLogger
from logging import NOTSET, Logger, getLogger
from typing import ClassVar, Dict, List, Type, Union

from importlib_metadata import version
Expand All @@ -21,11 +21,13 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_aws_span_exporter import OTLPAwsSpanExporter
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
Expand All @@ -36,9 +38,10 @@
_import_exporters,
_import_id_generator,
_import_sampler,
_init_logging,
_OTelSDKConfigurator,
)
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED,
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL,
Expand Down Expand Up @@ -84,7 +87,15 @@
OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED"
SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
XRAY_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"

AWS_TRACES_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
AWS_LOGS_OTLP_ENDPOINT_PATTERN = r"https://logs\.([a-z0-9-]+)\.amazonaws\.com/v1/logs$"

AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"

# UDP package size is not larger than 64KB
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10

Expand Down Expand Up @@ -160,6 +171,29 @@ def _initialize_components():
_init_logging(log_exporters, resource)


def _init_logging(
exporters: Dict[str, Type[LogExporter]],
resource: Resource = None,
):

# Provides a default OTLP log exporter when none is specified.
# This is the behavior for the logs exporters for other languages.
if not exporters:
exporters.setdefault("otlp", OTLPLogExporter)

provider = LoggerProvider(resource=resource)
set_logger_provider(provider)

for _, exporter_class in exporters.items():
exporter_args: Dict[str, any] = {}
log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))

handler = LoggingHandler(level=NOTSET, logger_provider=provider)

getLogger().addHandler(handler)


def _init_tracing(
exporters: Dict[str, Type[SpanExporter]],
id_generator: IdGenerator = None,
Expand All @@ -177,7 +211,7 @@ def _init_tracing(
for _, exporter_class in exporters.items():
exporter_args: Dict[str, any] = {}
span_exporter: SpanExporter = exporter_class(**exporter_args)
span_exporter = _customize_exporter(span_exporter, resource)
span_exporter = _customize_span_exporter(span_exporter, resource)
trace_provider.add_span_processor(
BatchSpanProcessor(span_exporter=span_exporter, max_export_batch_size=_span_export_batch_size())
)
Expand Down Expand Up @@ -312,19 +346,21 @@ def _customize_sampler(sampler: Sampler) -> Sampler:
return AlwaysRecordSampler(sampler)


def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
if _is_lambda_environment():
# Override OTLP http default endpoint to UDP
if isinstance(span_exporter, OTLPSpanExporter) and os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) is None:
if isinstance(span_exporter, OTLPSpanExporter) and traces_endpoint is None:
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint)

if is_xray_otlp_endpoint(os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)):
# TODO: Change this url once doc writer has added a section for using SigV4 without collector
_logger.info("Detected using AWS OTLP XRay Endpoint.")
if is_aws_otlp_endpoint(traces_endpoint, "xray"):
_logger.info("Detected using AWS OTLP Traces Endpoint.")

if isinstance(span_exporter, OTLPSpanExporter):
span_exporter = OTLPAwsSpanExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT))
span_exporter = OTLPSpanExporter(
endpoint=traces_endpoint, session=AwsAuthSession(traces_endpoint.split(".")[1], "xray")
)

else:
_logger.warning(
Expand All @@ -338,6 +374,23 @@ def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> Span
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter:
logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)

if is_aws_otlp_endpoint(logs_endpoint, "logs"):
_logger.info("Detected using AWS OTLP Logs Endpoint.")

if isinstance(log_exporter, OTLPLogExporter) and validate_logs_headers():
return OTLPLogExporter(endpoint=logs_endpoint, session=AwsAuthSession(logs_endpoint.split(".")[1], "logs"))

_logger.warning(
"Improper configuration see: please export/set "
"OTEL_EXPORTER_OTLP_LOGS_PROTOCOL=http/protobuf and OTEL_LOGS_EXPORTER=otlp"
)

return log_exporter


def _customize_span_processors(provider: TracerProvider, resource: Resource) -> None:
# Add LambdaSpanProcessor to list of processors regardless of application signals.
if _is_lambda_environment():
Expand Down Expand Up @@ -458,12 +511,46 @@ def _is_lambda_environment():
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


def is_xray_otlp_endpoint(otlp_endpoint: str = None) -> bool:
"""Is the given endpoint the XRay OTLP endpoint?"""
def is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
"""Is the given endpoint an AWS OTLP endpoint?"""

pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN

if not otlp_endpoint:
return False

return bool(re.match(XRAY_OTLP_ENDPOINT_PATTERN, otlp_endpoint.lower()))
return bool(re.match(pattern, otlp_endpoint.lower()))


def validate_logs_headers() -> bool:
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
AWS OTLP Logs endpoint."""

logs_headers = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS)

if not logs_headers:
_logger.warning(
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
"to include x-aws-log-group and x-aws-log-stream"
)
return False

filtered_log_headers_count = 0

for pair in logs_headers.split(","):
if "=" in pair:
key = pair.split("=", 1)[0]
if key == AWS_OTLP_LOGS_GROUP_HEADER or key == AWS_OTLP_LOGS_STREAM_HEADER:
filtered_log_headers_count += 1

if filtered_log_headers_count != 2:
_logger.warning(
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
"to have values for x-aws-log-group and x-aws-log-stream"
)
return False

return True


def _get_metric_export_interval():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import logging

import requests

from amazon.opentelemetry.distro._utils import is_installed

_logger = logging.getLogger(__name__)


class AwsAuthSession(requests.Session):
"""
A custom requests Session that adds AWS SigV4 authentication to HTTP requests.

This class extends the standard requests.Session to automatically sign requests
with AWS Signature Version 4 (SigV4) authentication. It's specifically designed
for use with the OpenTelemetry Logs and Traces exporters that send data to AWS OTLP endpoints:
X-Ray (traces) and CloudWatch Logs.

The session requires botocore to be installed for signing headers. If botocore
is not available, the session will fall back to standard unauthenticated requests
and log an error message.

Usage:
session = AwsAuthSession(aws_region="us-west-2", service="logs")
response = session.request("POST", "https://logs.us-west-2.amazonaws.com/v1/logs",
data=payload, headers=headers)

Args:
aws_region (str): The AWS region to use for signing (e.g., "us-east-1")
service (str): The AWS service name for signing (e.g., "logs" or "xray")
"""

def __init__(self, aws_region, service):

self._has_required_dependencies = False

# Requires botocore to be installed to sign the headers. However,
# some users might not need to use this authenticator. In order not conflict
# with existing behavior, we check for botocore before initializing this exporter.

if aws_region and service and is_installed("botocore"):
# pylint: disable=import-outside-toplevel
from botocore import auth, awsrequest, session

self._boto_auth = auth
self._boto_aws_request = awsrequest
self._boto_session = session.Session()

self._aws_region = aws_region
self._service = service
self._has_required_dependencies = True

else:
_logger.error(
"botocore is required to enable SigV4 Authentication. Please install it using `pip install botocore`",
)

super().__init__()

def request(self, method, url, *args, data=None, headers=None, **kwargs):
if self._has_required_dependencies:

credentials = self._boto_session.get_credentials()

if credentials is not None:
signer = self._boto_auth.SigV4Auth(credentials, self._service, self._aws_region)

request = self._boto_aws_request.AWSRequest(
method="POST",
url=url,
data=data,
headers={"Content-Type": "application/x-protobuf"},
)

try:
signer.add_auth(request)

if headers is None:
headers = {}

headers.update(dict(request.headers))

except Exception as signing_error: # pylint: disable=broad-except
_logger.error("Failed to sign request: %s", signing_error)

return super().request(method=method, url=url, *args, data=data, headers=headers, **kwargs)

This file was deleted.

Loading
Loading