diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py index be241885d..129a4c0e2 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py @@ -4,6 +4,7 @@ import os from importlib.metadata import PackageNotFoundError, version from logging import Logger, getLogger +from typing import Optional from packaging.requirements import Requirement @@ -37,30 +38,40 @@ def is_agent_observability_enabled() -> bool: return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true" -def get_aws_region() -> str: - """Get AWS region using botocore session. +IS_BOTOCORE_INSTALLED: bool = is_installed("botocore") - botocore automatically checks in the following priority order: - 1. AWS_REGION environment variable - 2. AWS_DEFAULT_REGION environment variable - 3. AWS CLI config file (~/.aws/config) - 4. EC2 instance metadata service - Returns: - The AWS region if found, None otherwise. +def get_aws_session(): """ - if is_installed("botocore"): - try: - from botocore import session # pylint: disable=import-outside-toplevel - - botocore_session = session.Session() - if botocore_session.region_name: - return botocore_session.region_name - except (ImportError, AttributeError): - # botocore failed to determine region - pass - - _logger.warning( - "AWS region not found. Please set AWS_REGION environment variable or configure AWS CLI with 'aws configure'." - ) + Returns a botocore session only if botocore is installed, otherwise None. + If AWS Region is defined in `AWS_REGION` or `AWS_DEFAULT_REGION` environment variables, + then the region is set in the botocore session before returning. + + We do this to prevent runtime errors for ADOT customers that do not need + any features that require botocore. + """ + if IS_BOTOCORE_INSTALLED: + # pylint: disable=import-outside-toplevel + from botocore.session import Session + + session = Session() + # Botocore only looks up AWS_DEFAULT_REGION when creating a session/client + # See: https://docs.aws.amazon.com/sdkref/latest/guide/feature-region.html#feature-region-sdk-compat + region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") + if region: + session.set_config_variable("region", region) + return session return None + + +def get_aws_region() -> Optional[str]: + """Get AWS region from environment or botocore session. + + Returns the AWS region in the following priority order: + 1. AWS_REGION environment variable + 2. AWS_DEFAULT_REGION environment variable + 3. botocore session's region (if botocore is available) + 4. None if no region can be determined + """ + botocore_session = get_aws_session() + return botocore_session.get_config_variable("region") if botocore_session else None diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py index 16fb06132..b36581719 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py @@ -12,7 +12,7 @@ from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE, AWS_SERVICE_TYPE from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute -from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed +from amazon.opentelemetry.distro._utils import get_aws_session, is_agent_observability_enabled from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import ( AttributePropagatingSpanProcessorBuilder, @@ -23,13 +23,6 @@ AwsMetricAttributesSpanExporterBuilder, ) from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder - -# pylint: disable=line-too-long -from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import ( - AwsCloudWatchOtlpBatchLogRecordProcessor, -) -from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter -from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader @@ -102,6 +95,8 @@ OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT" OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS" +XRAY_SERVICE = "xray" +LOGS_SERIVCE = "logs" AWS_TRACES_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$" AWS_LOGS_OTLP_ENDPOINT_PATTERN = r"https://logs\.([a-z0-9-]+)\.amazonaws\.com/v1/logs$" @@ -215,9 +210,9 @@ def _init_logging( for _, exporter_class in exporters.items(): exporter_args = {} - log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args)) - log_processor = _customize_log_record_processor(log_exporter) - provider.add_log_record_processor(log_processor) + _customize_log_record_processor( + logger_provider=provider, log_exporter=_customize_logs_exporter(exporter_class(**exporter_args)) + ) event_logger_provider = EventLoggerProvider(logger_provider=provider) set_event_logger_provider(event_logger_provider) @@ -304,10 +299,11 @@ def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvide return traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) + if traces_endpoint and _is_aws_otlp_endpoint(traces_endpoint, XRAY_SERVICE): + endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(traces_endpoint) + span_exporter = _create_aws_otlp_exporter(endpoint=endpoint, service=XRAY_SERVICE, region=region) - span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider()) - - trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter)) + trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter)) def _is_defer_to_workers_enabled(): @@ -356,7 +352,7 @@ def _custom_import_sampler(sampler_name: str, resource: Resource) -> Sampler: if sampler_name is None: sampler_name = "parentbased_always_on" - if sampler_name == "xray": + if sampler_name == XRAY_SERVICE: # Example env var value # OTEL_TRACES_SAMPLER_ARG=endpoint=http://localhost:2000,polling_interval=360 sampler_argument_env: str = os.getenv(OTEL_TRACES_SAMPLER_ARG, None) @@ -402,25 +398,17 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000") span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint) - if _is_aws_otlp_endpoint(traces_endpoint, "xray"): + if traces_endpoint and _is_aws_otlp_endpoint(traces_endpoint, XRAY_SERVICE): _logger.info("Detected using AWS OTLP Traces Endpoint.") if isinstance(span_exporter, OTLPSpanExporter): - if is_agent_observability_enabled(): - # Span exporter needs an instance of logger provider in ai agent - # observability case because we need to split input/output prompts - # from span attributes and send them to the logs pipeline per - # the new Gen AI semantic convention from OTel - # ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/ - span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider()) - else: - span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint) + endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(traces_endpoint) + return _create_aws_otlp_exporter(endpoint=endpoint, service=XRAY_SERVICE, region=region) - else: - _logger.warning( - "Improper configuration see: please export/set " - "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf and OTEL_TRACES_EXPORTER=otlp" - ) + _logger.warning( + "Improper configuration see: please export/set " + "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf and OTEL_TRACES_EXPORTER=otlp" + ) if not _is_application_signals_enabled(): return span_exporter @@ -428,24 +416,34 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) -> return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build() -def _customize_log_record_processor(log_exporter: LogExporter): - if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled(): - return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter) +def _customize_log_record_processor(logger_provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None: + if not log_exporter: + return + + if is_agent_observability_enabled(): + # pylint: disable=import-outside-toplevel + from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import ( + AwsCloudWatchOtlpBatchLogRecordProcessor, + ) - return BatchLogRecordProcessor(exporter=log_exporter) + logger_provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)) + else: + logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter)) def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter: logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT) - if _is_aws_otlp_endpoint(logs_endpoint, "logs"): + if logs_endpoint and _is_aws_otlp_endpoint(logs_endpoint, LOGS_SERIVCE): + _logger.info("Detected using AWS OTLP Logs Endpoint.") if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid: + endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(logs_endpoint) # Setting default compression mode to Gzip as this is the behavior in upstream's # collector otlp http exporter: # https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter - return OTLPAwsLogExporter(endpoint=logs_endpoint) + return _create_aws_otlp_exporter(endpoint=endpoint, service=LOGS_SERIVCE, region=region) _logger.warning( "Improper configuration see: please export/set " @@ -514,7 +512,7 @@ def _customize_metric_exporters( metric_readers.append(scope_based_periodic_exporting_metric_reader) if is_emf_enabled: - emf_exporter = create_emf_exporter() + emf_exporter = _create_emf_exporter() if emf_exporter: metric_readers.append(PeriodicExportingMetricReader(emf_exporter)) @@ -604,17 +602,24 @@ def _is_lambda_environment(): return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ -def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool: +def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str], service: str) -> bool: """Is the given endpoint an AWS OTLP endpoint?""" - pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN - if not otlp_endpoint: return False + pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == XRAY_SERVICE else AWS_LOGS_OTLP_ENDPOINT_PATTERN + return bool(re.match(pattern, otlp_endpoint.lower())) +def _extract_endpoint_and_region_from_otlp_endpoint(endpoint: str): + endpoint = endpoint.lower() + region = endpoint.split(".")[1] + + return endpoint, region + + def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: """Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to AWS OTLP Logs endpoint.""" @@ -631,7 +636,6 @@ def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: log_group = None log_stream = None namespace = None - filtered_log_headers_count = 0 for pair in logs_headers.split(","): if "=" in pair: @@ -640,14 +644,12 @@ def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting: value = split[1] if key == AWS_OTLP_LOGS_GROUP_HEADER and value: log_group = value - filtered_log_headers_count += 1 elif key == AWS_OTLP_LOGS_STREAM_HEADER and value: log_stream = value - filtered_log_headers_count += 1 elif key == AWS_EMF_METRICS_NAMESPACE and value: namespace = value - is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None + is_valid = log_group is not None and log_stream is not None if not is_valid: _logger.warning( @@ -769,11 +771,12 @@ def _check_emf_exporter_enabled() -> bool: return True -def create_emf_exporter(): +def _create_emf_exporter(): """Create and configure the CloudWatch EMF exporter.""" try: + session = get_aws_session() # Check if botocore is available before importing the EMF exporter - if not is_installed("botocore"): + if not session: _logger.warning("botocore is not installed. EMF exporter requires botocore") return None @@ -788,6 +791,7 @@ def create_emf_exporter(): return None return AwsCloudWatchEmfExporter( + session=session, namespace=log_header_setting.namespace, log_group_name=log_header_setting.log_group, log_stream_name=log_header_setting.log_stream, @@ -796,3 +800,39 @@ def create_emf_exporter(): except Exception as errors: _logger.error("Failed to create EMF exporter: %s", errors) return None + + +def _create_aws_otlp_exporter(endpoint: str, service: str, region: str): + """Create and configure the AWS OTLP exporters.""" + try: + session = get_aws_session() + # Check if botocore is available before importing the AWS exporter + if not session: + _logger.warning("Sigv4 Auth requires botocore to be enabled") + return None + + # pylint: disable=import-outside-toplevel + from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter + from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter + + if service == XRAY_SERVICE: + if is_agent_observability_enabled(): + # Span exporter needs an instance of logger provider in ai agent + # observability case because we need to split input/output prompts + # from span attributes and send them to the logs pipeline per + # the new Gen AI semantic convention from OTel + # ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/ + return OTLPAwsSpanExporter( + session=session, endpoint=endpoint, aws_region=region, logger_provider=get_logger_provider() + ) + + return OTLPAwsSpanExporter(session=session, endpoint=endpoint, aws_region=region) + + if service == LOGS_SERIVCE: + return OTLPAwsLogExporter(session=session, aws_region=region) + + return None + # pylint: disable=broad-exception-caught + except Exception as errors: + _logger.error("Failed to create AWS OTLP exporter: %s", errors) + return None diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index 9fde5e248..a7f73f4e1 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -86,8 +86,9 @@ def _configure(self, **kwargs): # Set GenAI capture content default os.environ.setdefault(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "true") - # Set OTLP endpoints with AWS region if not already set region = get_aws_region() + + # Set OTLP endpoints with AWS region if not already set if region: os.environ.setdefault( OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, f"https://xray.{region}.amazonaws.com/v1/traces" diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py index 72236121f..b7daac12b 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py @@ -8,8 +8,8 @@ import uuid from typing import Any, Dict, List, Optional -import botocore.session from botocore.exceptions import ClientError +from botocore.session import Session logger = logging.getLogger(__name__) @@ -90,6 +90,7 @@ class CloudWatchLogClient: def __init__( self, log_group_name: str, + session: Session, log_stream_name: Optional[str] = None, aws_region: Optional[str] = None, **kwargs, @@ -105,8 +106,6 @@ def __init__( """ self.log_group_name = log_group_name self.log_stream_name = log_stream_name or self._generate_log_stream_name() - - session = botocore.session.Session() self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs) # Event batch to store logs before sending to CloudWatch diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/aws_auth_session.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/aws_auth_session.py index 2c383592b..564bfe9e2 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/aws_auth_session.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/aws_auth_session.py @@ -4,8 +4,9 @@ import logging import requests - -from amazon.opentelemetry.distro._utils import is_installed +from botocore.auth import SigV4Auth +from botocore.awsrequest import AWSRequest +from botocore.session import Session _logger = logging.getLogger(__name__) @@ -33,57 +34,36 @@ class AwsAuthSession(requests.Session): service (str): The AWS service name for signing (e.g., "logs" or "xray") """ - def __init__(self, aws_region, service): - - self._has_required_dependencies = False - - # Requires botocore to be installed to sign the headers. However, - # some users might not need to use this authenticator. In order not conflict - # with existing behavior, we check for botocore before initializing this exporter. - - if aws_region and service and is_installed("botocore"): - # pylint: disable=import-outside-toplevel - from botocore import auth, awsrequest, session - - self._boto_auth = auth - self._boto_aws_request = awsrequest - self._boto_session = session.Session() - - self._aws_region = aws_region - self._service = service - self._has_required_dependencies = True - - else: - _logger.error( - "botocore is required to enable SigV4 Authentication. Please install it using `pip install botocore`", - ) + def __init__(self, aws_region: str, service: str, session: Session): + self._aws_region: str = aws_region + self._service: str = service + self._session: Session = session super().__init__() def request(self, method, url, *args, data=None, headers=None, **kwargs): - if self._has_required_dependencies: - - credentials = self._boto_session.get_credentials() - - if credentials is not None: - signer = self._boto_auth.SigV4Auth(credentials, self._service, self._aws_region) - - request = self._boto_aws_request.AWSRequest( - method="POST", - url=url, - data=data, - headers={"Content-Type": "application/x-protobuf"}, - ) + credentials = self._session.get_credentials() + + if credentials: + signer = SigV4Auth(credentials, self._service, self._aws_region) + request = AWSRequest( + method="POST", + url=url, + data=data, + headers={"Content-Type": "application/x-protobuf"}, + ) - try: - signer.add_auth(request) + try: + signer.add_auth(request) - if headers is None: - headers = {} + if headers is None: + headers = {} - headers.update(dict(request.headers)) + headers.update(dict(request.headers)) - except Exception as signing_error: # pylint: disable=broad-except - _logger.error("Failed to sign request: %s", signing_error) + except Exception as signing_error: # pylint: disable=broad-except + _logger.error("Failed to sign request: %s", signing_error) + else: + _logger.error("Failed to load AWS Credentials: %s") return super().request(method=method, url=url, *args, data=data, headers=headers, **kwargs) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py index 5ad3c2c8e..fe90e1f90 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/_aws_cw_otlp_batch_log_record_processor.py @@ -247,3 +247,12 @@ def _estimate_utf8_size(s: str): # Estimate: ASCII chars (1 byte) + upper bound of non-ASCII chars 4 bytes return ascii_count + (non_ascii_count * 4) + + # Only export the logs once to avoid the race condition of the worker thread and force flush thread + # https://github.com/open-telemetry/opentelemetry-python/issues/3193 + # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L199 + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + if self._shutdown: + return False + self._export(BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH) + return True diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py index 16a976d54..4ed3649c3 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py @@ -10,6 +10,7 @@ from time import time from typing import Dict, Optional, Sequence +from botocore.session import Session from requests import Response from requests.exceptions import ConnectionError as RequestsConnectionError from requests.structures import CaseInsensitiveDict @@ -38,6 +39,10 @@ class OTLPAwsLogExporter(OTLPLogExporter): def __init__( self, + aws_region: str, + session: Session, + log_group: Optional[str] = None, + log_stream: Optional[str] = None, endpoint: Optional[str] = None, certificate_file: Optional[str] = None, client_key_file: Optional[str] = None, @@ -45,10 +50,14 @@ def __init__( headers: Optional[Dict[str, str]] = None, timeout: Optional[int] = None, ): - self._aws_region = None + self._aws_region = aws_region - if endpoint: - self._aws_region = endpoint.split(".")[1] + if log_group and log_stream: + log_headers = {"x-aws-log-group": log_group, "x-aws-log-stream": log_stream} + if headers: + headers.update(log_headers) + else: + headers = log_headers OTLPLogExporter.__init__( self, @@ -59,7 +68,7 @@ def __init__( headers, timeout, compression=Compression.Gzip, - session=AwsAuthSession(aws_region=self._aws_region, service="logs"), + session=AwsAuthSession(session=session, aws_region=self._aws_region, service="logs"), ) self._shutdown_event = Event() diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py index 7f44b04e4..3589121d9 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py @@ -4,6 +4,8 @@ import logging from typing import Dict, Optional, Sequence +from botocore.session import Session + from amazon.opentelemetry.distro._utils import is_agent_observability_enabled from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession from amazon.opentelemetry.distro.llo_handler import LLOHandler @@ -14,7 +16,7 @@ from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class OTLPAwsSpanExporter(OTLPSpanExporter): @@ -28,6 +30,8 @@ class OTLPAwsSpanExporter(OTLPSpanExporter): def __init__( self, + aws_region: str, + session: Session, endpoint: Optional[str] = None, certificate_file: Optional[str] = None, client_key_file: Optional[str] = None, @@ -37,13 +41,10 @@ def __init__( compression: Optional[Compression] = None, logger_provider: Optional[LoggerProvider] = None, ): - self._aws_region = None + self._aws_region = aws_region self._logger_provider = logger_provider self._llo_handler = None - if endpoint: - self._aws_region = endpoint.split(".")[1] - OTLPSpanExporter.__init__( self, endpoint, @@ -53,7 +54,7 @@ def __init__( headers, timeout, compression, - session=AwsAuthSession(aws_region=self._aws_region, service="xray"), + session=AwsAuthSession(session=session, aws_region=self._aws_region, service="xray"), ) def _ensure_llo_handler(self): @@ -63,7 +64,7 @@ def _ensure_llo_handler(self): try: self._logger_provider = get_logger_provider() except Exception as exc: # pylint: disable=broad-exception-caught - logger.debug("Failed to get logger provider: %s", exc) + _logger.debug("Failed to get logger provider: %s", exc) return False if self._logger_provider: diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py index 7ac67fddf..9a90c56a5 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_aws_cloudwatch_emf_exporter.py @@ -132,7 +132,9 @@ def setUp(self): mock_session.return_value = mock_session_instance mock_session_instance.create_client.return_value = mock_client - self.exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + self.exporter = AwsCloudWatchEmfExporter( + session=mock_session, namespace="TestNamespace", log_group_name="test-log-group" + ) def test_initialization(self): """Test exporter initialization.""" @@ -150,6 +152,7 @@ def test_initialization_with_custom_params(self, mock_session): mock_session_instance.create_client.return_value = mock_client exporter = AwsCloudWatchEmfExporter( + session=mock_session_instance, namespace="CustomNamespace", log_group_name="custom-log-group", log_stream_name="custom-stream", @@ -578,7 +581,9 @@ def test_initialization_with_env_region(self, mock_session, mock_env_get): mock_session.return_value = mock_session_instance mock_session_instance.create_client.return_value = mock_client - exporter = AwsCloudWatchEmfExporter(namespace="TestNamespace", log_group_name="test-log-group") + exporter = AwsCloudWatchEmfExporter( + session=mock_session, namespace="TestNamespace", log_group_name="test-log-group" + ) # Just verify the exporter was created successfully with region handling self.assertIsNotNone(exporter) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py index 0215962db..2793aeb34 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/aws/metrics/test_cloudwatch_log_client.py @@ -24,7 +24,7 @@ def setUp(self): mock_session.return_value = mock_session_instance mock_session_instance.create_client.return_value = mock_client - self.log_client = CloudWatchLogClient(log_group_name="test-log-group") + self.log_client = CloudWatchLogClient(session=mock_session, log_group_name="test-log-group") def test_initialization(self): """Test log client initialization.""" @@ -42,6 +42,7 @@ def test_initialization_with_custom_params(self, mock_session): mock_session_instance.create_client.return_value = mock_client log_client = CloudWatchLogClient( + session=mock_session, log_group_name="custom-log-group", log_stream_name="custom-stream", aws_region="us-west-2", @@ -479,7 +480,7 @@ def test_initialization_with_custom_log_stream_name(self, mock_session): mock_session.return_value.create_client.return_value = mock_client custom_stream = "my-custom-stream" - client = CloudWatchLogClient("test-group", log_stream_name=custom_stream) + client = CloudWatchLogClient(session=mock_session, log_group_name="test-group", log_stream_name=custom_stream) self.assertEqual(client.log_stream_name, custom_stream) def test_send_log_batch_empty_batch_no_aws_call(self): diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py index 7d6479251..11babbb7b 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/common/test_aws_auth_session.py @@ -1,12 +1,12 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -from importlib.metadata import PackageNotFoundError from unittest import TestCase from unittest.mock import patch import requests from botocore.credentials import Credentials +from amazon.opentelemetry.distro._utils import get_aws_session from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession AWS_OTLP_TRACES_ENDPOINT = "https://xray.us-east-1.amazonaws.com/v1/traces" @@ -20,28 +20,12 @@ class TestAwsAuthSession(TestCase): - @patch("amazon.opentelemetry.distro._utils.version") - @patch.dict("sys.modules", {"botocore": None}) - @patch("requests.Session.request", return_value=requests.Response()) - def test_aws_auth_session_no_botocore(self, mock_request, mock_version): - """Tests that aws_auth_session will not inject SigV4 Headers if botocore is not installed.""" - mock_version.side_effect = PackageNotFoundError("botocore") - - session = AwsAuthSession("us-east-1", "xray") - actual_headers = {"test": "test"} - - session.request("POST", AWS_OTLP_TRACES_ENDPOINT, data="", headers=actual_headers) - - self.assertNotIn(AUTHORIZATION_HEADER, actual_headers) - self.assertNotIn(X_AMZ_DATE_HEADER, actual_headers) - self.assertNotIn(X_AMZ_SECURITY_TOKEN_HEADER, actual_headers) - @patch("requests.Session.request", return_value=requests.Response()) @patch("botocore.session.Session.get_credentials", return_value=None) def test_aws_auth_session_no_credentials(self, _, __): """Tests that aws_auth_session will not inject SigV4 Headers if retrieving credentials returns None.""" - session = AwsAuthSession("us-east-1", "xray") + session = AwsAuthSession("us-east-1", "xray", get_aws_session()) actual_headers = {"test": "test"} session.request("POST", AWS_OTLP_TRACES_ENDPOINT, data="", headers=actual_headers) @@ -55,7 +39,7 @@ def test_aws_auth_session_no_credentials(self, _, __): def test_aws_auth_session(self, _, __): """Tests that aws_auth_session will inject SigV4 Headers if botocore is installed.""" - session = AwsAuthSession("us-east-1", "xray") + session = AwsAuthSession("us-east-1", "xray", get_aws_session()) actual_headers = {"test": "test"} session.request("POST", AWS_OTLP_TRACES_ENDPOINT, data="", headers=actual_headers) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py index 2d019bce7..f22c18492 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_cw_otlp_batch_log_record_processor.py @@ -231,6 +231,47 @@ def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___): expected_size = expected_sizes[index] self.assertEqual(len(batch), expected_size) + def test_force_flush_returns_false_when_shutdown(self): + """Tests that force_flush returns False when processor is shutdown""" + self.processor.shutdown() + result = self.processor.force_flush() + + # Verify force_flush returns False and no export is called + self.assertFalse(result) + self.mock_exporter.export.assert_not_called() + + @patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.attach", + return_value=MagicMock(), + ) + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.detach") + @patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor.set_value") + def test_force_flush_exports_only_one_batch(self, _, __, ___): + """Tests that force_flush should try to at least export one batch of logs. Rest of the logs will be dropped""" + # Set max_export_batch_size to 5 to limit batch size + self.processor._max_export_batch_size = 5 + self.processor._shutdown = False + + # Add 6 logs to queue, after the export there should be 1 log remaining + log_count = 6 + test_logs = self.generate_test_log_data(log_body="test message", count=log_count) + + for log in test_logs: + self.processor._queue.appendleft(log) + + self.assertEqual(len(self.processor._queue), log_count) + + result = self.processor.force_flush() + + self.assertTrue(result) + self.assertEqual(len(self.processor._queue), 1) + self.mock_exporter.export.assert_called_once() + + # Verify only one batch of 5 logs was exported + args, _ = self.mock_exporter.export.call_args + exported_batch = args[0] + self.assertEqual(len(exported_batch), 5) + @staticmethod def generate_test_log_data( log_body, diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py index 8623a6696..5c75f63de 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_otlp_aws_logs_exporter.py @@ -7,6 +7,7 @@ import requests from requests.structures import CaseInsensitiveDict +from amazon.opentelemetry.distro._utils import get_aws_session from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import _MAX_RETRYS, OTLPAwsLogExporter from opentelemetry._logs.severity import SeverityNumber from opentelemetry.sdk._logs import LogData, LogRecord @@ -36,7 +37,7 @@ class TestOTLPAwsLogsExporter(TestCase): def setUp(self): self.logs = self.generate_test_log_data() - self.exporter = OTLPAwsLogExporter(endpoint=self._ENDPOINT) + self.exporter = OTLPAwsLogExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=self._ENDPOINT) @patch("requests.Session.post", return_value=good_response) def test_export_success(self, mock_request): @@ -200,7 +201,32 @@ def test_export_interrupted_by_shutdown(self, mock_request, mock_wait): # Should make one request, then get interrupted during retry wait self.assertEqual(mock_request.call_count, 1) self.assertEqual(result, LogExportResult.FAILURE) - mock_wait.assert_called_once() + + @patch("requests.Session.post", return_value=good_response) + def test_export_with_log_group_and_stream_headers(self, mock_request): + """Tests that log_group and log_stream are properly set as headers when provided.""" + log_group = "test-log-group" + log_stream = "test-log-stream" + + exporter = OTLPAwsLogExporter( + session=get_aws_session(), + aws_region="us-east-1", + endpoint=self._ENDPOINT, + log_group=log_group, + log_stream=log_stream, + ) + + result = exporter.export(self.logs) + + mock_request.assert_called_once() + self.assertEqual(result, LogExportResult.SUCCESS) + + # Verify headers contain log group and stream + session_headers = exporter._session.headers + self.assertIn("x-aws-log-group", session_headers) + self.assertIn("x-aws-log-stream", session_headers) + self.assertEqual(session_headers["x-aws-log-group"], log_group) + self.assertEqual(session_headers["x-aws-log-stream"], log_stream) @staticmethod def generate_test_log_data(count=5): diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/traces/test_otlp_aws_span_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/traces/test_otlp_aws_span_exporter.py index d0b2a004d..1553dd8e2 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/traces/test_otlp_aws_span_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/traces/test_otlp_aws_span_exporter.py @@ -4,6 +4,7 @@ from unittest import TestCase from unittest.mock import MagicMock, patch +from amazon.opentelemetry.distro._utils import get_aws_session from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk._logs import LoggerProvider @@ -17,7 +18,9 @@ def test_init_with_logger_provider(self): mock_logger_provider = MagicMock(spec=LoggerProvider) endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint, logger_provider=mock_logger_provider) + exporter = OTLPAwsSpanExporter( + session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint, logger_provider=mock_logger_provider + ) self.assertEqual(exporter._logger_provider, mock_logger_provider) self.assertEqual(exporter._aws_region, "us-east-1") @@ -26,7 +29,7 @@ def test_init_without_logger_provider(self): # Test initialization without logger_provider (default behavior) endpoint = "https://xray.us-west-2.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-west-2", endpoint=endpoint) self.assertIsNone(exporter._logger_provider) self.assertEqual(exporter._aws_region, "us-west-2") @@ -38,7 +41,7 @@ def test_ensure_llo_handler_when_disabled(self, mock_is_enabled): mock_is_enabled.return_value = False endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint) result = exporter._ensure_llo_handler() self.assertFalse(result) @@ -59,7 +62,7 @@ def test_ensure_llo_handler_lazy_initialization( mock_llo_handler_class.return_value = mock_llo_handler endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint) # First call should initialize result = exporter._ensure_llo_handler() @@ -87,7 +90,9 @@ def test_ensure_llo_handler_with_existing_logger_provider(self, mock_is_enabled, mock_logger_provider = MagicMock(spec=LoggerProvider) endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint, logger_provider=mock_logger_provider) + exporter = OTLPAwsSpanExporter( + session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint, logger_provider=mock_logger_provider + ) with patch( "amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.LLOHandler" @@ -110,7 +115,7 @@ def test_ensure_llo_handler_get_logger_provider_fails(self, mock_is_enabled, moc mock_get_logger_provider.side_effect = Exception("Failed to get logger provider") endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint) result = exporter._ensure_llo_handler() @@ -123,7 +128,7 @@ def test_export_with_llo_disabled(self, mock_is_enabled): mock_is_enabled.return_value = False endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint) # Mock the parent class export method with patch.object(OTLPSpanExporter, "export") as mock_parent_export: @@ -149,7 +154,7 @@ def test_export_with_llo_enabled(self, mock_llo_handler_class, mock_get_logger_p mock_llo_handler_class.return_value = mock_llo_handler endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint) # Mock spans and processed spans original_spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)] @@ -182,7 +187,7 @@ def test_export_with_llo_processing_failure( mock_llo_handler.process_spans.side_effect = Exception("LLO processing failed") endpoint = "https://xray.us-east-1.amazonaws.com/v1/traces" - exporter = OTLPAwsSpanExporter(endpoint=endpoint) + exporter = OTLPAwsSpanExporter(session=get_aws_session(), aws_region="us-east-1", endpoint=endpoint) spans = [MagicMock(spec=ReadableSpan), MagicMock(spec=ReadableSpan)] diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py index 66e168a57..2a01bbd5c 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py @@ -25,6 +25,8 @@ AwsOpenTelemetryConfigurator, OtlpLogHeaderSetting, _check_emf_exporter_enabled, + _create_aws_otlp_exporter, + _create_emf_exporter, _custom_import_sampler, _customize_log_record_processor, _customize_logs_exporter, @@ -41,7 +43,6 @@ _is_defer_to_workers_enabled, _is_wsgi_master_process, _validate_and_fetch_logs_header, - create_emf_exporter, ) from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor @@ -69,6 +70,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.metrics import get_meter_provider from opentelemetry.processor.baggage import BaggageSpanProcessor +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -386,6 +388,7 @@ def test_customize_span_processors_with_agent_observability(self): mock_tracer_provider.reset_mock() os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" + os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" _customize_span_processors(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 2) @@ -395,6 +398,7 @@ def test_customize_span_processors_with_agent_observability(self): self.assertIsInstance(second_processor, BaggageSpanProcessor) os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None) def test_baggage_span_processor_session_id_filtering(self): """Test that BaggageSpanProcessor only set session.id filter by default""" @@ -695,6 +699,7 @@ def test_customize_span_processors(self): mock_tracer_provider.reset_mock() os.environ.setdefault("AGENT_OBSERVABILITY_ENABLED", "true") + os.environ.setdefault("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "https://xray.us-east-1.amazonaws.com/v1/traces") _customize_span_processors(mock_tracer_provider, Resource.get_empty()) self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 4) @@ -704,6 +709,8 @@ def test_customize_span_processors(self): self.assertIsInstance(processors[2], AttributePropagatingSpanProcessor) self.assertIsInstance(processors[3], AwsSpanMetricsProcessor) + os.environ.pop("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") + def test_customize_span_processors_lambda(self): mock_tracer_provider: TracerProvider = MagicMock() # Clean up environment to ensure consistent test state @@ -830,14 +837,15 @@ def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): mock_tracer_provider: TracerProvider = MagicMock() with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator.OTLPAwsSpanExporter" + "amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.OTLPAwsSpanExporter" ) as mock_aws_exporter: with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator.BatchUnsampledSpanProcessor" - ) as mock_processor: + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider" + ) as mock_logger_provider: with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider" - ) as mock_logger_provider: + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session" + ) as mock_session: + mock_session.return_value = MagicMock() os.environ["AGENT_OBSERVABILITY_ENABLED"] = "true" os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "https://xray.us-east-1.amazonaws.com/v1/traces" @@ -845,13 +853,13 @@ def test_export_unsampled_span_for_agent_observability_uses_aws_exporter(self): # Verify OTLPAwsSpanExporter is created with correct parameters mock_aws_exporter.assert_called_once_with( + session=mock_session.return_value, endpoint="https://xray.us-east-1.amazonaws.com/v1/traces", + aws_region="us-east-1", logger_provider=mock_logger_provider.return_value, ) - # Verify BatchUnsampledSpanProcessor wraps the exporter - mock_processor.assert_called_once_with(span_exporter=mock_aws_exporter.return_value) # Verify processor is added to tracer provider - mock_tracer_provider.add_span_processor.assert_called_once_with(mock_processor.return_value) + mock_tracer_provider.add_span_processor.assert_called_once() # Clean up os.environ.pop("AGENT_OBSERVABILITY_ENABLED", None) @@ -1015,30 +1023,45 @@ def test_validate_and_fetch_logs_header(self): # Clean up os.environ.pop(OTEL_EXPORTER_OTLP_LOGS_HEADERS, None) - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._is_aws_otlp_endpoint") - def test_customize_log_record_processor_with_agent_observability(self, mock_is_aws_endpoint, mock_is_agent_enabled): + @patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled", return_value=False + ) + def test_customize_log_record_processor_without_agent_observability(self, _): + """Test that BatchLogRecordProcessor is used when agent observability is not enabled""" + mock_logger_provider = MagicMock() + mock_exporter = MagicMock(spec=OTLPAwsLogExporter) + + _customize_log_record_processor(mock_logger_provider, mock_exporter) + + mock_logger_provider.add_log_record_processor.assert_called_once() + added_processor = mock_logger_provider.add_log_record_processor.call_args[0][0] + self.assertIsInstance(added_processor, BatchLogRecordProcessor) + + @patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled", return_value=True + ) + def test_customize_log_record_processor_with_agent_observability(self, _): """Test that AwsCloudWatchOtlpBatchLogRecordProcessor is used when agent observability is enabled""" + mock_logger_provider = MagicMock() mock_exporter = MagicMock(spec=OTLPAwsLogExporter) - mock_is_agent_enabled.return_value = True - mock_is_aws_endpoint.return_value = True - processor = _customize_log_record_processor(mock_exporter) + _customize_log_record_processor(mock_logger_provider, mock_exporter) - self.assertIsInstance(processor, AwsCloudWatchOtlpBatchLogRecordProcessor) + mock_logger_provider.add_log_record_processor.assert_called_once() + added_processor = mock_logger_provider.add_log_record_processor.call_args[0][0] + self.assertIsInstance(added_processor, AwsCloudWatchOtlpBatchLogRecordProcessor) @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator._validate_and_fetch_logs_header") - @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_installed") - def test_create_emf_exporter(self, mock_is_installed, mock_validate): + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_emf_exporter(self, mock_get_session, mock_validate): # Test when botocore is not installed - mock_is_installed.return_value = False - result = create_emf_exporter() + mock_get_session.return_value = None + result = _create_emf_exporter() self.assertIsNone(result) - mock_is_installed.assert_called_with("botocore") # Reset mock for subsequent tests - mock_is_installed.reset_mock() - mock_is_installed.return_value = True + mock_get_session.reset_mock() + mock_get_session.return_value = MagicMock() # Mock the EMF exporter class import by patching the module import with patch( @@ -1051,34 +1074,106 @@ def test_create_emf_exporter(self, mock_is_installed, mock_validate): # Test when headers are invalid mock_validate.return_value = OtlpLogHeaderSetting(None, None, None, False) - result = create_emf_exporter() + result = _create_emf_exporter() self.assertIsNone(result) # Test when namespace is missing (should still create exporter with default namespace) mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", None, True) - result = create_emf_exporter() + result = _create_emf_exporter() self.assertIsNotNone(result) self.assertEqual(result, mock_exporter_instance) # Verify that the EMF exporter was called with correct parameters mock_emf_exporter_class.assert_called_with( - namespace=None, log_group_name="test-group", log_stream_name="test-stream" + session=mock_get_session.return_value, + namespace=None, + log_group_name="test-group", + log_stream_name="test-stream", ) # Test with valid configuration mock_validate.return_value = OtlpLogHeaderSetting("test-group", "test-stream", "test-namespace", True) - result = create_emf_exporter() + result = _create_emf_exporter() self.assertIsNotNone(result) self.assertEqual(result, mock_exporter_instance) # Verify that the EMF exporter was called with correct parameters mock_emf_exporter_class.assert_called_with( - namespace="test-namespace", log_group_name="test-group", log_stream_name="test-stream" + session=mock_get_session.return_value, + namespace="test-namespace", + log_group_name="test-group", + log_stream_name="test-stream", ) # Test exception handling mock_validate.side_effect = Exception("Test exception") - result = create_emf_exporter() + result = _create_emf_exporter() self.assertIsNone(result) + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_logger_provider") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.is_agent_observability_enabled") + @patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.get_aws_session") + def test_create_aws_otlp_exporter(self, mock_get_session, mock_is_agent_enabled, mock_get_logger_provider): + # Test when botocore is not installed + mock_get_session.return_value = None + result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") + self.assertIsNone(result) + + # Reset mock for subsequent tests + mock_get_session.reset_mock() + mock_get_session.return_value = MagicMock() + mock_get_logger_provider.return_value = MagicMock() + + # Test xray service without agent observability + mock_is_agent_enabled.return_value = False + with patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.OTLPAwsSpanExporter" + ) as mock_span_exporter_class: + mock_exporter_instance = MagicMock() + mock_span_exporter_class.return_value = mock_exporter_instance + + result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + mock_span_exporter_class.assert_called_with( + session=mock_get_session.return_value, + endpoint="https://xray.us-east-1.amazonaws.com/v1/traces", + aws_region="us-east-1", + ) + + # Test xray service with agent observability + mock_is_agent_enabled.return_value = True + with patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter.OTLPAwsSpanExporter" + ) as mock_span_exporter_class: + mock_exporter_instance = MagicMock() + mock_span_exporter_class.return_value = mock_exporter_instance + + result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + mock_span_exporter_class.assert_called_with( + session=mock_get_session.return_value, + endpoint="https://xray.us-east-1.amazonaws.com/v1/traces", + aws_region="us-east-1", + logger_provider=mock_get_logger_provider.return_value, + ) + + # Test logs service + with patch( + "amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter.OTLPAwsLogExporter" + ) as mock_log_exporter_class: + mock_exporter_instance = MagicMock() + mock_log_exporter_class.return_value = mock_exporter_instance + + result = _create_aws_otlp_exporter("https://logs.us-east-1.amazonaws.com/v1/logs", "logs", "us-east-1") + self.assertIsNotNone(result) + self.assertEqual(result, mock_exporter_instance) + mock_log_exporter_class.assert_called_with(session=mock_get_session.return_value, aws_region="us-east-1") + + # Test exception handling + mock_get_session.side_effect = Exception("Test exception") + result = _create_aws_otlp_exporter("https://xray.us-east-1.amazonaws.com/v1/traces", "xray", "us-east-1") + self.assertIsNone(result) + def test_customize_metric_exporters_with_emf(self): metric_readers = [] views = [] @@ -1088,7 +1183,9 @@ def test_customize_metric_exporters_with_emf(self): self.assertEqual(len(metric_readers), 0) # Test with EMF enabled but create_emf_exporter returns None - with patch("amazon.opentelemetry.distro.aws_opentelemetry_configurator.create_emf_exporter", return_value=None): + with patch( + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", return_value=None + ): _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) self.assertEqual(len(metric_readers), 0) @@ -1099,7 +1196,7 @@ def test_customize_metric_exporters_with_emf(self): mock_emf_exporter._preferred_aggregation = {} with patch( - "amazon.opentelemetry.distro.aws_opentelemetry_configurator.create_emf_exporter", + "amazon.opentelemetry.distro.aws_opentelemetry_configurator._create_emf_exporter", return_value=mock_emf_exporter, ): _customize_metric_exporters(metric_readers, views, is_emf_enabled=True) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_utils.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_utils.py index adb690359..4c0cd709f 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_utils.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_utils.py @@ -9,6 +9,7 @@ from amazon.opentelemetry.distro._utils import ( AGENT_OBSERVABILITY_ENABLED, get_aws_region, + get_aws_session, is_agent_observability_enabled, is_installed, ) @@ -104,67 +105,71 @@ def test_is_agent_observability_enabled_various_values(self): del os.environ[AGENT_OBSERVABILITY_ENABLED] self.assertFalse(is_agent_observability_enabled()) + def test_get_aws_session_with_botocore(self): + """Test get_aws_session when botocore is installed""" + with patch("amazon.opentelemetry.distro._utils.IS_BOTOCORE_INSTALLED", True): + with patch("botocore.session.Session") as mock_session_class: + mock_session = MagicMock() + mock_session_class.return_value = mock_session + + session = get_aws_session() + self.assertEqual(session, mock_session) + mock_session_class.assert_called_once() + + def test_get_aws_session_without_botocore(self): + """Test get_aws_session when botocore is not installed""" + with patch("amazon.opentelemetry.distro._utils.IS_BOTOCORE_INSTALLED", False): + session = get_aws_session() + self.assertIsNone(session) + def test_get_aws_region_with_botocore(self): """Test get_aws_region when botocore is available and returns a region""" - with patch("amazon.opentelemetry.distro._utils.is_installed") as mock_is_installed: - mock_is_installed.return_value = True - - # Create a mock botocore session - mock_session_class = MagicMock() - mock_session_instance = MagicMock() - mock_session_instance.region_name = "us-east-1" - mock_session_class.Session.return_value = mock_session_instance + with patch("amazon.opentelemetry.distro._utils.get_aws_session") as mock_get_session: + mock_session = MagicMock() + mock_session.get_config_variable.return_value = "us-east-1" + mock_get_session.return_value = mock_session - # Patch the import statement directly in the function - with patch.dict("sys.modules", {"botocore": MagicMock(session=mock_session_class)}): - region = get_aws_region() - self.assertEqual(region, "us-east-1") + region = get_aws_region() + self.assertEqual(region, "us-east-1") + mock_session.get_config_variable.assert_called_once_with("region") def test_get_aws_region_without_botocore(self): """Test get_aws_region when botocore is not installed""" - with patch("amazon.opentelemetry.distro._utils.is_installed") as mock_is_installed: - mock_is_installed.return_value = False + with patch("amazon.opentelemetry.distro._utils.get_aws_session") as mock_get_session: + mock_get_session.return_value = None region = get_aws_region() self.assertIsNone(region) def test_get_aws_region_botocore_no_region(self): """Test get_aws_region when botocore is available but returns no region""" - with patch("amazon.opentelemetry.distro._utils.is_installed") as mock_is_installed: - mock_is_installed.return_value = True - - # Create a mock botocore session with no region - mock_session_class = MagicMock() - mock_session_instance = MagicMock() - mock_session_instance.region_name = None - mock_session_class.Session.return_value = mock_session_instance - - # Patch the import statement directly in the function - with patch.dict("sys.modules", {"botocore": MagicMock(session=mock_session_class)}): - region = get_aws_region() - self.assertIsNone(region) - - def test_get_aws_region_botocore_import_error(self): - """Test get_aws_region when botocore import fails""" - with patch("amazon.opentelemetry.distro._utils.is_installed") as mock_is_installed: - mock_is_installed.return_value = True - - # Mock ImportError when trying to import botocore - with patch.dict("sys.modules", {"botocore": None}): - with patch("builtins.__import__", side_effect=ImportError("Botocore not found")): - region = get_aws_region() - self.assertIsNone(region) - - def test_get_aws_region_botocore_attribute_error(self): - """Test get_aws_region when botocore has attribute errors""" - with patch("amazon.opentelemetry.distro._utils.is_installed") as mock_is_installed: - mock_is_installed.return_value = True - - # Mock the botocore import with AttributeError on Session - mock_session_module = MagicMock() - mock_session_module.Session.side_effect = AttributeError("Session class not found") - - # Patch the import statement directly in the function - with patch.dict("sys.modules", {"botocore": MagicMock(session=mock_session_module)}): - region = get_aws_region() - self.assertIsNone(region) + with patch("amazon.opentelemetry.distro._utils.get_aws_session") as mock_get_session: + mock_session = MagicMock() + mock_session.get_config_variable.return_value = None + mock_get_session.return_value = mock_session + + region = get_aws_region() + self.assertIsNone(region) + mock_session.get_config_variable.assert_called_once_with("region") + + def test_get_aws_region_with_aws_region_env(self): + """Test get_aws_region when AWS_REGION environment variable is set""" + os.environ.pop("AWS_REGION", None) + os.environ.pop("AWS_DEFAULT_REGION", None) + os.environ["AWS_REGION"] = "us-west-2" + + region = get_aws_region() + self.assertEqual(region, "us-west-2") + + os.environ.pop("AWS_REGION", None) + + def test_get_aws_region_with_aws_default_region_env(self): + """Test get_aws_region when AWS_DEFAULT_REGION environment variable is set""" + os.environ.pop("AWS_REGION", None) + os.environ.pop("AWS_DEFAULT_REGION", None) + os.environ["AWS_DEFAULT_REGION"] = "eu-west-1" + + region = get_aws_region() + self.assertEqual(region, "eu-west-1") + + os.environ.pop("AWS_DEFAULT_REGION", None)