Skip to content
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
c34f3b9
add logs pipeline
liustve Jun 18, 2025
010e7df
add logs pipeline
liustve Jun 20, 2025
24f4308
Merge remote-tracking branch 'origin/mainline' into logs-mainline
liustve Jun 20, 2025
b75fe99
linting fix
liustve Jun 20, 2025
d588605
linting fix
liustve Jun 20, 2025
c78aca5
linting fix
liustve Jun 20, 2025
12eca32
linting fix
liustve Jun 20, 2025
83ec370
linting fix
liustve Jun 20, 2025
79bbf46
linting fix
liustve Jun 20, 2025
b6e1b97
remove gen ai handling logic
liustve Jun 23, 2025
17d0f90
fixed linting
liustve Jun 23, 2025
3d12858
refactor _init_logging to 1.33.1 version
liustve Jun 24, 2025
7f90bc7
refactored batch log record processor
liustve Jun 24, 2025
fdddb7a
Merge remote-tracking branch 'upstream/main' into logs-mainline
liustve Jun 24, 2025
4b7bb0e
linting
liustve Jun 24, 2025
8c64adb
lint fix
liustve Jun 24, 2025
01e3fd8
update configuration and tests
liustve Jun 24, 2025
2f0268c
lint fix
liustve Jun 24, 2025
7dbcb7e
linting fix
liustve Jun 24, 2025
886b009
Merge remote-tracking branch 'upstream/main' into logs-mainline
liustve Jun 28, 2025
48258c3
linting fix
liustve Jun 28, 2025
57bc772
add cycle detection
liustve Jun 28, 2025
7da6c75
add cycle detection unit tests
liustve Jun 28, 2025
ddea404
linting fix
liustve Jun 28, 2025
87c08bc
linting fix
liustve Jun 28, 2025
e68c4fd
linting fix
liustve Jun 28, 2025
93e2836
add cycle detection
liustve Jun 28, 2025
e1ff7b2
log processor race condition fix
liustve Jun 28, 2025
cb21d39
add comment about termination of loop
liustve Jun 30, 2025
5260e90
conolidate botocore sessions into utils
liustve Jun 30, 2025
0b43920
add explicit setting for aws region from environment variables
liustve Jun 30, 2025
651f283
Merge branch 'main' into logs-mainline
liustve Jul 1, 2025
ff2fb5d
refactored otlp aws log exporter, add comments aws batch log processor
liustve Jul 1, 2025
153679a
Merge branch 'main' into logs-mainline
liustve Jul 1, 2025
bce91dc
linting fix
liustve Jul 1, 2025
8479ac9
Merge branch 'logs-mainline' of https://github.com/liustve/aws-otel-p…
liustve Jul 1, 2025
76e4b47
remove shut down check before sleep
liustve Jul 1, 2025
77c700c
Merge remote-tracking branch 'upstream/consolidate-aws-setup' into ge…
liustve Jul 1, 2025
f0ebea2
linting fix
liustve Jul 1, 2025
6dd6a67
add better estimation for non-ascii characters
liustve Jul 2, 2025
502eb01
linting + formatting fix
liustve Jul 2, 2025
b30ad4f
fix unit test
liustve Jul 2, 2025
8b7e671
linting fix
liustve Jul 2, 2025
dc98cf8
add interruptible shutdown
liustve Jul 2, 2025
3450a11
fix sleep unit tests + renaming aws batch log processor
liustve Jul 2, 2025
7a83e92
linting fix
liustve Jul 2, 2025
a38d43d
fix test
liustve Jul 2, 2025
726a9a8
linting fix
liustve Jul 2, 2025
fc77123
linting fix
liustve Jul 2, 2025
f571ffb
linting fix
liustve Jul 2, 2025
f7fcaaa
Merge remote-tracking branch 'liustve/logs-mainline' into genesis-logs
liustve Jul 2, 2025
c7aeac7
Merge remote-tracking branch 'upstream/main' into genesis-logs
liustve Jul 2, 2025
d7575b8
fixing consolidation changes
liustve Jul 3, 2025
17d4cb9
add comments
liustve Jul 3, 2025
e8a1ec9
linting fix
liustve Jul 3, 2025
9f84183
update tests
liustve Jul 3, 2025
fb7f6c8
linting fix
liustve Jul 3, 2025
22a1bd1
addressed PR comments
liustve Jul 3, 2025
6b50a03
typo fix
liustve Jul 3, 2025
39ce39a
point logs to LOGS_SERVICE
liustve Jul 3, 2025
d1aebbb
removed unncessary comment
liustve Jul 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
from importlib.metadata import PackageNotFoundError, version
from logging import Logger, getLogger
from typing import Optional

from packaging.requirements import Requirement

Expand Down Expand Up @@ -37,30 +38,37 @@ def is_agent_observability_enabled() -> bool:
return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"


def get_aws_region() -> str:
"""Get AWS region using botocore session.
IS_BOTOCORE_INSTALLED: bool = is_installed("botocore")

botocore automatically checks in the following priority order:
1. AWS_REGION environment variable
2. AWS_DEFAULT_REGION environment variable
3. AWS CLI config file (~/.aws/config)
4. EC2 instance metadata service

Returns:
The AWS region if found, None otherwise.
def get_aws_session():
"""Returns a botocore session only if botocore is installed, otherwise None.

We do this to prevent runtime errors for ADOT customers that do not need
any features that require botocore.
"""
if is_installed("botocore"):
try:
from botocore import session # pylint: disable=import-outside-toplevel

botocore_session = session.Session()
if botocore_session.region_name:
return botocore_session.region_name
except (ImportError, AttributeError):
# botocore failed to determine region
pass

_logger.warning(
"AWS region not found. Please set AWS_REGION environment variable or configure AWS CLI with 'aws configure'."
)
if IS_BOTOCORE_INSTALLED:
# pylint: disable=import-outside-toplevel
from botocore.session import Session

session = Session()
# Botocore only looks up AWS_DEFAULT_REGION when creating a session/client
# See: https://docs.aws.amazon.com/sdkref/latest/guide/feature-region.html#feature-region-sdk-compat
region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
if region:
session.set_config_variable("region", region)
return session
return None


def get_aws_region() -> Optional[str]:
"""Get AWS region from environment or botocore session.

Returns the AWS region in the following priority order:
1. AWS_REGION environment variable
2. AWS_DEFAULT_REGION environment variable
3. botocore session's region (if botocore is available)
4. None if no region can be determined
"""
botocore_session = get_aws_session()
return botocore_session.get_config_variable("region") if botocore_session else None
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE, AWS_SERVICE_TYPE
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
from amazon.opentelemetry.distro._utils import get_aws_session, is_agent_observability_enabled
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
AttributePropagatingSpanProcessorBuilder,
Expand All @@ -23,13 +23,6 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder

# pylint: disable=line-too-long
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
AwsCloudWatchOtlpBatchLogRecordProcessor,
)
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
Expand Down Expand Up @@ -215,9 +208,9 @@ def _init_logging(

for _, exporter_class in exporters.items():
exporter_args = {}
log_exporter: LogExporter = _customize_logs_exporter(exporter_class(**exporter_args))
log_processor = _customize_log_record_processor(log_exporter)
provider.add_log_record_processor(log_processor)
_customize_log_record_processor(
logger_provider=provider, log_exporter=_customize_logs_exporter(exporter_class(**exporter_args))
)

event_logger_provider = EventLoggerProvider(logger_provider=provider)
set_event_logger_provider(event_logger_provider)
Expand Down Expand Up @@ -304,10 +297,12 @@ def _export_unsampled_span_for_agent_observability(trace_provider: TracerProvide
return

traces_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
if traces_endpoint and _is_aws_otlp_endpoint(traces_endpoint):
endpoint = traces_endpoint.lower()
region = endpoint.split(".")[1]

span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())

trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))
span_exporter = _create_aws_otlp_exporter(endpoint=endpoint, service="xray", region=region)
trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter=span_exporter))


def _is_defer_to_workers_enabled():
Expand Down Expand Up @@ -402,50 +397,54 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint)

if _is_aws_otlp_endpoint(traces_endpoint, "xray"):
if traces_endpoint and _is_aws_otlp_endpoint(traces_endpoint, "xray"):
_logger.info("Detected using AWS OTLP Traces Endpoint.")

if isinstance(span_exporter, OTLPSpanExporter):
if is_agent_observability_enabled():
# Span exporter needs an instance of logger provider in ai agent
# observability case because we need to split input/output prompts
# from span attributes and send them to the logs pipeline per
# the new Gen AI semantic convention from OTel
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
else:
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
endpoint = traces_endpoint.lower()
region = endpoint.split(".")[1]
return _create_aws_otlp_exporter(endpoint=traces_endpoint, service="xray", region=region)

else:
_logger.warning(
"Improper configuration see: please export/set "
"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf and OTEL_TRACES_EXPORTER=otlp"
)
_logger.warning(
"Improper configuration see: please export/set "
"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf and OTEL_TRACES_EXPORTER=otlp"
)

if not _is_application_signals_enabled():
return span_exporter

return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


def _customize_log_record_processor(log_exporter: LogExporter):
if isinstance(log_exporter, OTLPAwsLogExporter) and is_agent_observability_enabled():
return AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter)
def _customize_log_record_processor(logger_provider: LoggerProvider, log_exporter: Optional[LogExporter]) -> None:
if not log_exporter:
return

return BatchLogRecordProcessor(exporter=log_exporter)
if is_agent_observability_enabled():
# pylint: disable=import-outside-toplevel
from amazon.opentelemetry.distro.exporter.otlp.aws.logs._aws_cw_otlp_batch_log_record_processor import (
AwsCloudWatchOtlpBatchLogRecordProcessor,
)

logger_provider.add_log_record_processor(AwsCloudWatchOtlpBatchLogRecordProcessor(exporter=log_exporter))
else:
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))


def _customize_logs_exporter(log_exporter: LogExporter) -> LogExporter:
logs_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)

if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
if logs_endpoint and _is_aws_otlp_endpoint(logs_endpoint, "logs"):

_logger.info("Detected using AWS OTLP Logs Endpoint.")

if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid:
endpoint = logs_endpoint.lower()
region = endpoint.split(".")[1]
# Setting default compression mode to Gzip as this is the behavior in upstream's
# collector otlp http exporter:
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
return OTLPAwsLogExporter(endpoint=logs_endpoint)
return _create_aws_otlp_exporter(endpoint=logs_endpoint, service="logs", region=region)

_logger.warning(
"Improper configuration see: please export/set "
Expand Down Expand Up @@ -607,11 +606,11 @@ def _is_lambda_environment():
def _is_aws_otlp_endpoint(otlp_endpoint: Optional[str] = None, service: str = "xray") -> bool:
"""Is the given endpoint an AWS OTLP endpoint?"""

pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN

if not otlp_endpoint:
return False

pattern = AWS_TRACES_OTLP_ENDPOINT_PATTERN if service == "xray" else AWS_LOGS_OTLP_ENDPOINT_PATTERN

return bool(re.match(pattern, otlp_endpoint.lower()))


Expand Down Expand Up @@ -772,8 +771,9 @@ def _check_emf_exporter_enabled() -> bool:
def create_emf_exporter():
"""Create and configure the CloudWatch EMF exporter."""
try:
session = get_aws_session()
# Check if botocore is available before importing the EMF exporter
if not is_installed("botocore"):
if not session:
_logger.warning("botocore is not installed. EMF exporter requires botocore")
return None

Expand All @@ -788,6 +788,7 @@ def create_emf_exporter():
return None

return AwsCloudWatchEmfExporter(
session=session,
namespace=log_header_setting.namespace,
log_group_name=log_header_setting.log_group,
log_stream_name=log_header_setting.log_stream,
Expand All @@ -796,3 +797,39 @@ def create_emf_exporter():
except Exception as errors:
_logger.error("Failed to create EMF exporter: %s", errors)
return None


def _create_aws_otlp_exporter(endpoint: str, service: str, region: str):
"""Create and configure the AWS OTLP exporters."""
try:
session = get_aws_session()
# Check if botocore is available before importing the AWS exporter
if not session:
_logger.warning("SigV4 Auth requires botocore to be enabled")
return None

# pylint: disable=import-outside-toplevel
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter

if service == "xray":
if is_agent_observability_enabled():
# Span exporter needs an instance of logger provider in ai agent
# observability case because we need to split input/output prompts
# from span attributes and send them to the logs pipeline per
# the new Gen AI semantic convention from OTel
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/
return OTLPAwsSpanExporter(
session=session, endpoint=endpoint, aws_region=region, logger_provider=get_logger_provider()
)

return OTLPAwsSpanExporter(session=session, endpoint=endpoint, aws_region=region)

if service == "logs":
return OTLPAwsLogExporter(session=session, aws_region=region)

return None
# pylint: disable=broad-exception-caught
except Exception as errors:
_logger.error("Failed to create AWS OTLP exporter: %s", errors)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ def _configure(self, **kwargs):
# Set GenAI capture content default
os.environ.setdefault(OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, "true")

# Set OTLP endpoints with AWS region if not already set
region = get_aws_region()

# Set OTLP endpoints with AWS region if not already set
if region:
os.environ.setdefault(
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, f"https://xray.{region}.amazonaws.com/v1/traces"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import uuid
from typing import Any, Dict, List, Optional

import botocore.session
from botocore.exceptions import ClientError
from botocore.session import Session

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,6 +90,7 @@ class CloudWatchLogClient:
def __init__(
self,
log_group_name: str,
session: Session,
log_stream_name: Optional[str] = None,
aws_region: Optional[str] = None,
**kwargs,
Expand All @@ -105,8 +106,6 @@ def __init__(
"""
self.log_group_name = log_group_name
self.log_stream_name = log_stream_name or self._generate_log_stream_name()

session = botocore.session.Session()
self.logs_client = session.create_client("logs", region_name=aws_region, **kwargs)

# Event batch to store logs before sending to CloudWatch
Expand Down
Loading
Loading