Skip to content

Commit b4c340e

Browse files
committed
merge
1 parent 6572783 commit b4c340e

File tree

9 files changed

+370
-172
lines changed

9 files changed

+370
-172
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"
1313

14+
1415
def is_installed(req: str) -> bool:
1516
"""Is the given required package installed?"""
1617

@@ -24,5 +25,6 @@ def is_installed(req: str) -> bool:
2425
return False
2526
return True
2627

28+
2729
def is_agent_observability_enabled() -> bool:
2830
return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
44
import os
55
import re
6-
from logging import NOTSET, CRITICAL, Logger, getLogger
6+
from logging import NOTSET, Logger, getLogger
77
from typing import ClassVar, Dict, List, Type, Union
88

99
from importlib_metadata import version
1010
from typing_extensions import override
1111

1212
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
13-
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
1413
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
14+
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
1515
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
1616
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1717
AttributePropagatingSpanProcessorBuilder,
@@ -22,13 +22,14 @@
2222
AwsMetricAttributesSpanExporterBuilder,
2323
)
2424
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
25+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor
2526
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
2627
from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
2728
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
2829
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
2930
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
3031
from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
31-
from opentelemetry._logs import set_logger_provider, get_logger_provider
32+
from opentelemetry._logs import get_logger_provider, set_logger_provider
3233
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
3334
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
3435
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -123,24 +124,6 @@ class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
123124
# pylint: disable=no-self-use
124125
@override
125126
def _configure(self, **kwargs):
126-
127-
print(f"OTEL_EXPORTER_OTLP_LOGS_HEADERS: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_HEADERS', 'Not set')}")
128-
print(f"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: {os.environ.get('OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED', 'Not set')}")
129-
print(f"OTEL_METRICS_EXPORTER: {os.environ.get('OTEL_METRICS_EXPORTER', 'Not set')}")
130-
print(f"OTEL_TRACES_EXPORTER: {os.environ.get('OTEL_TRACES_EXPORTER', 'Not set')}")
131-
print(f"OTEL_LOGS_EXPORTER: {os.environ.get('OTEL_LOGS_EXPORTER', 'Not set')}")
132-
print(f"OTEL_PYTHON_DISTRO: {os.environ.get('OTEL_PYTHON_DISTRO', 'Not set')}")
133-
print(f"OTEL_PYTHON_CONFIGURATOR: {os.environ.get('OTEL_PYTHON_CONFIGURATOR', 'Not set')}")
134-
print(f"OTEL_EXPORTER_OTLP_PROTOCOL: {os.environ.get('OTEL_EXPORTER_OTLP_PROTOCOL', 'Not set')}")
135-
print(f"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_TRACES_ENDPOINT', 'Not set')}")
136-
print(f"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_ENDPOINT', 'Not set')}")
137-
print(f"OTEL_RESOURCE_ATTRIBUTES: {os.environ.get('OTEL_RESOURCE_ATTRIBUTES', 'Not set')}")
138-
print(f"AGENT_OBSERVABILITY_ENABLED: {os.environ.get('AGENT_OBSERVABILITY_ENABLED', 'Not set')}")
139-
print(f"AWS_CLOUDWATCH_LOG_GROUP: {os.environ.get('AWS_CLOUDWATCH_LOG_GROUP', 'Not set')}")
140-
print(f"AWS_CLOUDWATCH_LOG_STREAM: {os.environ.get('AWS_CLOUDWATCH_LOG_STREAM', 'Not set')}")
141-
print(f"OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: {os.environ.get('OTEL_PYTHON_DISABLED_INSTRUMENTATIONS', 'Not set')}")
142-
print(f"PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not set')}")
143-
144127
if _is_defer_to_workers_enabled() and _is_wsgi_master_process():
145128
_logger.info(
146129
"Skipping ADOT initialization since deferral to worker is enabled, and this is a master process."
@@ -193,9 +176,6 @@ def _initialize_components():
193176
resource=resource,
194177
)
195178
_init_metrics(metric_exporters, resource)
196-
logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
197-
if logging_enabled.strip().lower() == "true":
198-
_init_logging(log_exporters, resource)
199179

200180

201181
def _init_logging(
@@ -205,7 +185,7 @@ def _init_logging(
205185

206186
# Provides a default OTLP log exporter when the environment is not set.
207187
# This is the behavior for the logs exporters for other languages.
208-
if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) == None:
188+
if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) is None:
209189
exporters = {"otlp": OTLPLogExporter}
210190

211191
provider = LoggerProvider(resource=resource)
@@ -214,7 +194,11 @@ def _init_logging(
214194
for _, exporter_class in exporters.items():
215195
exporter_args: Dict[str, any] = {}
216196
log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
217-
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter))
197+
198+
if isinstance(log_exporter, OTLPAwsLogExporter):
199+
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter))
200+
else:
201+
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
218202

219203
handler = LoggingHandler(level=NOTSET, logger_provider=provider)
220204

@@ -385,7 +369,10 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
385369
_logger.info("Detected using AWS OTLP Traces Endpoint.")
386370

387371
if isinstance(span_exporter, OTLPSpanExporter):
388-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
372+
if is_agent_observability_enabled():
373+
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
374+
else:
375+
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
389376

390377
else:
391378
_logger.warning(
@@ -650,4 +637,4 @@ def create_exporter(self):
650637
endpoint=application_signals_endpoint, preferred_temporality=temporality_dict
651638
)
652639

653-
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
640+
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1-
BASE_LOG_BUFFER_BYTE_SIZE = 450000
2-
MAX_LOG_REQUEST_BYTE_SIZE = 1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
1+
BASE_LOG_BUFFER_BYTE_SIZE = 2000
2+
MAX_LOG_REQUEST_BYTE_SIZE = (
3+
1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
4+
)
Lines changed: 47 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,51 @@
1-
from time import sleep
2-
import json
31
import logging
4-
import os
5-
import threading
62
from typing import Mapping, Sequence
7-
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
8-
from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import MAX_LOG_REQUEST_BYTE_SIZE, BASE_LOG_BUFFER_BYTE_SIZE
9-
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
10-
from opentelemetry.sdk._logs._internal.export import BatchLogExportStrategy, detach, attach, set_value, _SUPPRESS_INSTRUMENTATION_KEY
113

4+
from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import (
5+
BASE_LOG_BUFFER_BYTE_SIZE,
6+
MAX_LOG_REQUEST_BYTE_SIZE,
7+
)
8+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
129
from opentelemetry.sdk._logs import LogData
10+
from opentelemetry.sdk._logs._internal.export import (
11+
_SUPPRESS_INSTRUMENTATION_KEY,
12+
BatchLogExportStrategy,
13+
attach,
14+
detach,
15+
set_value,
16+
)
17+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
1318
from opentelemetry.util.types import AnyValue
1419

1520
_logger = logging.getLogger(__name__)
1621

22+
1723
class AwsBatchLogRecordProcessor(BatchLogRecordProcessor):
1824

19-
def __init__(
25+
def __init__(
2026
self,
2127
exporter: OTLPAwsLogExporter,
2228
schedule_delay_millis: float | None = None,
2329
max_export_batch_size: int | None = None,
2430
export_timeout_millis: float | None = None,
25-
max_queue_size: int | None = None
26-
):
31+
max_queue_size: int | None = None,
32+
):
2733

2834
super().__init__(
2935
exporter=exporter,
3036
schedule_delay_millis=schedule_delay_millis,
3137
max_export_batch_size=max_export_batch_size,
3238
export_timeout_millis=export_timeout_millis,
33-
max_queue_size=max_queue_size
39+
max_queue_size=max_queue_size,
3440
)
3541

36-
# Code based off of:
3742
# https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
3843
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
3944
"""
40-
Overrides the batching behavior of upstream's export method. Preserves existing batching behavior but
41-
will intermediarly export small log batches if the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit
42-
of 1 MB.
45+
Preserves existing batching behavior but will intermediarly export small log batches if the size of the data in the batch is at or
46+
above AWS CloudWatch's maximum request size limit of 1 MB.
4347
44-
- Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
48+
- Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
4549
- If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
4650
"""
4751

@@ -57,78 +61,70 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
5761
batch_length = min(self._max_export_batch_size, len(self._queue))
5862
batch_data_size = 0
5963
batch = []
60-
64+
6165
for _ in range(batch_length):
62-
66+
6367
log_data = self._queue.pop()
6468
log_size = self._get_size_of_log(log_data)
65-
69+
6670
if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE):
6771
# if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
6872
if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
6973
self._exporter.set_gen_ai_flag()
70-
74+
7175
self._exporter.export(batch)
7276
batch_data_size = 0
7377
batch = []
74-
78+
7579
batch_data_size += log_size
7680
batch.append(log_data)
77-
81+
7882
if batch:
7983
# if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
8084
if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
8185
self._exporter.set_gen_ai_flag()
82-
86+
8387
self._exporter.export(batch)
8488
except Exception: # pylint: disable=broad-exception-caught
8589
_logger.exception("Exception while exporting logs.")
86-
detach(token)
87-
88-
def _get_size_of_log(self, log_data: LogData) -> int:
90+
detach(token)
91+
92+
@staticmethod
93+
def _get_size_of_log(log_data: LogData) -> int:
8994
"""
90-
Estimates the size of a given LogData based on the size of the body + a buffer amount representing a rough guess of other data present
91-
in the log.
95+
Estimates the size of a given LogData based on the size of the body + a buffer
96+
amount representing a rough guess of other data present in the log.
9297
"""
9398
size = BASE_LOG_BUFFER_BYTE_SIZE
9499
body = log_data.log_record.body
95-
100+
96101
if body:
97-
size += self._get_size_of_any_value(body)
102+
size += AwsBatchLogRecordProcessor._get_size_of_any_value(body)
98103

99104
return size
100105

101-
def _get_size_of_any_value(self, val: AnyValue) -> int:
106+
@staticmethod
107+
def _get_size_of_any_value(val: AnyValue) -> int:
102108
"""
103109
Recursively calculates the size of an AnyValue type in bytes.
104110
"""
105111
size = 0
106-
112+
107113
if isinstance(val, str) or isinstance(val, bytes):
108114
return len(val)
109-
115+
110116
if isinstance(val, bool):
111-
if val:
112-
return 4 #len(True) = 4
113-
return 5 #len(False) = 5
114-
117+
return 4 if val else 5
118+
115119
if isinstance(val, int) or isinstance(val, float):
116120
return len(str(val))
117-
121+
118122
if isinstance(val, Sequence):
119123
for content in val:
120-
size += self._get_size_of_any_value(content)
121-
124+
size += AwsBatchLogRecordProcessor._get_size_of_any_value(content)
125+
122126
if isinstance(val, Mapping):
123127
for _, content in val.items():
124-
size += self._get_size_of_any_value(content)
125-
126-
return size
127-
128-
128+
size += AwsBatchLogRecordProcessor._get_size_of_any_value(content)
129129

130-
131-
132-
133-
134-
130+
return size

0 commit comments

Comments
 (0)