Skip to content

Commit 5260e90

Browse files
committed
conolidate botocore sessions into utils
1 parent e1ff7b2 commit 5260e90

File tree

14 files changed

+202
-211
lines changed

14 files changed

+202
-211
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
from importlib.metadata import PackageNotFoundError, version
66
from logging import Logger, getLogger
7+
from typing import Optional
78

89
from packaging.requirements import Requirement
910

@@ -37,30 +38,18 @@ def is_agent_observability_enabled() -> bool:
3738
return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"
3839

3940

40-
def get_aws_region() -> str:
41-
"""Get AWS region using botocore session.
42-
43-
botocore automatically checks in the following priority order:
44-
1. AWS_REGION environment variable
45-
2. AWS_DEFAULT_REGION environment variable
46-
3. AWS CLI config file (~/.aws/config)
47-
4. EC2 instance metadata service
48-
49-
Returns:
50-
The AWS region if found, None otherwise.
51-
"""
52-
if is_installed("botocore"):
53-
try:
54-
from botocore import session # pylint: disable=import-outside-toplevel
55-
56-
botocore_session = session.Session()
57-
if botocore_session.region_name:
58-
return botocore_session.region_name
59-
except (ImportError, AttributeError):
60-
# botocore failed to determine region
61-
pass
62-
63-
_logger.warning(
64-
"AWS region not found. Please set AWS_REGION environment variable or configure AWS CLI with 'aws configure'."
65-
)
41+
IS_BOTOCORE_INSTALLED: bool = is_installed("botocore")
42+
43+
44+
def get_aws_session():
45+
if IS_BOTOCORE_INSTALLED:
46+
# pylint: disable=import-outside-toplevel
47+
from botocore.session import Session
48+
49+
return Session()
6650
return None
51+
52+
53+
def get_aws_region() -> Optional[str]:
54+
botocore_session = get_aws_session()
55+
return botocore_session.get_config_variable("region") if botocore_session else None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
1414
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
15-
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
15+
from amazon.opentelemetry.distro._utils import IS_BOTOCORE_INSTALLED, get_aws_session, is_agent_observability_enabled
1616
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
1717
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1818
AttributePropagatingSpanProcessorBuilder,
@@ -23,11 +23,6 @@
2323
AwsMetricAttributesSpanExporterBuilder,
2424
)
2525
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
26-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
27-
AwsCloudWatchOtlpBatchLogRecordProcessor,
28-
)
29-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
30-
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
3126
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
3227
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
3328
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
@@ -214,8 +209,7 @@ def _init_logging(
214209
for _, exporter_class in exporters.items():
215210
exporter_args = {}
216211
log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
217-
log_processor = _customize_log_record_processor(log_exporter)
218-
provider.add_log_record_processor(log_processor)
212+
_customize_log_record_processor(provider, log_exporter)
219213

220214
event_logger_provider = EventLoggerProvider(logger_provider=provider)
221215
set_event_logger_provider(event_logger_provider)
@@ -303,7 +297,7 @@ def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvide
303297

304298
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
305299

306-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
300+
span_exporter = _create_aws_exporter(endpoint=traces_endpoint)
307301

308302
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))
309303

@@ -404,15 +398,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
404398
_logger.info("Detected using AWS OTLP Traces Endpoint.")
405399

406400
if isinstance(span_exporter, OTLPSpanExporter):
407-
if is_agent_observability_enabled():
408-
# Span exporter needs an instance of logger provider in ai agent
409-
# observability case because we need to split input/output prompts
410-
# from span attributes and send them to the logs pipeline per
411-
# the new Gen AI semantic convention from OTel
412-
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
413-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
414-
else:
415-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
401+
return _create_aws_exporter(endpoint=traces_endpoint)
416402

417403
else:
418404
_logger.warning(
@@ -426,14 +412,20 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
426412
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
427413

428414

429-
def _customize_log_record_processor(log_exporter: LogExporter):
430-
if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
431-
return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
415+
def _customize_log_record_processor(provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None:
416+
if log_exporter is None:
417+
return
418+
if is_agent_observability_enabled() and IS_BOTOCORE_INSTALLED:
419+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
420+
AwsCloudWatchOtlpBatchLogRecordProcessor,
421+
)
432422

433-
return BatchLogRecordProcessor(exporter=log_exporter)
423+
provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter))
424+
else:
425+
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
434426

435427

436-
def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
428+
def _customize_logs_exporter(log_exporter: LogExporter) -> Optional[LogExporter]:
437429
logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
438430

439431
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
@@ -443,7 +435,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
443435
# Setting default compression mode to Gzip as this is the behavior in upstream's
444436
# collector otlp http exporter:
445437
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
446-
return OTLPAwsLogExporter(endpoint=logs_endpoint)
438+
return _create_aws_exporter(endpoint=logs_endpoint)
447439

448440
_logger.warning(
449441
"Improper configuration see: please export/set "
@@ -762,8 +754,9 @@ def _check_emf_exporter_enabled() -> bool:
762754
def create_emf_exporter():
763755
"""Create and configure the CloudWatch EMF exporter."""
764756
try:
757+
session = get_aws_session()
765758
# Check if botocore is available before importing the EMF exporter
766-
if not is_installed("botocore"):
759+
if not session:
767760
_logger.warning("botocore is not installed. EMF exporter requires botocore")
768761
return None
769762

@@ -778,6 +771,7 @@ def create_emf_exporter():
778771
return None
779772

780773
return AwsCloudWatchEmfExporter(
774+
session=session,
781775
namespace=log_header_setting.namespace,
782776
log_group_name=log_header_setting.log_group,
783777
log_stream_name=log_header_setting.log_stream,
@@ -786,3 +780,43 @@ def create_emf_exporter():
786780
except Exception as errors:
787781
_logger.error("Failed to create EMF exporter: %s", errors)
788782
return None
783+
784+
785+
def _create_aws_exporter(endpoint: str):
786+
"""Create and configure the AWS OTLP exporters."""
787+
try:
788+
session = get_aws_session()
789+
# Check if botocore is available before importing the AWS exporter
790+
if not session:
791+
_logger.warning("SigV4 Auth requires botocore to be enabled")
792+
return None
793+
794+
# pylint: disable=import-outside-toplevel
795+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
796+
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
797+
798+
endpoint = endpoint.lower()
799+
split = endpoint.split(".")
800+
service = split[0]
801+
region = split[1]
802+
803+
if "xray" in service:
804+
if is_agent_observability_enabled():
805+
# Span exporter needs an instance of logger provider in ai agent
806+
# observability case because we need to split input/output prompts
807+
# from span attributes and send them to the logs pipeline per
808+
# the new Gen AI semantic convention from OTel
809+
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
810+
return OTLPAwsSpanExporter(
811+
session=session, endpoint=endpoint, aws_region=region, logger_provider=get_logger_provider()
812+
)
813+
814+
return OTLPAwsSpanExporter(session=session, endpoint=endpoint, aws_region=region)
815+
816+
if "logs" in service:
817+
return OTLPAwsLogExporter(session=session, aws_region=region)
818+
819+
# pylint: disable=broad-exception-caught
820+
except Exception as errors:
821+
_logger.error("Failed to create AWS OTLP exporter: %s", errors)
822+
return None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ def _configure(self, **kwargs):
8888
# Set GenAI capture content default
8989
os.environ.setdefault(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "true")
9090

91-
# Set OTLP endpoints with AWS region if not already set
9291
region = get_aws_region()
92+
93+
# Set OTLP endpoints with AWS region if not already set
9394
if region:
9495
os.environ.setdefault(
9596
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, f"https://xray.{region}.amazonaws.com/v1/traces"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import uuid
99
from typing import Any, Dict, List, Optional
1010

11-
import botocore.session
1211
from botocore.exceptions import ClientError
12+
from botocore.session import Session
1313

1414
logger = logging.getLogger(__name__)
1515

@@ -90,6 +90,7 @@ class CloudWatchLogClient:
9090
def __init__(
9191
self,
9292
log_group_name: str,
93+
session: Session = Session(),
9394
log_stream_name: Optional[str] = None,
9495
aws_region: Optional[str] = None,
9596
**kwargs,
@@ -105,8 +106,6 @@ def __init__(
105106
"""
106107
self.log_group_name = log_group_name
107108
self.log_stream_name = log_stream_name or self._generate_log_stream_name()
108-
109-
session = botocore.session.Session()
110109
self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)
111110

112111
# Event batch to store logs before sending to CloudWatch

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/aws_auth_session.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import logging
55

66
import requests
7-
8-
from amazon.opentelemetry.distro._utils import is_installed
7+
from botocore.auth import SigV4Auth
8+
from botocore.awsrequest import AWSRequest
9+
from botocore.session import Session
910

1011
_logger = logging.getLogger(__name__)
1112

@@ -33,57 +34,36 @@ class AwsAuthSession(requests.Session):
3334
service (str): The AWS service name for signing (e.g., "logs" or "xray")
3435
"""
3536

36-
def __init__(self, aws_region, service):
37-
38-
self._has_required_dependencies = False
39-
40-
# Requires botocore to be installed to sign the headers. However,
41-
# some users might not need to use this authenticator. In order not conflict
42-
# with existing behavior, we check for botocore before initializing this exporter.
43-
44-
if aws_region and service and is_installed("botocore"):
45-
# pylint: disable=import-outside-toplevel
46-
from botocore import auth, awsrequest, session
47-
48-
self._boto_auth = auth
49-
self._boto_aws_request = awsrequest
50-
self._boto_session = session.Session()
51-
52-
self._aws_region = aws_region
53-
self._service = service
54-
self._has_required_dependencies = True
55-
56-
else:
57-
_logger.error(
58-
"botocore is required to enable SigV4 Authentication. Please install it using `pip install botocore`",
59-
)
37+
def __init__(self, aws_region: str, service: str, session: Session = Session()):
38+
self._aws_region: str = aws_region
39+
self._service: str = service
40+
self._session: Session = session
6041

6142
super().__init__()
6243

6344
def request(self, method, url, *args, data=None, headers=None, **kwargs):
64-
if self._has_required_dependencies:
65-
66-
credentials = self._boto_session.get_credentials()
67-
68-
if credentials is not None:
69-
signer = self._boto_auth.SigV4Auth(credentials, self._service, self._aws_region)
70-
71-
request = self._boto_aws_request.AWSRequest(
72-
method="POST",
73-
url=url,
74-
data=data,
75-
headers={"Content-Type": "application/x-protobuf"},
76-
)
45+
credentials = self._session.get_credentials()
46+
47+
if credentials:
48+
signer = SigV4Auth(credentials, self._service, self._aws_region)
49+
request = AWSRequest(
50+
method="POST",
51+
url=url,
52+
data=data,
53+
headers={"Content-Type": "application/x-protobuf"},
54+
)
7755

78-
try:
79-
signer.add_auth(request)
56+
try:
57+
signer.add_auth(request)
8058

81-
if headers is None:
82-
headers = {}
59+
if headers is None:
60+
headers = {}
8361

84-
headers.update(dict(request.headers))
62+
headers.update(dict(request.headers))
8563

86-
except Exception as signing_error: # pylint: disable=broad-except
87-
_logger.error("Failed to sign request: %s", signing_error)
64+
except Exception as signing_error: # pylint: disable=broad-except
65+
_logger.error("Failed to sign request: %s", signing_error)
66+
else:
67+
_logger.error("Failed to load AWS Credentials: %s")
8868

8969
return super().request(method=method, url=url, *args, data=data, headers=headers, **kwargs)

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from time import sleep
88
from typing import Dict, Optional, Sequence
99

10+
from botocore.session import Session
1011
from requests import Response
1112
from requests.exceptions import ConnectionError as RequestsConnectionError
1213
from requests.structures import CaseInsensitiveDict
@@ -28,17 +29,25 @@ class OTLPAwsLogExporter(OTLPLogExporter):
2829

2930
def __init__(
3031
self,
32+
aws_region: str,
33+
session: Session = Session(),
34+
log_group: Optional[str] = None,
35+
log_stream: Optional[str] = None,
3136
endpoint: Optional[str] = None,
3237
certificate_file: Optional[str] = None,
3338
client_key_file: Optional[str] = None,
3439
client_certificate_file: Optional[str] = None,
3540
headers: Optional[Dict[str, str]] = None,
3641
timeout: Optional[int] = None,
3742
):
38-
self._aws_region = None
43+
self._aws_region = aws_region
3944

40-
if endpoint:
41-
self._aws_region = endpoint.split(".")[1]
45+
if log_group and log_stream:
46+
log_headers = {"x-aws-log-group": log_group, "x-aws-log-stream": log_stream}
47+
if headers:
48+
headers.update(log_headers)
49+
else:
50+
headers = log_headers
4251

4352
OTLPLogExporter.__init__(
4453
self,
@@ -49,7 +58,7 @@ def __init__(
4958
headers,
5059
timeout,
5160
compression=Compression.Gzip,
52-
session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
61+
session=AwsAuthSession(session=session, aws_region=self._aws_region, service="logs"),
5362
)
5463

5564
def export(self, batch: Sequence[LogData]) -> LogExportResult:

0 commit comments

Comments
 (0)