Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ RUN sed -i "/opentelemetry-exporter-otlp-proto-grpc/d" ./aws-opentelemetry-distr
RUN mkdir workspace && pip install setuptools==75.2.0 urllib3==2.2.3 --target workspace ./aws-opentelemetry-distro

# Stage 2: Build the cp-utility binary
FROM public.ecr.aws/docker/library/rust:1.81 as builder
FROM public.ecr.aws/docker/library/rust:1.82 as builder

WORKDIR /usr/src/cp-utility
COPY ./tools/cp-utility .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import os
import re
from logging import Logger, getLogger
from logging import NOTSET, Logger, getLogger
from typing import ClassVar, Dict, List, Type, Union

from importlib_metadata import version
Expand All @@ -21,11 +21,14 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_aws_span_exporter import OTLPAwsSpanExporter
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.metrics import set_meter_provider
Expand All @@ -36,9 +39,10 @@
_import_exporters,
_import_id_generator,
_import_sampler,
_init_logging,
_OTelSDKConfigurator,
)
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED,
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL,
Expand Down Expand Up @@ -84,7 +88,15 @@
OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED"
SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
XRAY_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"

AWS_TRACES_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
AWS_LOGS_OTLP_ENDPOINT_PATTERN = r"https://logs\.([a-z0-9-]+)\.amazonaws\.com/v1/logs$"

AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"

# UDP package size is not larger than 64KB
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10

Expand Down Expand Up @@ -160,6 +172,29 @@ def _initialize_components():
_init_logging(log_exporters, resource)


def _init_logging(
exporters: Dict[str, Type[LogExporter]],
resource: Resource = None,
):

# Provides a default OTLP log exporter when none is specified.
# This is the behavior for the logs exporters for other languages.
if not exporters:
exporters = {"otlp": OTLPLogExporter}

provider = LoggerProvider(resource=resource)
set_logger_provider(provider)

for _, exporter_class in exporters.items():
exporter_args: Dict[str, any] = {}
log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))

handler = LoggingHandler(level=NOTSET, logger_provider=provider)

getLogger().addHandler(handler)


def _init_tracing(
exporters: Dict[str, Type[SpanExporter]],
id_generator: IdGenerator = None,
Expand All @@ -177,7 +212,7 @@ def _init_tracing(
for _, exporter_class in exporters.items():
exporter_args: Dict[str, any] = {}
span_exporter: SpanExporter = exporter_class(**exporter_args)
span_exporter = _customize_exporter(span_exporter, resource)
span_exporter = _customize_span_exporter(span_exporter, resource)
trace_provider.add_span_processor(
BatchSpanProcessor(span_exporter=span_exporter, max_export_batch_size=_span_export_batch_size())
)
Expand Down Expand Up @@ -312,19 +347,19 @@ def _customize_sampler(sampler: Sampler) -> Sampler:
return AlwaysRecordSampler(sampler)


def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
if _is_lambda_environment():
# Override OTLP http default endpoint to UDP
if isinstance(span_exporter, OTLPSpanExporter) and os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) is None:
if isinstance(span_exporter, OTLPSpanExporter) and traces_endpoint is None:
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint)

if is_xray_otlp_endpoint(os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)):
# TODO: Change this url once doc writer has added a section for using SigV4 without collector
_logger.info("Detected using AWS OTLP XRay Endpoint.")
if _is_aws_otlp_endpoint(traces_endpoint, "xray"):
_logger.info("Detected using AWS OTLP Traces Endpoint.")

if isinstance(span_exporter, OTLPSpanExporter):
span_exporter = OTLPAwsSpanExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT))
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)

else:
_logger.warning(
Expand All @@ -338,6 +373,26 @@ def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> Span
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> LogExporter:
logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)

if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
_logger.info("Detected using AWS OTLP Logs Endpoint.")

if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers():
# Setting default compression mode to Gzip as this is the behavior in upstream's
# collector otlp http exporter:
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
return OTLPAwsLogExporter(endpoint=logs_endpoint)

_logger.warning(
"Improper configuration see: please export/set "
"OTEL_EXPORTER_OTLP_LOGS_PROTOCOL=http/protobuf and OTEL_LOGS_EXPORTER=otlp"
)

return log_exporter


def _customize_span_processors(provider: TracerProvider, resource: Resource) -> None:
# Add LambdaSpanProcessor to list of processors regardless of application signals.
if _is_lambda_environment():
Expand Down Expand Up @@ -458,12 +513,48 @@ def _is_lambda_environment():
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


def is_xray_otlp_endpoint(otlp_endpoint: str = None) -> bool:
"""Is the given endpoint the XRay OTLP endpoint?"""
def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> bool:
"""Is the given endpoint an AWS OTLP endpoint?"""

pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN

if not otlp_endpoint:
return False

return bool(re.match(XRAY_OTLP_ENDPOINT_PATTERN, otlp_endpoint.lower()))
return bool(re.match(pattern, otlp_endpoint.lower()))


def _validate_logs_headers() -> bool:
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
AWS OTLP Logs endpoint."""

logs_headers = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_HEADERS)

if not logs_headers:
_logger.warning(
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
"to include x-aws-log-group and x-aws-log-stream"
)
return False

filtered_log_headers_count = 0

for pair in logs_headers.split(","):
if "=" in pair:
split = pair.split("=", 1)
key = split[0]
value = split[1]
if key in (AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER) and value:
filtered_log_headers_count += 1

if filtered_log_headers_count != 2:
_logger.warning(
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
"to have values for x-aws-log-group and x-aws-log-stream"
)
return False

return True


def _get_metric_export_interval():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import logging

import requests

from amazon.opentelemetry.distro._utils import is_installed

_logger = logging.getLogger(__name__)


class AwsAuthSession(requests.Session):
"""
A custom requests Session that adds AWS SigV4 authentication to HTTP requests.

This class extends the standard requests.Session to automatically sign requests
with AWS Signature Version 4 (SigV4) authentication. It's specifically designed
for use with the OpenTelemetry Logs and Traces exporters that send data to AWS OTLP endpoints:
X-Ray (traces) and CloudWatch Logs.

The session requires botocore to be installed for signing headers. If botocore
is not available, the session will fall back to standard unauthenticated requests
and log an error message.

Usage:
session = AwsAuthSession(aws_region="us-west-2", service="logs")
response = session.request("POST", "https://logs.us-west-2.amazonaws.com/v1/logs",
data=payload, headers=headers)

Args:
aws_region (str): The AWS region to use for signing (e.g., "us-east-1")
service (str): The AWS service name for signing (e.g., "logs" or "xray")
"""

def __init__(self, aws_region, service):

self._has_required_dependencies = False

# Requires botocore to be installed to sign the headers. However,
# some users might not need to use this authenticator. In order not conflict
# with existing behavior, we check for botocore before initializing this exporter.

if aws_region and service and is_installed("botocore"):
# pylint: disable=import-outside-toplevel
from botocore import auth, awsrequest, session

self._boto_auth = auth
self._boto_aws_request = awsrequest
self._boto_session = session.Session()

self._aws_region = aws_region
self._service = service
self._has_required_dependencies = True

else:
_logger.error(
"botocore is required to enable SigV4 Authentication. Please install it using `pip install botocore`",
)

super().__init__()

def request(self, method, url, *args, data=None, headers=None, **kwargs):
if self._has_required_dependencies:

credentials = self._boto_session.get_credentials()

if credentials is not None:
signer = self._boto_auth.SigV4Auth(credentials, self._service, self._aws_region)

request = self._boto_aws_request.AWSRequest(
method="POST",
url=url,
data=data,
headers={"Content-Type": "application/x-protobuf"},
)

try:
signer.add_auth(request)

if headers is None:
headers = {}

headers.update(dict(request.headers))

except Exception as signing_error: # pylint: disable=broad-except
_logger.error("Failed to sign request: %s", signing_error)

return super().request(method=method, url=url, *args, data=data, headers=headers, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from typing import Dict, Optional

from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter


class OTLPAwsLogExporter(OTLPLogExporter):
def __init__(
self,
endpoint: Optional[str] = None,
certificate_file: Optional[str] = None,
client_key_file: Optional[str] = None,
client_certificate_file: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
):
self._aws_region = None

if endpoint:
self._aws_region = endpoint.split(".")[1]

OTLPLogExporter.__init__(
self,
endpoint,
certificate_file,
client_key_file,
client_certificate_file,
headers,
timeout,
compression=Compression.Gzip,
session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from typing import Dict, Optional

from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter


class OTLPAwsSpanExporter(OTLPSpanExporter):
def __init__(
self,
endpoint: Optional[str] = None,
certificate_file: Optional[str] = None,
client_key_file: Optional[str] = None,
client_certificate_file: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):
self._aws_region = None

if endpoint:
self._aws_region = endpoint.split(".")[1]

OTLPSpanExporter.__init__(
self,
endpoint,
certificate_file,
client_key_file,
client_certificate_file,
headers,
timeout,
compression,
session=AwsAuthSession(aws_region=self._aws_region, service="xray"),
)
Loading
Loading