Skip to content

Commit 7e430b6

Browse files
authored
Consolidate botocore usage in EMF and Sigv4 Exporters (#419)
*Description of changes:* This PR cleans up botocore usages in EMF and Sigv4 exporters and migrates them to the `_utils` class. - Added get_aws_session() function in _utils.py to handle botocore session creation - Refactored get_aws_region() to properly handle AWS_REGION and AWS_DEFAULT_REGION environment variables - Added IS_BOTOCORE_INSTALLED flag to prevent runtime errors for customers not using botocore-dependent features - Added _create_aws_otlp_exporter() function to prevent runtime errors from instantiating AWS OTLP exporter creation if the user does not have botocore installed - Changed OTLP log exporter to allow optional constructor parameters for `x-aws-log-group` and `x-aws-log-stream` headers to consolidate it with EMF exporter Force flush race condition: Added force flush implementation to prevent race conditions in `AwsBatchLogRecordProcessor` as here: #407 By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 80a9fd3 commit 7e430b6

16 files changed

+461
-247
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
from importlib.metadata import PackageNotFoundError, version
66
from logging import Logger, getLogger
7+
from typing import Optional
78

89
from packaging.requirements import Requirement
910

@@ -37,30 +38,40 @@ def is_agent_observability_enabled() -> bool:
3738
return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"
3839

3940

40-
def get_aws_region() -> str:
41-
"""Get AWS region using botocore session.
41+
IS_BOTOCORE_INSTALLED: bool = is_installed("botocore")
4242

43-
botocore automatically checks in the following priority order:
44-
1. AWS_REGION environment variable
45-
2. AWS_DEFAULT_REGION environment variable
46-
3. AWS CLI config file (~/.aws/config)
47-
4. EC2 instance metadata service
4843

49-
Returns:
50-
The AWS region if found, None otherwise.
44+
def get_aws_session():
5145
"""
52-
if is_installed("botocore"):
53-
try:
54-
from botocore import session # pylint: disable=import-outside-toplevel
55-
56-
botocore_session = session.Session()
57-
if botocore_session.region_name:
58-
return botocore_session.region_name
59-
except (ImportError, AttributeError):
60-
# botocore failed to determine region
61-
pass
62-
63-
_logger.warning(
64-
"AWS region not found. Please set AWS_REGION environment variable or configure AWS CLI with 'aws configure'."
65-
)
46+
Returns a botocore session only if botocore is installed, otherwise None.
47+
If AWS Region is defined in `AWS_REGION` or `AWS_DEFAULT_REGION` environment variables,
48+
then the region is set in the botocore session before returning.
49+
50+
We do this to prevent runtime errors for ADOT customers that do not need
51+
any features that require botocore.
52+
"""
53+
if IS_BOTOCORE_INSTALLED:
54+
# pylint: disable=import-outside-toplevel
55+
from botocore.session import Session
56+
57+
session = Session()
58+
# Botocore only looks up AWS_DEFAULT_REGION when creating a session/client
59+
# See: https://docs.aws.amazon.com/sdkref/latest/guide/feature-region.html#feature-region-sdk-compat
60+
region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
61+
if region:
62+
session.set_config_variable("region", region)
63+
return session
6664
return None
65+
66+
67+
def get_aws_region() -> Optional[str]:
68+
"""Get AWS region from environment or botocore session.
69+
70+
Returns the AWS region in the following priority order:
71+
1. AWS_REGION environment variable
72+
2. AWS_DEFAULT_REGION environment variable
73+
3. botocore session's region (if botocore is available)
74+
4. None if no region can be determined
75+
"""
76+
botocore_session = get_aws_session()
77+
return botocore_session.get_config_variable("region") if botocore_session else None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 86 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE, AWS_SERVICE_TYPE
1414
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
15-
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
15+
from amazon.opentelemetry.distro._utils import get_aws_session, is_agent_observability_enabled
1616
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
1717
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1818
AttributePropagatingSpanProcessorBuilder,
@@ -23,13 +23,6 @@
2323
AwsMetricAttributesSpanExporterBuilder,
2424
)
2525
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
26-
27-
# pylint: disable=line-too-long
28-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
29-
AwsCloudWatchOtlpBatchLogRecordProcessor,
30-
)
31-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
32-
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
3326
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
3427
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
3528
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
@@ -102,6 +95,8 @@
10295
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
10396
OTEL_EXPORTER_OTLP_LOGS_HEADERS = "OTEL_EXPORTER_OTLP_LOGS_HEADERS"
10497

98+
XRAY_SERVICE = "xray"
99+
LOGS_SERIVCE = "logs"
105100
AWS_TRACES_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
106101
AWS_LOGS_OTLP_ENDPOINT_PATTERN = r"https://logs\.([a-z0-9-]+)\.amazonaws\.com/v1/logs$"
107102

@@ -215,9 +210,9 @@ def _init_logging(
215210

216211
for _, exporter_class in exporters.items():
217212
exporter_args = {}
218-
log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
219-
log_processor = _customize_log_record_processor(log_exporter)
220-
provider.add_log_record_processor(log_processor)
213+
_customize_log_record_processor(
214+
logger_provider=provider, log_exporter=_customize_logs_exporter(exporter_class(**exporter_args))
215+
)
221216

222217
event_logger_provider = EventLoggerProvider(logger_provider=provider)
223218
set_event_logger_provider(event_logger_provider)
@@ -304,10 +299,11 @@ def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvide
304299
return
305300

306301
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
302+
if traces_endpoint and _is_aws_otlp_endpoint(traces_endpoint, XRAY_SERVICE):
303+
endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(traces_endpoint)
304+
span_exporter = _create_aws_otlp_exporter(endpoint=endpoint, service=XRAY_SERVICE, region=region)
307305

308-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
309-
310-
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))
306+
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))
311307

312308

313309
def _is_defer_to_workers_enabled():
@@ -356,7 +352,7 @@ def _custom_import_sampler(sampler_name: str, resource: Resource) -> Sampler:
356352
if sampler_name is None:
357353
sampler_name = "parentbased_always_on"
358354

359-
if sampler_name == "xray":
355+
if sampler_name == XRAY_SERVICE:
360356
# Example env var value
361357
# OTEL_TRACES_SAMPLER_ARG=endpoint=http://localhost:2000,polling_interval=360
362358
sampler_argument_env: str = os.getenv(OTEL_TRACES_SAMPLER_ARG, None)
@@ -402,50 +398,52 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
402398
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
403399
span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint)
404400

405-
if _is_aws_otlp_endpoint(traces_endpoint, "xray"):
401+
if traces_endpoint and _is_aws_otlp_endpoint(traces_endpoint, XRAY_SERVICE):
406402
_logger.info("Detected using AWS OTLP Traces Endpoint.")
407403

408404
if isinstance(span_exporter, OTLPSpanExporter):
409-
if is_agent_observability_enabled():
410-
# Span exporter needs an instance of logger provider in ai agent
411-
# observability case because we need to split input/output prompts
412-
# from span attributes and send them to the logs pipeline per
413-
# the new Gen AI semantic convention from OTel
414-
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
415-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
416-
else:
417-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
405+
endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(traces_endpoint)
406+
return _create_aws_otlp_exporter(endpoint=endpoint, service=XRAY_SERVICE, region=region)
418407

419-
else:
420-
_logger.warning(
421-
"Improper configuration see: please export/set "
422-
"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf and OTEL_TRACES_EXPORTER=otlp"
423-
)
408+
_logger.warning(
409+
"Improper configuration see: please export/set "
410+
"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf and OTEL_TRACES_EXPORTER=otlp"
411+
)
424412

425413
if not _is_application_signals_enabled():
426414
return span_exporter
427415

428416
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
429417

430418

431-
def _customize_log_record_processor(log_exporter: LogExporter):
432-
if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
433-
return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
419+
def _customize_log_record_processor(logger_provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None:
420+
if not log_exporter:
421+
return
422+
423+
if is_agent_observability_enabled():
424+
# pylint: disable=import-outside-toplevel
425+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
426+
AwsCloudWatchOtlpBatchLogRecordProcessor,
427+
)
434428

435-
return BatchLogRecordProcessor(exporter=log_exporter)
429+
logger_provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter))
430+
else:
431+
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
436432

437433

438434
def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
439435
logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
440436

441-
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
437+
if logs_endpoint and _is_aws_otlp_endpoint(logs_endpoint, LOGS_SERIVCE):
438+
442439
_logger.info("Detected using AWS OTLP Logs Endpoint.")
443440

444441
if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid:
442+
endpoint, region = _extract_endpoint_and_region_from_otlp_endpoint(logs_endpoint)
445443
# Setting default compression mode to Gzip as this is the behavior in upstream's
446444
# collector otlp http exporter:
447445
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
448-
return OTLPAwsLogExporter(endpoint=logs_endpoint)
446+
return _create_aws_otlp_exporter(endpoint=endpoint, service=LOGS_SERIVCE, region=region)
449447

450448
_logger.warning(
451449
"Improper configuration see: please export/set "
@@ -514,7 +512,7 @@ def _customize_metric_exporters(
514512
metric_readers.append(scope_based_periodic_exporting_metric_reader)
515513

516514
if is_emf_enabled:
517-
emf_exporter = create_emf_exporter()
515+
emf_exporter = _create_emf_exporter()
518516
if emf_exporter:
519517
metric_readers.append(PeriodicExportingMetricReader(emf_exporter))
520518

@@ -604,17 +602,24 @@ def _is_lambda_environment():
604602
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ
605603

606604

607-
def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
605+
def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str], service: str) -> bool:
608606
"""Is the given endpoint an AWS OTLP endpoint?"""
609607

610-
pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN
611-
612608
if not otlp_endpoint:
613609
return False
614610

611+
pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == XRAY_SERVICE else AWS_LOGS_OTLP_ENDPOINT_PATTERN
612+
615613
return bool(re.match(pattern, otlp_endpoint.lower()))
616614

617615

616+
def _extract_endpoint_and_region_from_otlp_endpoint(endpoint: str):
617+
endpoint = endpoint.lower()
618+
region = endpoint.split(".")[1]
619+
620+
return endpoint, region
621+
622+
618623
def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
619624
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
620625
AWS OTLP Logs endpoint."""
@@ -631,7 +636,6 @@ def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
631636
log_group = None
632637
log_stream = None
633638
namespace = None
634-
filtered_log_headers_count = 0
635639

636640
for pair in logs_headers.split(","):
637641
if "=" in pair:
@@ -640,14 +644,12 @@ def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
640644
value = split[1]
641645
if key == AWS_OTLP_LOGS_GROUP_HEADER and value:
642646
log_group = value
643-
filtered_log_headers_count += 1
644647
elif key == AWS_OTLP_LOGS_STREAM_HEADER and value:
645648
log_stream = value
646-
filtered_log_headers_count += 1
647649
elif key == AWS_EMF_METRICS_NAMESPACE and value:
648650
namespace = value
649651

650-
is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None
652+
is_valid = log_group is not None and log_stream is not None
651653

652654
if not is_valid:
653655
_logger.warning(
@@ -769,11 +771,12 @@ def _check_emf_exporter_enabled() -> bool:
769771
return True
770772

771773

772-
def create_emf_exporter():
774+
def _create_emf_exporter():
773775
"""Create and configure the CloudWatch EMF exporter."""
774776
try:
777+
session = get_aws_session()
775778
# Check if botocore is available before importing the EMF exporter
776-
if not is_installed("botocore"):
779+
if not session:
777780
_logger.warning("botocore is not installed. EMF exporter requires botocore")
778781
return None
779782

@@ -788,6 +791,7 @@ def create_emf_exporter():
788791
return None
789792

790793
return AwsCloudWatchEmfExporter(
794+
session=session,
791795
namespace=log_header_setting.namespace,
792796
log_group_name=log_header_setting.log_group,
793797
log_stream_name=log_header_setting.log_stream,
@@ -796,3 +800,39 @@ def create_emf_exporter():
796800
except Exception as errors:
797801
_logger.error("Failed to create EMF exporter: %s", errors)
798802
return None
803+
804+
805+
def _create_aws_otlp_exporter(endpoint: str, service: str, region: str):
806+
"""Create and configure the AWS OTLP exporters."""
807+
try:
808+
session = get_aws_session()
809+
# Check if botocore is available before importing the AWS exporter
810+
if not session:
811+
_logger.warning("Sigv4 Auth requires botocore to be enabled")
812+
return None
813+
814+
# pylint: disable=import-outside-toplevel
815+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
816+
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
817+
818+
if service == XRAY_SERVICE:
819+
if is_agent_observability_enabled():
820+
# Span exporter needs an instance of logger provider in ai agent
821+
# observability case because we need to split input/output prompts
822+
# from span attributes and send them to the logs pipeline per
823+
# the new Gen AI semantic convention from OTel
824+
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
825+
return OTLPAwsSpanExporter(
826+
session=session, endpoint=endpoint, aws_region=region, logger_provider=get_logger_provider()
827+
)
828+
829+
return OTLPAwsSpanExporter(session=session, endpoint=endpoint, aws_region=region)
830+
831+
if service == LOGS_SERIVCE:
832+
return OTLPAwsLogExporter(session=session, aws_region=region)
833+
834+
return None
835+
# pylint: disable=broad-exception-caught
836+
except Exception as errors:
837+
_logger.error("Failed to create AWS OTLP exporter: %s", errors)
838+
return None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ def _configure(self, **kwargs):
8686
# Set GenAI capture content default
8787
os.environ.setdefault(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "true")
8888

89-
# Set OTLP endpoints with AWS region if not already set
9089
region = get_aws_region()
90+
91+
# Set OTLP endpoints with AWS region if not already set
9192
if region:
9293
os.environ.setdefault(
9394
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, f"https://xray.{region}.amazonaws.com/v1/traces"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import uuid
99
from typing import Any, Dict, List, Optional
1010

11-
import botocore.session
1211
from botocore.exceptions import ClientError
12+
from botocore.session import Session
1313

1414
logger = logging.getLogger(__name__)
1515

@@ -90,6 +90,7 @@ class CloudWatchLogClient:
9090
def __init__(
9191
self,
9292
log_group_name: str,
93+
session: Session,
9394
log_stream_name: Optional[str] = None,
9495
aws_region: Optional[str] = None,
9596
**kwargs,
@@ -105,8 +106,6 @@ def __init__(
105106
"""
106107
self.log_group_name = log_group_name
107108
self.log_stream_name = log_stream_name or self._generate_log_stream_name()
108-
109-
session = botocore.session.Session()
110109
self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)
111110

112111
# Event batch to store logs before sending to CloudWatch

0 commit comments

Comments
 (0)