Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpMetricExporter, OtlpUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.sdk._configuration import (
Expand Down Expand Up @@ -62,6 +63,8 @@
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT"
METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL"
DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0
AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"

_logger: Logger = getLogger(__name__)

Expand Down Expand Up @@ -219,6 +222,8 @@ def _customize_sampler(sampler: Sampler) -> Sampler:
def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
if not _is_application_signals_enabled():
return span_exporter
if _is_lambda_environment():
return AwsMetricAttributesSpanExporterBuilder(OtlpUdpSpanExporter(), resource).build()
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()


Expand Down Expand Up @@ -262,6 +267,11 @@ def _is_application_signals_enabled():
)


def _is_lambda_environment():
# detect if running in AWS Lambda environment
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


class ApplicationSignalsExporterProvider:
_instance: ClassVar["ApplicationSignalsExporterProvider"] = None

Expand Down Expand Up @@ -289,6 +299,11 @@ def create_exporter(self):
]:
temporality_dict[typ] = AggregationTemporality.DELTA

if _is_lambda_environment():
# When running in Lambda, export Application Signals metrics over UDP
application_signals_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
return OtlpUdpMetricExporter(endpoint=application_signals_endpoint, preferred_temporality=temporality_dict)

if protocol == "http/protobuf":
application_signals_endpoint = os.environ.get(
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import socket
from logging import Logger, getLogger
from typing import Dict, Optional, Sequence, Tuple

from typing_extensions import override

from opentelemetry.exporter.otlp.proto.common.metrics_encoder import encode_metrics
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
from opentelemetry.sdk.metrics._internal.point import MetricsData
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.view import Aggregation
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

DEFAULT_ENDPOINT = "127.0.0.1:2000"
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
PROTOCOL_DELIMITER = "\n"

_logger: Logger = getLogger(__name__)


class UdpExporter:
def __init__(self, endpoint: Optional[str] = None):
self._endpoint = endpoint or DEFAULT_ENDPOINT # TODO: read from some env var??
self._host, self._port = self._parse_endpoint(self._endpoint)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.setblocking(False)

def send_data(self, data: str, signal_format: str):
udp_data = f'{{"format":"{signal_format}","data":{data}}}'
message = PROTOCOL_HEADER + udp_data

try:
print("Sending UDP data: ", message) # TODO: remove
self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))
except Exception as exc: # pylint: disable=broad-except
_logger.error("Error sending UDP data: %s", exc)

def shutdown(self):
self._socket.close()

# pylint: disable=no-self-use
def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:
try:
vals = endpoint.split(":")
host = vals[0]
port = int(vals[1])
except Exception as exc: # pylint: disable=broad-except
raise ValueError(f"Invalid endpoint: {endpoint}") from exc

return host, port


class OtlpUdpMetricExporter(MetricExporter):
def __init__(
self,
endpoint: Optional[str] = None,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, Aggregation] = None,
):
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)
self._udp_exporter = UdpExporter(endpoint=endpoint)

@override
def export(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
serialized_data = encode_metrics(metrics_data).SerializeToString()
self._udp_exporter.send_data(data=serialized_data, signal_format="OTEL_V1_METRICS") # TODO: Convert to constant
return MetricExportResult.SUCCESS # TODO: send appropriate status back. Need to??

# pylint: disable=no-self-use
def force_flush(self, timeout_millis: float = 10_000) -> bool:
# TODO: implement force flush
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._udp_exporter.shutdown()


class OtlpUdpSpanExporter(SpanExporter):
def __init__(self, endpoint: Optional[str] = None):
self._udp_exporter = UdpExporter(endpoint=endpoint)

@override
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
serialized_data = encode_spans(spans).SerializeToString()
self._udp_exporter.send_data(data=serialized_data, signal_format="OTEL_V1_TRACES") # TODO: Convert to constant
return SpanExportResult.SUCCESS # TODO: send appropriate status back. Need to??

# pylint: disable=no-self-use
@override
def force_flush(self, timeout_millis: int = 30000) -> bool:
# TODO: implement force flush
return True

@override
def shutdown(self) -> None:
self._udp_exporter.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
)
from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpMetricExporter, OtlpUdpSpanExporter
from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import OTLPMetricExporterMixin
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as OTLPGrpcOTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
from opentelemetry.sdk.environment_variables import OTEL_TRACES_SAMPLER, OTEL_TRACES_SAMPLER_ARG
from opentelemetry.sdk.metrics._internal.export import MetricExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span, SpanProcessor, Tracer, TracerProvider
from opentelemetry.sdk.trace.export import SpanExporter
Expand Down Expand Up @@ -254,6 +256,16 @@ def test_customize_exporter(self):
self.assertEqual(mock_exporter, customized_exporter._delegate)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)

# when Application Signals is enabled and running in lambda
os.environ.setdefault("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "True")
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
customized_exporter = _customize_exporter(mock_exporter, Resource.get_empty())
self.assertNotEqual(mock_exporter, customized_exporter)
self.assertIsInstance(customized_exporter, AwsMetricAttributesSpanExporter)
self.assertIsInstance(customized_exporter._delegate, OtlpUdpSpanExporter)
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)

def test_customize_span_processors(self):
mock_tracer_provider: TracerProvider = MagicMock()
_customize_span_processors(mock_tracer_provider, Resource.get_empty())
Expand Down Expand Up @@ -286,6 +298,13 @@ def test_application_signals_exporter_provider(self):
self.assertIsInstance(exporter, OTLPHttpOTLPMetricExporter)
self.assertEqual("http://localhost:4316/v1/metrics", exporter._endpoint)

# When in Lambda, exporter should be UDP.
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
exporter: MetricExporter = ApplicationSignalsExporterProvider().create_exporter()
self.assertIsInstance(exporter, OtlpUdpMetricExporter)
self.assertEqual("127.0.0.1:2000", exporter._udp_exporter._endpoint)
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)


def validate_distro_environ():
tc: TestCase = TestCase()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import socket
import unittest
from unittest import TestCase
from unittest.mock import MagicMock, patch

from amazon.opentelemetry.distro.otlp_udp_exporter import (
DEFAULT_ENDPOINT,
PROTOCOL_HEADER,
OtlpUdpMetricExporter,
OtlpUdpSpanExporter,
UdpExporter,
)
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
from opentelemetry.sdk.trace.export import SpanExportResult


class TestUdpExporter(TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_udp_exporter_init_default(self, mock_socket):
exporter = UdpExporter()
self.assertEqual(exporter._endpoint, DEFAULT_ENDPOINT)
self.assertEqual(exporter._host, "127.0.0.1")
self.assertEqual(exporter._port, 2000)
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
mock_socket().setblocking.assert_called_once_with(False)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_udp_exporter_init_with_endpoint(self, mock_socket):
exporter = UdpExporter(endpoint="localhost:5000")
self.assertNotEqual(exporter._endpoint, DEFAULT_ENDPOINT)
self.assertEqual(exporter._host, "localhost")
self.assertEqual(exporter._port, 5000)
mock_socket.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM)
mock_socket().setblocking.assert_called_once_with(False)

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_udp_exporter_init_invalid_endpoint(self, mock_socket):
with self.assertRaises(ValueError):
UdpExporter(endpoint="invalidEndpoint:port")

# pylint: disable=no-self-use
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_send_data(self, mock_socket):
mock_socket_instance = mock_socket.return_value
exporter = UdpExporter()
exporter.send_data("encoded_data", "signal")
expected_message = PROTOCOL_HEADER + '{"format":"signal","data":encoded_data}'
mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000))

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
def test_shutdown(self, mock_socket):
mock_socket_instance = mock_socket.return_value
exporter = UdpExporter()
exporter.shutdown()
mock_socket_instance.close.assert_called_once()


class TestOtlpUdpMetricExporter(unittest.TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_metrics")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export(self, mock_udp_exporter, mock_encode_metrics):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data
exporter = OtlpUdpMetricExporter()
result = exporter.export(MagicMock())
mock_udp_exporter_instance.send_data.assert_called_once_with(
data=mock_encoded_data, signal_format="OTEL_V1_METRICS"
)
self.assertEqual(result, MetricExportResult.SUCCESS)

# pylint: disable=no-self-use
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_shutdown(self, mock_udp_exporter):
mock_udp_exporter_instance = mock_udp_exporter.return_value
exporter = OtlpUdpMetricExporter()
exporter.shutdown()
mock_udp_exporter_instance.shutdown.assert_called_once()


class TestOtlpUdpSpanExporter(unittest.TestCase):

@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_export(self, mock_udp_exporter, mock_encode_spans):
mock_udp_exporter_instance = mock_udp_exporter.return_value
mock_encoded_data = MagicMock()
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
exporter = OtlpUdpSpanExporter()
result = exporter.export(MagicMock())
mock_udp_exporter_instance.send_data.assert_called_once_with(
data=mock_encoded_data, signal_format="OTEL_V1_TRACES"
)
self.assertEqual(result, SpanExportResult.SUCCESS)

# pylint: disable=no-self-use
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
def test_shutdown(self, mock_udp_exporter):
mock_udp_exporter_instance = mock_udp_exporter.return_value
exporter = OtlpUdpSpanExporter()
exporter.shutdown()
mock_udp_exporter_instance.shutdown.assert_called_once()


# TODO: remove this line for final PR
if __name__ == "__main__":
unittest.main()
15 changes: 15 additions & 0 deletions test_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# test script for appsignals lambda behavior

from time import sleep

from opentelemetry import trace

if __name__ == "__main__":
tracer = trace.get_tracer("test-tracer")
with tracer.start_as_current_span("parent"):
# Attach a new child and update the current span
with tracer.start_as_current_span("child"):
# sleep for 1s
sleep(1)
# Close child span, set parent as current
# Close parent span, set default span as current
Loading