Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
LOCAL_ROOT: str = "LOCAL_ROOT"

# Useful constants
_AWS_LAMBDA_FUNCTION_NAME: str = "AWS_LAMBDA_FUNCTION_NAME"
_BOTO3SQS_INSTRUMENTATION_SCOPE: str = "opentelemetry.instrumentation.boto3sqs"

# Max keyword length supported by parsing into remote_operation from DB_STATEMENT
Expand Down Expand Up @@ -50,7 +51,9 @@ def get_ingress_operation(__, span: ReadableSpan) -> str:
with the first API path parameter" if the default span name is None, UnknownOperation or http.method value.
"""
operation: str = span.name
if should_use_internal_operation(span):
if _AWS_LAMBDA_FUNCTION_NAME in os.environ:
operation = os.environ.get(_AWS_LAMBDA_FUNCTION_NAME) + "/Handler"
elif should_use_internal_operation(span):
operation = INTERNAL_OPERATION
elif not _is_valid_operation(span, operation):
operation = _generate_ingress_operation(span)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._configuration import (
_get_exporter_names,
_get_id_generator,
Expand Down Expand Up @@ -67,6 +68,9 @@
AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"
OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
# UDP package size is not larger than 64KB
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10

_logger: Logger = getLogger(__name__)

Expand Down Expand Up @@ -112,13 +116,18 @@ def _initialize_components():

auto_resource: Dict[str, any] = {}
auto_resource = _customize_versions(auto_resource)
resource = get_aggregated_resources(

resource_detectors = (
[
AwsEc2ResourceDetector(),
AwsEksResourceDetector(),
AwsEcsResourceDetector(),
]
).merge(Resource.create(auto_resource))
if not _is_lambda_environment()
else []
)

resource = get_aggregated_resources(resource_detectors).merge(Resource.create(auto_resource))

sampler_name = _get_sampler()
sampler = _custom_import_sampler(sampler_name, resource)
Expand Down Expand Up @@ -153,7 +162,9 @@ def _init_tracing(
exporter_args: Dict[str, any] = {}
span_exporter: SpanExporter = exporter_class(**exporter_args)
span_exporter = _customize_exporter(span_exporter, resource)
trace_provider.add_span_processor(BatchSpanProcessor(span_exporter))
trace_provider.add_span_processor(
BatchSpanProcessor(span_exporter=span_exporter, max_export_batch_size=_span_export_batch_size())
)

_customize_span_processors(trace_provider, resource)

Expand All @@ -175,7 +186,9 @@ def _export_unsampled_span_for_lambda(trace_provider: TracerProvider, resource:
OTLPUdpSpanExporter(endpoint=traces_endpoint, sampled=False), resource
).build()

trace_provider.add_span_processor(BatchUnsampledSpanProcessor(span_exporter))
trace_provider.add_span_processor(
BatchUnsampledSpanProcessor(span_exporter=span_exporter, max_export_batch_size=LAMBDA_SPAN_EXPORT_BATCH_SIZE)
)


def _is_defer_to_workers_enabled():
Expand Down Expand Up @@ -263,23 +276,30 @@ def _customize_sampler(sampler: Sampler) -> Sampler:


def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
if _is_lambda_environment():
# Override OTLP http default endpoint to UDP
if isinstance(span_exporter, OTLPSpanExporter) and os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) is None:
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint)

if not _is_application_signals_enabled():
return span_exporter
if _is_lambda_environment():
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
return AwsMetricAttributesSpanExporterBuilder(OTLPUdpSpanExporter(endpoint=traces_endpoint), resource).build()

return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


def _customize_span_processors(provider: TracerProvider, resource: Resource) -> None:
if not _is_application_signals_enabled():
return

_export_unsampled_span_for_lambda(provider, resource)

# Construct and set local and remote attributes span processor
provider.add_span_processor(AttributePropagatingSpanProcessorBuilder().build())

# Export 100% spans and not export Application-Signals metrics if on Lambda.
if _is_lambda_environment():
_export_unsampled_span_for_lambda(provider, resource)
return

# Construct meterProvider
_logger.info("AWS Application Signals enabled")
otel_metric_exporter = ApplicationSignalsExporterProvider().create_exporter()
Expand Down Expand Up @@ -318,6 +338,10 @@ def _is_lambda_environment():
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


def _span_export_batch_size():
return LAMBDA_SPAN_EXPORT_BATCH_SIZE if _is_lambda_environment() else None


class ApplicationSignalsExporterProvider:
_instance: ClassVar["ApplicationSignalsExporterProvider"] = None

Expand Down Expand Up @@ -345,11 +369,6 @@ def create_exporter(self):
]:
temporality_dict[typ] = AggregationTemporality.DELTA

if _is_lambda_environment():
# When running in Lambda, export Application Signals metrics over UDP
application_signals_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
return OTLPUdpMetricExporter(endpoint=application_signals_endpoint, preferred_temporality=temporality_dict)

if protocol == "http/protobuf":
application_signals_endpoint = os.environ.get(
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"

# TODO: update sampled and unsampled prefix later
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1"
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1"
FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S"
FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U"

_logger: Logger = getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from amazon.opentelemetry.distro.aws_batch_unsampled_span_processor import BatchUnsampledSpanProcessor
from amazon.opentelemetry.distro.aws_metric_attributes_span_exporter import AwsMetricAttributesSpanExporter
from amazon.opentelemetry.distro.aws_opentelemetry_configurator import (
LAMBDA_SPAN_EXPORT_BATCH_SIZE,
ApplicationSignalsExporterProvider,
AwsOpenTelemetryConfigurator,
_custom_import_sampler,
Expand All @@ -23,15 +24,15 @@
)
from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import OTLPMetricExporterMixin
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPGrpcOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG
from opentelemetry.sdk.metrics._internal.export import MetricExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
Expand Down Expand Up @@ -250,7 +251,7 @@ def test_customize_sampler(self):
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)

def test_customize_exporter(self):
mock_exporter: SpanExporter = MagicMock()
mock_exporter: SpanExporter = MagicMock(spec=OTLPSpanExporter)
customized_exporter: SpanExporter = _customize_exporter(mock_exporter, Resource.get_empty())
self.assertEqual(mock_exporter, customized_exporter)

Expand Down Expand Up @@ -285,6 +286,23 @@ def test_customize_span_processors(self):
self.assertIsInstance(second_processor, AwsSpanMetricsProcessor)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)

def test_customize_span_processors_lambda(self):
mock_tracer_provider: TracerProvider = MagicMock()
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 0)

os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
self.assertEqual(mock_tracer_provider.add_span_processor.call_count, 2)
first_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
self.assertIsInstance(first_processor, AttributePropagatingSpanProcessor)
second_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[1].args[0]
self.assertIsInstance(second_processor, BatchUnsampledSpanProcessor)
self.assertEqual(second_processor.max_export_batch_size, LAMBDA_SPAN_EXPORT_BATCH_SIZE)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

def test_application_signals_exporter_provider(self):
# Check default protocol - HTTP, as specified by AwsOpenTelemetryDistro.
exporter: OTLPMetricExporterMixin = ApplicationSignalsExporterProvider().create_exporter()
Expand All @@ -303,13 +321,6 @@ def test_application_signals_exporter_provider(self):
self.assertIsInstance(exporter, OTLPHttpOTLPMetricExporter)
self.assertEqual("http://localhost:4316/v1/metrics", exporter._endpoint)

# When in Lambda, exporter should be UDP.
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
exporter: MetricExporter = ApplicationSignalsExporterProvider().create_exporter()
self.assertIsInstance(exporter, OTLPUdpMetricExporter)
self.assertEqual("127.0.0.1:2000", exporter._udp_exporter._endpoint)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

def test_is_defer_to_workers_enabled(self):
os.environ.setdefault("OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED", "True")
self.assertTrue(_is_defer_to_workers_enabled())
Expand Down Expand Up @@ -366,6 +377,7 @@ def test_export_unsampled_span_for_lambda(self):
first_processor: SpanProcessor = mock_tracer_provider.add_span_processor.call_args_list[0].args[0]
self.assertIsInstance(first_processor, BatchUnsampledSpanProcessor)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)


def validate_distro_environ():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import os
from typing import List
from unittest import TestCase
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

from amazon.opentelemetry.distro._aws_attribute_keys import AWS_CONSUMER_PARENT_SPAN_KIND, AWS_LOCAL_OPERATION
from amazon.opentelemetry.distro._aws_span_processing_util import (
_AWS_LAMBDA_FUNCTION_NAME,
MAX_KEYWORD_LENGTH,
_get_dialect_keywords,
extract_api_path_value,
Expand Down Expand Up @@ -54,6 +56,14 @@ def test_get_ingress_operation_with_not_server(self):
actual_operation: str = get_ingress_operation(self, self.span_data_mock)
self.assertEqual(actual_operation, _INTERNAL_OPERATION)

@patch.dict(os.environ, {_AWS_LAMBDA_FUNCTION_NAME: "MyLambda"})
def test_get_ingress_operation_in_lambda(self):
valid_name: str = "ValidName"
self.span_data_mock.name = valid_name
self.span_data_mock.kind = SpanKind.SERVER
actual_operation: str = get_ingress_operation(self, self.span_data_mock)
self.assertEqual(actual_operation, "MyLambda/Handler")

def test_get_ingress_operation_http_method_name_and_no_fallback(self):
invalid_name: str = "GET"
self.span_data_mock.name = invalid_name
Expand Down
19 changes: 17 additions & 2 deletions lambda-layer/src/otel-instrument
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,27 @@ fi

export LAMBDA_RESOURCE_ATTRIBUTES="cloud.region=$AWS_REGION,cloud.provider=aws,faas.name=$AWS_LAMBDA_FUNCTION_NAME,faas.version=$AWS_LAMBDA_FUNCTION_VERSION,faas.instance=$AWS_LAMBDA_LOG_STREAM_NAME,aws.log.group.names=$AWS_LAMBDA_LOG_GROUP_NAME";


if [ -z "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" ]; then
export OTEL_AWS_APPLICATION_SIGNALS_ENABLED="true";
fi

# - If Application Signals is enabled

if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "true" ]; then
export OTEL_PYTHON_DISTRO="aws_distro";
export OTEL_PYTHON_CONFIGURATOR="aws_configurator";
export OTEL_METRICS_EXPORTER="none";
export OTEL_LOGS_EXPORTER="none";
if [ -z "${OTEL_METRICS_EXPORTER}" ]; then
export OTEL_METRICS_EXPORTER="none";
fi
fi

# - If Application Signals is disabled

if [ "${OTEL_AWS_APPLICATION_SIGNALS_ENABLED}" = "false" ]; then
if [ -z "${OTEL_METRICS_EXPORTER}" ]; then
export OTEL_METRICS_EXPORTER="none";
fi
fi

if [ -z "${OTEL_RESOURCE_ATTRIBUTES}" ]; then
Expand All @@ -128,6 +142,7 @@ fi
if [ -z ${OTEL_PYTHON_DISABLED_INSTRUMENTATIONS} ]; then
export OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="aio-pika,aiohttp-client,aiohttp-server,aiopg,asgi,asyncio,asyncpg,boto,boto3,cassandra,celery,confluent_kafka,dbapi,django,elasticsearch,falcon,fastapi,flask,grpc_client,grpc_server,grpc_aio_client,grpc_aio_server,httpx,jinja2,kafka,logging,mysql,mysqlclient,pika,psycopg,psycopg2,pymemcache,pymongo,pymysql,pyramid,redis,remoulade,requests,sklearn,sqlalchemy,sqlite3,starlette,system_metrics,threading,tornado,tortoiseorm,urllib,urllib3,wsgi"
fi
export OTEL_PYTHON_DISABLED_INSTRUMENTATIONS="$OTEL_PYTHON_DISABLED_INSTRUMENTATIONS,aws-lambda";

# - Use a wrapper because AWS Lambda's `python3 /var/runtime/bootstrap.py` will
# use `imp.load_module` to load the function from the `_HANDLER` environment
Expand Down
1 change: 0 additions & 1 deletion lambda-layer/terraform/lambda/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ module "test-function" {

environment_variables = {
AWS_LAMBDA_EXEC_WRAPPER = "/opt/otel-instrument"
OTEL_AWS_APPLICATION_SIGNALS_ENABLED = "true"
}

tracing_mode = var.tracing_mode
Expand Down
Loading