Skip to content

Commit 77c700c

Browse files
committed
Merge remote-tracking branch 'upstream/consolidate-aws-setup' into genesis-logs
2 parents 76e4b47 + 0b43920 commit 77c700c

File tree

16 files changed

+263
-215
lines changed

16 files changed

+263
-215
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
from importlib.metadata import PackageNotFoundError, version
66
from logging import Logger, getLogger
7+
from typing import Optional
78

89
from packaging.requirements import Requirement
910

@@ -37,30 +38,22 @@ def is_agent_observability_enabled() -> bool:
3738
return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"
3839

3940

40-
def get_aws_region() -> str:
41-
"""Get AWS region using botocore session.
42-
43-
botocore automatically checks in the following priority order:
44-
1. AWS_REGION environment variable
45-
2. AWS_DEFAULT_REGION environment variable
46-
3. AWS CLI config file (~/.aws/config)
47-
4. EC2 instance metadata service
48-
49-
Returns:
50-
The AWS region if found, None otherwise.
51-
"""
52-
if is_installed("botocore"):
53-
try:
54-
from botocore import session # pylint: disable=import-outside-toplevel
55-
56-
botocore_session = session.Session()
57-
if botocore_session.region_name:
58-
return botocore_session.region_name
59-
except (ImportError, AttributeError):
60-
# botocore failed to determine region
61-
pass
62-
63-
_logger.warning(
64-
"AWS region not found. Please set AWS_REGION environment variable or configure AWS CLI with 'aws configure'."
65-
)
41+
IS_BOTOCORE_INSTALLED: bool = is_installed("botocore")
42+
43+
44+
def get_aws_session():
45+
if IS_BOTOCORE_INSTALLED:
46+
# pylint: disable=import-outside-toplevel
47+
from botocore.session import Session
48+
49+
session = Session()
50+
region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
51+
if region:
52+
session.set_config_variable("region", region)
53+
return session
6654
return None
55+
56+
57+
def get_aws_region() -> Optional[str]:
58+
botocore_session = get_aws_session()
59+
return botocore_session.get_config_variable("region") if botocore_session else None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE, AWS_SERVICE_TYPE
1414
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
15-
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
15+
from amazon.opentelemetry.distro._utils import IS_BOTOCORE_INSTALLED, get_aws_session, is_agent_observability_enabled
1616
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
1717
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1818
AttributePropagatingSpanProcessorBuilder,
@@ -23,11 +23,6 @@
2323
AwsMetricAttributesSpanExporterBuilder,
2424
)
2525
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
26-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
27-
AwsCloudWatchOtlpBatchLogRecordProcessor,
28-
)
29-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
30-
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
3126
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
3227
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
3328
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
@@ -214,8 +209,7 @@ def _init_logging(
214209
for _, exporter_class in exporters.items():
215210
exporter_args = {}
216211
log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
217-
log_processor = _customize_log_record_processor(log_exporter)
218-
provider.add_log_record_processor(log_processor)
212+
_customize_log_record_processor(provider, log_exporter)
219213

220214
event_logger_provider = EventLoggerProvider(logger_provider=provider)
221215
set_event_logger_provider(event_logger_provider)
@@ -303,7 +297,7 @@ def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvide
303297

304298
traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
305299

306-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
300+
span_exporter = _create_aws_exporter(endpoint=traces_endpoint)
307301

308302
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))
309303

@@ -404,15 +398,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
404398
_logger.info("Detected using AWS OTLP Traces Endpoint.")
405399

406400
if isinstance(span_exporter, OTLPSpanExporter):
407-
if is_agent_observability_enabled():
408-
# Span exporter needs an instance of logger provider in ai agent
409-
# observability case because we need to split input/output prompts
410-
# from span attributes and send them to the logs pipeline per
411-
# the new Gen AI semantic convention from OTel
412-
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
413-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
414-
else:
415-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
401+
return _create_aws_exporter(endpoint=traces_endpoint)
416402

417403
else:
418404
_logger.warning(
@@ -426,14 +412,20 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
426412
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
427413

428414

429-
def _customize_log_record_processor(log_exporter: LogExporter):
430-
if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
431-
return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
415+
def _customize_log_record_processor(provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None:
416+
if log_exporter is None:
417+
return
418+
if is_agent_observability_enabled() and IS_BOTOCORE_INSTALLED:
419+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
420+
AwsCloudWatchOtlpBatchLogRecordProcessor,
421+
)
432422

433-
return BatchLogRecordProcessor(exporter=log_exporter)
423+
provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter))
424+
else:
425+
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
434426

435427

436-
def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
428+
def _customize_logs_exporter(log_exporter: LogExporter) -> Optional[LogExporter]:
437429
logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
438430

439431
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
@@ -443,7 +435,7 @@ def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
443435
# Setting default compression mode to Gzip as this is the behavior in upstream's
444436
# collector otlp http exporter:
445437
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
446-
return OTLPAwsLogExporter(endpoint=logs_endpoint)
438+
return _create_aws_exporter(endpoint=logs_endpoint)
447439

448440
_logger.warning(
449441
"Improper configuration see: please export/set "
@@ -770,8 +762,9 @@ def _check_emf_exporter_enabled() -> bool:
770762
def create_emf_exporter():
771763
"""Create and configure the CloudWatch EMF exporter."""
772764
try:
765+
session = get_aws_session()
773766
# Check if botocore is available before importing the EMF exporter
774-
if not is_installed("botocore"):
767+
if not session:
775768
_logger.warning("botocore is not installed. EMF exporter requires botocore")
776769
return None
777770

@@ -786,6 +779,7 @@ def create_emf_exporter():
786779
return None
787780

788781
return AwsCloudWatchEmfExporter(
782+
session=session,
789783
namespace=log_header_setting.namespace,
790784
log_group_name=log_header_setting.log_group,
791785
log_stream_name=log_header_setting.log_stream,
@@ -794,3 +788,43 @@ def create_emf_exporter():
794788
except Exception as errors:
795789
_logger.error("Failed to create EMF exporter: %s", errors)
796790
return None
791+
792+
793+
def _create_aws_exporter(endpoint: str):
794+
"""Create and configure the AWS OTLP exporters."""
795+
try:
796+
session = get_aws_session()
797+
# Check if botocore is available before importing the AWS exporter
798+
if not session:
799+
_logger.warning("SigV4 Auth requires botocore to be enabled")
800+
return None
801+
802+
# pylint: disable=import-outside-toplevel
803+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
804+
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
805+
806+
endpoint = endpoint.lower()
807+
split = endpoint.split(".")
808+
service = split[0]
809+
region = split[1]
810+
811+
if "xray" in service:
812+
if is_agent_observability_enabled():
813+
# Span exporter needs an instance of logger provider in ai agent
814+
# observability case because we need to split input/output prompts
815+
# from span attributes and send them to the logs pipeline per
816+
# the new Gen AI semantic convention from OTel
817+
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
818+
return OTLPAwsSpanExporter(
819+
session=session, endpoint=endpoint, aws_region=region, logger_provider=get_logger_provider()
820+
)
821+
822+
return OTLPAwsSpanExporter(session=session, endpoint=endpoint, aws_region=region)
823+
824+
if "logs" in service:
825+
return OTLPAwsLogExporter(session=session, aws_region=region)
826+
827+
# pylint: disable=broad-exception-caught
828+
except Exception as errors:
829+
_logger.error("Failed to create AWS OTLP exporter: %s", errors)
830+
return None

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ def _configure(self, **kwargs):
8686
# Set GenAI capture content default
8787
os.environ.setdefault(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "true")
8888

89-
# Set OTLP endpoints with AWS region if not already set
9089
region = get_aws_region()
90+
91+
# Set OTLP endpoints with AWS region if not already set
9192
if region:
9293
os.environ.setdefault(
9394
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, f"https://xray.{region}.amazonaws.com/v1/traces"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/aws/metrics/_cloudwatch_log_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import uuid
99
from typing import Any, Dict, List, Optional
1010

11-
import botocore.session
1211
from botocore.exceptions import ClientError
12+
from botocore.session import Session
1313

1414
logger = logging.getLogger(__name__)
1515

@@ -90,6 +90,7 @@ class CloudWatchLogClient:
9090
def __init__(
9191
self,
9292
log_group_name: str,
93+
session: Session = Session(),
9394
log_stream_name: Optional[str] = None,
9495
aws_region: Optional[str] = None,
9596
**kwargs,
@@ -105,8 +106,6 @@ def __init__(
105106
"""
106107
self.log_group_name = log_group_name
107108
self.log_stream_name = log_stream_name or self._generate_log_stream_name()
108-
109-
session = botocore.session.Session()
110109
self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)
111110

112111
# Event batch to store logs before sending to CloudWatch

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/common/aws_auth_session.py

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
import logging
55

66
import requests
7-
8-
from amazon.opentelemetry.distro._utils import is_installed
7+
from botocore.auth import SigV4Auth
8+
from botocore.awsrequest import AWSRequest
9+
from botocore.session import Session
910

1011
_logger = logging.getLogger(__name__)
1112

@@ -33,57 +34,36 @@ class AwsAuthSession(requests.Session):
3334
service (str): The AWS service name for signing (e.g., "logs" or "xray")
3435
"""
3536

36-
def __init__(self, aws_region, service):
37-
38-
self._has_required_dependencies = False
39-
40-
# Requires botocore to be installed to sign the headers. However,
41-
# some users might not need to use this authenticator. In order not conflict
42-
# with existing behavior, we check for botocore before initializing this exporter.
43-
44-
if aws_region and service and is_installed("botocore"):
45-
# pylint: disable=import-outside-toplevel
46-
from botocore import auth, awsrequest, session
47-
48-
self._boto_auth = auth
49-
self._boto_aws_request = awsrequest
50-
self._boto_session = session.Session()
51-
52-
self._aws_region = aws_region
53-
self._service = service
54-
self._has_required_dependencies = True
55-
56-
else:
57-
_logger.error(
58-
"botocore is required to enable SigV4 Authentication. Please install it using `pip install botocore`",
59-
)
37+
def __init__(self, aws_region: str, service: str, session: Session = Session()):
38+
self._aws_region: str = aws_region
39+
self._service: str = service
40+
self._session: Session = session
6041

6142
super().__init__()
6243

6344
def request(self, method, url, *args, data=None, headers=None, **kwargs):
64-
if self._has_required_dependencies:
65-
66-
credentials = self._boto_session.get_credentials()
67-
68-
if credentials is not None:
69-
signer = self._boto_auth.SigV4Auth(credentials, self._service, self._aws_region)
70-
71-
request = self._boto_aws_request.AWSRequest(
72-
method="POST",
73-
url=url,
74-
data=data,
75-
headers={"Content-Type": "application/x-protobuf"},
76-
)
45+
credentials = self._session.get_credentials()
46+
47+
if credentials:
48+
signer = SigV4Auth(credentials, self._service, self._aws_region)
49+
request = AWSRequest(
50+
method="POST",
51+
url=url,
52+
data=data,
53+
headers={"Content-Type": "application/x-protobuf"},
54+
)
7755

78-
try:
79-
signer.add_auth(request)
56+
try:
57+
signer.add_auth(request)
8058

81-
if headers is None:
82-
headers = {}
59+
if headers is None:
60+
headers = {}
8361

84-
headers.update(dict(request.headers))
62+
headers.update(dict(request.headers))
8563

86-
except Exception as signing_error: # pylint: disable=broad-except
87-
_logger.error("Failed to sign request: %s", signing_error)
64+
except Exception as signing_error: # pylint: disable=broad-except
65+
_logger.error("Failed to sign request: %s", signing_error)
66+
else:
67+
_logger.error("Failed to load AWS Credentials: %s")
8868

8969
return super().request(method=method, url=url, *args, data=data, headers=headers, **kwargs)

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,17 +175,19 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
175175
if next_val is None:
176176
continue
177177

178+
if isinstance(next_val, bool):
179+
size += 4 if next_val else 5
180+
continue
181+
178182
if isinstance(next_val, (str, bytes)):
179183
size += len(next_val)
180184
continue
181185

182-
if isinstance(next_val, (float, int, bool)):
186+
if isinstance(next_val, (float, int)):
183187
size += len(str(next_val))
184188
continue
185189

186-
# next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"]
187-
# See: https://github.com/open-telemetry/opentelemetry-python/blob/\
188-
# 9426d6da834cfb4df7daedd4426bba0aa83165b5/opentelemetry-api/src/opentelemetry/util/types.py#L20
190+
# next_val must be Sequence["AnyValue"] or Mapping[str, "AnyValue"],
189191
if current_depth <= depth:
190192
obj_id = id(
191193
next_val
@@ -210,3 +212,12 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
210212
queue = new_queue
211213

212214
return size
215+
216+
# Only export the logs once to avoid the race condition of the worker thread and force flush thread
217+
# https://github.com/open-telemetry/opentelemetry-python/issues/3193
218+
# https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L199
219+
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
220+
if self._shutdown:
221+
return False
222+
self._export(BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH)
223+
return True

0 commit comments

Comments
 (0)