Skip to content

Commit e8fc471

Browse files
committed
update udp message schema
1 parent 07eae1f commit e8fc471

File tree

5 files changed

+48
-54
lines changed

5 files changed

+48
-54
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
AwsMetricAttributesSpanExporterBuilder,
1717
)
1818
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
19-
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpMetricExporter, OtlpUdpSpanExporter
19+
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter
2020
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
2121
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
2222
from opentelemetry.sdk._configuration import (
@@ -40,6 +40,7 @@
4040
from opentelemetry.sdk.extension.aws.resource.ecs import AwsEcsResourceDetector
4141
from opentelemetry.sdk.extension.aws.resource.eks import AwsEksResourceDetector
4242
from opentelemetry.sdk.metrics import MeterProvider
43+
from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter
4344
from opentelemetry.sdk.metrics._internal.instrument import (
4445
Counter,
4546
Histogram,
@@ -223,7 +224,7 @@ def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> Span
223224
if not _is_application_signals_enabled():
224225
return span_exporter
225226
if _is_lambda_environment():
226-
return AwsMetricAttributesSpanExporterBuilder(OtlpUdpSpanExporter(), resource).build()
227+
return AwsMetricAttributesSpanExporterBuilder(OTLPUdpSpanExporter(), resource).build()
227228
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
228229

229230

@@ -302,7 +303,7 @@ def create_exporter(self):
302303
if _is_lambda_environment():
303304
# When running in Lambda, export Application Signals metrics over UDP
304305
application_signals_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
305-
return OtlpUdpMetricExporter(endpoint=application_signals_endpoint, preferred_temporality=temporality_dict)
306+
return OTLPUdpMetricExporter(endpoint=application_signals_endpoint, preferred_temporality=temporality_dict)
306307

307308
if protocol == "http/protobuf":
308309
application_signals_endpoint = os.environ.get(

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/otlp_udp_exporter.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
import base64
34
import socket
45
from logging import Logger, getLogger
56
from typing import Dict, Optional, Sequence, Tuple
@@ -18,27 +19,30 @@
1819

1920
DEFAULT_ENDPOINT = "127.0.0.1:2000"
2021
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
21-
PROTOCOL_DELIMITER = "\n"
22+
FORMAT_OTEL_METRICS_BINARY_PREFIX = "M1"
23+
FORMAT_OTEL_TRACES_BINARY_PREFIX = "T1"
2224

2325
_logger: Logger = getLogger(__name__)
2426

2527

2628
class UdpExporter:
2729
def __init__(self, endpoint: Optional[str] = None):
28-
self._endpoint = endpoint or DEFAULT_ENDPOINT # TODO: read from some env var??
30+
self._endpoint = endpoint or DEFAULT_ENDPOINT
2931
self._host, self._port = self._parse_endpoint(self._endpoint)
3032
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
3133
self._socket.setblocking(False)
3234

33-
def send_data(self, data: str, signal_format: str):
34-
udp_data = f'{{"format":"{signal_format}","data":{data}}}'
35-
message = PROTOCOL_HEADER + udp_data
35+
def send_data(self, data: bytes, signal_format_prefix: str):
36+
# base64 encoding and then converting to string with utf-8
37+
base64_encoded_string: str = base64.b64encode(data).decode("utf-8")
38+
message = f"{PROTOCOL_HEADER}{signal_format_prefix}{base64_encoded_string}"
3639

3740
try:
38-
print("Sending UDP data: ", message) # TODO: remove
41+
_logger.debug("Sending UDP data: ", message)
3942
self._socket.sendto(message.encode("utf-8"), (self._host, int(self._port)))
4043
except Exception as exc: # pylint: disable=broad-except
4144
_logger.error("Error sending UDP data: %s", exc)
45+
raise
4246

4347
def shutdown(self):
4448
self._socket.close()
@@ -55,7 +59,7 @@ def _parse_endpoint(self, endpoint: str) -> Tuple[str, int]:
5559
return host, port
5660

5761

58-
class OtlpUdpMetricExporter(MetricExporter):
62+
class OTLPUdpMetricExporter(MetricExporter):
5963
def __init__(
6064
self,
6165
endpoint: Optional[str] = None,
@@ -76,8 +80,13 @@ def export(
7680
**kwargs,
7781
) -> MetricExportResult:
7882
serialized_data = encode_metrics(metrics_data).SerializeToString()
79-
self._udp_exporter.send_data(data=serialized_data, signal_format="OTEL_V1_METRICS") # TODO: Convert to constant
80-
return MetricExportResult.SUCCESS # TODO: send appropriate status back. Need to??
83+
84+
try:
85+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_METRICS_BINARY_PREFIX)
86+
return MetricExportResult.SUCCESS
87+
except Exception as exc: # pylint: disable=broad-except
88+
_logger.error("Error exporting metrics: %s", exc)
89+
return MetricExportResult.FAILURE
8190

8291
# pylint: disable=no-self-use
8392
def force_flush(self, timeout_millis: float = 10_000) -> bool:
@@ -88,15 +97,20 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
8897
self._udp_exporter.shutdown()
8998

9099

91-
class OtlpUdpSpanExporter(SpanExporter):
100+
class OTLPUdpSpanExporter(SpanExporter):
92101
def __init__(self, endpoint: Optional[str] = None):
93102
self._udp_exporter = UdpExporter(endpoint=endpoint)
94103

95104
@override
96105
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
97106
serialized_data = encode_spans(spans).SerializeToString()
98-
self._udp_exporter.send_data(data=serialized_data, signal_format="OTEL_V1_TRACES") # TODO: Convert to constant
99-
return SpanExportResult.SUCCESS # TODO: send appropriate status back. Need to??
107+
108+
try:
109+
self._udp_exporter.send_data(data=serialized_data, signal_format_prefix=FORMAT_OTEL_TRACES_BINARY_PREFIX)
110+
return SpanExportResult.SUCCESS
111+
except Exception as exc:
112+
_logger.error("Error exporting spans: %s", exc)
113+
return SpanExportResult.FAILURE
100114

101115
# pylint: disable=no-self-use
102116
@override

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_aws_opentelementry_configurator.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
)
2020
from amazon.opentelemetry.distro.aws_opentelemetry_distro import AwsOpenTelemetryDistro
2121
from amazon.opentelemetry.distro.aws_span_metrics_processor import AwsSpanMetricsProcessor
22-
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpMetricExporter, OtlpUdpSpanExporter
22+
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpMetricExporter, OTLPUdpSpanExporter
2323
from amazon.opentelemetry.distro.sampler._aws_xray_sampling_client import _AwsXRaySamplingClient
2424
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
2525
from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER
@@ -262,7 +262,7 @@ def test_customize_exporter(self):
262262
customized_exporter = _customize_exporter(mock_exporter, Resource.get_empty())
263263
self.assertNotEqual(mock_exporter, customized_exporter)
264264
self.assertIsInstance(customized_exporter, AwsMetricAttributesSpanExporter)
265-
self.assertIsInstance(customized_exporter._delegate, OtlpUdpSpanExporter)
265+
self.assertIsInstance(customized_exporter._delegate, OTLPUdpSpanExporter)
266266
os.environ.pop("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", None)
267267
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)
268268

@@ -301,7 +301,7 @@ def test_application_signals_exporter_provider(self):
301301
# When in Lambda, exporter should be UDP.
302302
os.environ.setdefault("AWS_LAMBDA_FUNCTION_NAME", "myLambdaFunc")
303303
exporter: MetricExporter = ApplicationSignalsExporterProvider().create_exporter()
304-
self.assertIsInstance(exporter, OtlpUdpMetricExporter)
304+
self.assertIsInstance(exporter, OTLPUdpMetricExporter)
305305
self.assertEqual("127.0.0.1:2000", exporter._udp_exporter._endpoint)
306306
os.environ.pop("AWS_LAMBDA_FUNCTION_NAME", None)
307307

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_otlp_udp_exporter.py

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
import base64
34
import socket
45
import unittest
56
from unittest import TestCase
@@ -8,8 +9,8 @@
89
from amazon.opentelemetry.distro.otlp_udp_exporter import (
910
DEFAULT_ENDPOINT,
1011
PROTOCOL_HEADER,
11-
OtlpUdpMetricExporter,
12-
OtlpUdpSpanExporter,
12+
OTLPUdpMetricExporter,
13+
OTLPUdpSpanExporter,
1314
UdpExporter,
1415
)
1516
from opentelemetry.sdk.metrics._internal.export import MetricExportResult
@@ -46,8 +47,10 @@ def test_udp_exporter_init_invalid_endpoint(self, mock_socket):
4647
def test_send_data(self, mock_socket):
4748
mock_socket_instance = mock_socket.return_value
4849
exporter = UdpExporter()
49-
exporter.send_data("encoded_data", "signal")
50-
expected_message = PROTOCOL_HEADER + '{"format":"signal","data":encoded_data}'
50+
input_bytes: bytes = b"hello"
51+
encoded_bytes: bytes = base64.b64encode(input_bytes)
52+
exporter.send_data(input_bytes, "signal_prefix")
53+
expected_message = PROTOCOL_HEADER + "signal_prefix" + encoded_bytes.decode("utf-8")
5154
mock_socket_instance.sendto.assert_called_once_with(expected_message.encode("utf-8"), ("127.0.0.1", 2000))
5255

5356
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.socket.socket")
@@ -58,54 +61,45 @@ def test_shutdown(self, mock_socket):
5861
mock_socket_instance.close.assert_called_once()
5962

6063

61-
class TestOtlpUdpMetricExporter(unittest.TestCase):
64+
class TestOTLPUdpMetricExporter(unittest.TestCase):
6265

6366
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_metrics")
6467
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
6568
def test_export(self, mock_udp_exporter, mock_encode_metrics):
6669
mock_udp_exporter_instance = mock_udp_exporter.return_value
6770
mock_encoded_data = MagicMock()
6871
mock_encode_metrics.return_value.SerializeToString.return_value = mock_encoded_data
69-
exporter = OtlpUdpMetricExporter()
72+
exporter = OTLPUdpMetricExporter()
7073
result = exporter.export(MagicMock())
71-
mock_udp_exporter_instance.send_data.assert_called_once_with(
72-
data=mock_encoded_data, signal_format="OTEL_V1_METRICS"
73-
)
74+
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="M1")
7475
self.assertEqual(result, MetricExportResult.SUCCESS)
7576

7677
# pylint: disable=no-self-use
7778
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
7879
def test_shutdown(self, mock_udp_exporter):
7980
mock_udp_exporter_instance = mock_udp_exporter.return_value
80-
exporter = OtlpUdpMetricExporter()
81+
exporter = OTLPUdpMetricExporter()
8182
exporter.shutdown()
8283
mock_udp_exporter_instance.shutdown.assert_called_once()
8384

8485

85-
class TestOtlpUdpSpanExporter(unittest.TestCase):
86+
class TestOTLPUdpSpanExporter(unittest.TestCase):
8687

8788
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.encode_spans")
8889
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
8990
def test_export(self, mock_udp_exporter, mock_encode_spans):
9091
mock_udp_exporter_instance = mock_udp_exporter.return_value
9192
mock_encoded_data = MagicMock()
9293
mock_encode_spans.return_value.SerializeToString.return_value = mock_encoded_data
93-
exporter = OtlpUdpSpanExporter()
94+
exporter = OTLPUdpSpanExporter()
9495
result = exporter.export(MagicMock())
95-
mock_udp_exporter_instance.send_data.assert_called_once_with(
96-
data=mock_encoded_data, signal_format="OTEL_V1_TRACES"
97-
)
96+
mock_udp_exporter_instance.send_data.assert_called_once_with(data=mock_encoded_data, signal_format_prefix="T1")
9897
self.assertEqual(result, SpanExportResult.SUCCESS)
9998

10099
# pylint: disable=no-self-use
101100
@patch("amazon.opentelemetry.distro.otlp_udp_exporter.UdpExporter")
102101
def test_shutdown(self, mock_udp_exporter):
103102
mock_udp_exporter_instance = mock_udp_exporter.return_value
104-
exporter = OtlpUdpSpanExporter()
103+
exporter = OTLPUdpSpanExporter()
105104
exporter.shutdown()
106105
mock_udp_exporter_instance.shutdown.assert_called_once()
107-
108-
109-
# TODO: remove this line for final PR
110-
if __name__ == "__main__":
111-
unittest.main()

test_lambda.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)