Skip to content

Commit a8a895c

Browse files
committed
some refactoring
1 parent 570def6 commit a8a895c

File tree

3 files changed

+61
-41
lines changed

3 files changed

+61
-41
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_metric_attributes_span_exporter.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ def __init__(self, delegate: SpanExporter, generator: MetricAttributeGenerator,
4343

4444
@override
4545
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
46-
print("IN AWS EXPORTER:::")
4746
modified_spans: Sequence[ReadableSpan] = self._add_metric_attributes(spans)
4847
return self._delegate.export(modified_spans)
4948

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
1919
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
2020
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpMetricExporter
21-
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpTraceExporter
21+
from amazon.opentelemetry.distro.otlp_udp_exporter import OtlpUdpSpanExporter
2222
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
2323
from opentelemetry.sdk._configuration import (
2424
_get_exporter_names,
@@ -221,8 +221,9 @@ def _customize_sampler(sampler: Sampler) -> Sampler:
221221
def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> SpanExporter:
222222
if not _is_application_signals_enabled():
223223
return span_exporter
224-
udp_span_exporter = OtlpUdpTraceExporter(endpoint="something")
225-
return AwsMetricAttributesSpanExporterBuilder(udp_span_exporter, resource).build()
224+
if _is_lambda_environment():
225+
return AwsMetricAttributesSpanExporterBuilder(OtlpUdpSpanExporter(), resource).build()
226+
return AwsMetricAttributesSpanExporterBuilder(span_exporter, resource).build()
226227

227228

228229
def _customize_span_processors(provider: TracerProvider, resource: Resource) -> None:
@@ -298,7 +299,8 @@ def create_exporter(self):
298299
temporality_dict[typ] = AggregationTemporality.DELTA
299300

300301
if _is_lambda_environment():
301-
return OtlpUdpMetricExporter(temporality_dict=temporality_dict)
302+
# When running in Lambda, export Application Signals metrics over UDP
303+
return OtlpUdpMetricExporter(preferred_temporality=temporality_dict)
302304

303305
if protocol == "http/protobuf":
304306
application_signals_endpoint = os.environ.get(
Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import typing
2-
from typing import Optional, Sequence
1+
from typing import Optional, Sequence, Dict
32

3+
from opentelemetry.sdk.metrics._internal.aggregation import AggregationTemporality
4+
from opentelemetry.sdk.metrics.view import Aggregation
45
from opentelemetry.sdk.trace import ReadableSpan
56
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
67
from opentelemetry.sdk.metrics.export import MetricExporter
@@ -16,44 +17,57 @@
1617

1718
import socket
1819

19-
PROTOCOL_HEADER = "{\"format\":\"json\",\"version\":1}"
20+
DEFAULT_ENDPOINT = "127.0.0.1:2000"
21+
PROTOCOL_HEADER = '{"format":"json","version":1}\n'
2022
PROTOCOL_DELIMITER = '\n'
2123

2224

23-
class OtlpUdpExporterCommon:
25+
class UdpExporter:
2426
def __init__(
2527
self,
2628
endpoint: Optional[str] = None
2729
):
28-
self._endpoint = endpoint or "http://127.0.0.1:2000"
29-
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #TODO: What does this mean?
30-
self._socket.setblocking(False) #TODO: Is this correct that we don't want to block?
30+
self._endpoint = endpoint or DEFAULT_ENDPOINT
31+
self._host, self._port = self._parse_endpoint(self._endpoint)
32+
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
33+
self._socket.setblocking(False)
3134

32-
def send_data(self, data, format):
33-
udp_data = {
34-
"format": format,
35-
"data": data
36-
}
35+
def send_data(self, data: str, format: str):
36+
udp_data = f'{{"format":"{format}","data":{data}}}'
37+
message = PROTOCOL_HEADER + udp_data
3738

38-
message = "%s%s%s" % (PROTOCOL_HEADER,
39-
PROTOCOL_DELIMITER,
40-
udp_data)
39+
try:
40+
print("Sending UDP data: ", message) # TODO: remove
41+
self._socket.sendto(message.encode('utf-8'), (self._host, int(self._port)))
42+
except Exception as e:
43+
print("Error sending UDP data: ", e)
4144

42-
print("Sending UDP data: ", message)
43-
self._socket.sendto(message.encode('utf-8'), self._endpoint)
44-
pass
45+
def shutdown(self):
46+
self._socket.close()
4547

48+
def _parse_endpoint(self, endpoint: str) -> tuple[str, int]:
49+
try:
50+
vals = endpoint.split(":")
51+
host = vals[0]
52+
port = int(vals[1])
53+
except Exception as e:
54+
raise ValueError(f"Invalid endpoint: {endpoint}") from e
4655

47-
class OtlpUdpMetricExporter(OtlpUdpExporterCommon, MetricExporter):
48-
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
49-
pass
56+
return host, port
5057

51-
def force_flush(self, timeout_millis: float = 10_000) -> bool:
52-
pass
5358

54-
def __init__(self, endpoint=None, temporality_dict=None):
55-
OtlpUdpExporterCommon.__init__(self, endpoint)
56-
MetricExporter.__init__(self, preferred_temporality=temporality_dict)
59+
class OtlpUdpMetricExporter(MetricExporter):
60+
def __init__(
61+
self,
62+
endpoint: Optional[str] = None,
63+
preferred_temporality: Dict[type, AggregationTemporality] = None,
64+
preferred_aggregation: Dict[type, Aggregation] = None
65+
):
66+
super().__init__(
67+
preferred_temporality=preferred_temporality,
68+
preferred_aggregation=preferred_aggregation,
69+
)
70+
self._udp_exporter = UdpExporter(endpoint=endpoint)
5771

5872
@override
5973
def export(
@@ -62,26 +76,31 @@ def export(
6276
timeout_millis: float = 10_000,
6377
**kwargs,
6478
) -> MetricExportResult:
65-
# serialized_data = encode_metrics(metrics_data).SerializeToString()
66-
# self.send_data(data=serialized_data, format="OTEL_V1_METRICS")
67-
return MetricExportResult.SUCCESS
79+
serialized_data = encode_metrics(metrics_data).SerializeToString()
80+
self._udp_exporter.send_data(data=serialized_data, format="OTEL_V1_METRICS") # TODO: Convert to constant
81+
return MetricExportResult.SUCCESS # TODO: send appropriate status back. Need to??
82+
83+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
84+
return True
85+
86+
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
87+
self._udp_exporter.shutdown()
6888

6989

70-
class OtlpUdpTraceExporter(SpanExporter):
71-
def __init__(self, endpoint=None):
72-
self._exp = OtlpUdpExporterCommon(endpoint=endpoint)
90+
class OtlpUdpSpanExporter(SpanExporter):
91+
def __init__(self, endpoint: Optional[str] = None):
92+
self._udp_exporter = UdpExporter(endpoint=endpoint)
7393

7494
@override
7595
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
76-
print("SPAN EXPORTER CALLED")
7796
serialized_data = encode_spans(spans).SerializeToString()
78-
self._exp.send_data(data=serialized_data, format="OTEL_V1_TRACES")
79-
return SpanExportResult.SUCCESS
97+
self._udp_exporter.send_data(data=serialized_data, format="OTEL_V1_TRACES") # TODO: Convert to constant
98+
return SpanExportResult.SUCCESS # TODO: send appropriate status back. Need to??
8099

81100
@override
82101
def force_flush(self, timeout_millis: int = 30000) -> bool:
83102
return True
84103

85104
@override
86105
def shutdown(self) -> None:
87-
pass
106+
self._udp_exporter.shutdown()

0 commit comments

Comments
 (0)