Skip to content

Commit 6572783

Browse files
committed
init genesis custom log behavior
1 parent 409cb6a commit 6572783

File tree

6 files changed

+318
-19
lines changed

6 files changed

+318
-19
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
44
import os
55
import re
6-
from logging import NOTSET, Logger, getLogger
6+
from logging import NOTSET, CRITICAL, Logger, getLogger
77
from typing import ClassVar, Dict, List, Type, Union
88

99
from importlib_metadata import version
@@ -83,6 +83,7 @@
8383
DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT"
8484
APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT"
8585
METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL"
86+
OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER"
8687
DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0
8788
AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
8889
AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"
@@ -122,6 +123,24 @@ class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
122123
# pylint: disable=no-self-use
123124
@override
124125
def _configure(self, **kwargs):
126+
127+
print(f"OTEL_EXPORTER_OTLP_LOGS_HEADERS: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_HEADERS', 'Not set')}")
128+
print(f"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED: {os.environ.get('OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED', 'Not set')}")
129+
print(f"OTEL_METRICS_EXPORTER: {os.environ.get('OTEL_METRICS_EXPORTER', 'Not set')}")
130+
print(f"OTEL_TRACES_EXPORTER: {os.environ.get('OTEL_TRACES_EXPORTER', 'Not set')}")
131+
print(f"OTEL_LOGS_EXPORTER: {os.environ.get('OTEL_LOGS_EXPORTER', 'Not set')}")
132+
print(f"OTEL_PYTHON_DISTRO: {os.environ.get('OTEL_PYTHON_DISTRO', 'Not set')}")
133+
print(f"OTEL_PYTHON_CONFIGURATOR: {os.environ.get('OTEL_PYTHON_CONFIGURATOR', 'Not set')}")
134+
print(f"OTEL_EXPORTER_OTLP_PROTOCOL: {os.environ.get('OTEL_EXPORTER_OTLP_PROTOCOL', 'Not set')}")
135+
print(f"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_TRACES_ENDPOINT', 'Not set')}")
136+
print(f"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT: {os.environ.get('OTEL_EXPORTER_OTLP_LOGS_ENDPOINT', 'Not set')}")
137+
print(f"OTEL_RESOURCE_ATTRIBUTES: {os.environ.get('OTEL_RESOURCE_ATTRIBUTES', 'Not set')}")
138+
print(f"AGENT_OBSERVABILITY_ENABLED: {os.environ.get('AGENT_OBSERVABILITY_ENABLED', 'Not set')}")
139+
print(f"AWS_CLOUDWATCH_LOG_GROUP: {os.environ.get('AWS_CLOUDWATCH_LOG_GROUP', 'Not set')}")
140+
print(f"AWS_CLOUDWATCH_LOG_STREAM: {os.environ.get('AWS_CLOUDWATCH_LOG_STREAM', 'Not set')}")
141+
print(f"OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: {os.environ.get('OTEL_PYTHON_DISABLED_INSTRUMENTATIONS', 'Not set')}")
142+
print(f"PYTHONPATH: {os.environ.get('PYTHONPATH', 'Not set')}")
143+
125144
if _is_defer_to_workers_enabled() and _is_wsgi_master_process():
126145
_logger.info(
127146
"Skipping ADOT initialization since deferral to worker is enabled, and this is a master process."
@@ -174,16 +193,19 @@ def _initialize_components():
174193
resource=resource,
175194
)
176195
_init_metrics(metric_exporters, resource)
196+
logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
197+
if logging_enabled.strip().lower() == "true":
198+
_init_logging(log_exporters, resource)
177199

178200

179201
def _init_logging(
180202
exporters: Dict[str, Type[LogExporter]],
181203
resource: Resource = None,
182204
):
183205

184-
# Provides a default OTLP log exporter when none is specified.
206+
# Provides a default OTLP log exporter when the environment is not set.
185207
# This is the behavior for the logs exporters for other languages.
186-
if not exporters:
208+
if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) == None:
187209
exporters = {"otlp": OTLPLogExporter}
188210

189211
provider = LoggerProvider(resource=resource)
@@ -192,7 +214,7 @@ def _init_logging(
192214
for _, exporter_class in exporters.items():
193215
exporter_args: Dict[str, any] = {}
194216
log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
195-
provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
217+
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter))
196218

197219
handler = LoggingHandler(level=NOTSET, logger_provider=provider)
198220

@@ -363,15 +385,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
363385
_logger.info("Detected using AWS OTLP Traces Endpoint.")
364386

365387
if isinstance(span_exporter, OTLPSpanExporter):
366-
if is_agent_observability_enabled():
367-
logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
368-
logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
369-
span_exporter = OTLPAwsSpanExporter(
370-
endpoint=traces_endpoint,
371-
logger_provider=get_logger_provider()
372-
)
373-
else:
374-
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
388+
span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
375389

376390
else:
377391
_logger.warning(
@@ -636,4 +650,4 @@ def create_exporter(self):
636650
endpoint=application_signals_endpoint, preferred_temporality=temporality_dict
637651
)
638652

639-
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
653+
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
# Rough per-record overhead (bytes) added on top of the body size when
# estimating how large a single LogData will be once serialized.
BASE_LOG_BUFFER_BYTE_SIZE = 450000

# Maximum request payload accepted by the CloudWatch OTLP Logs endpoint (1 MiB).
# https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
MAX_LOG_REQUEST_BYTE_SIZE = 1048576
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
import json
import logging
import os
import threading
from time import sleep
from typing import Mapping, Optional, Sequence

from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    BatchLogExportStrategy,
    attach,
    detach,
    set_value,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

from amazon.opentelemetry.distro.exporter.otlp.aws.common.constants import (
    BASE_LOG_BUFFER_BYTE_SIZE,
    MAX_LOG_REQUEST_BYTE_SIZE,
)
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
15+
_logger = logging.getLogger(__name__)


class AwsBatchLogRecordProcessor(BatchLogRecordProcessor):
    """BatchLogRecordProcessor that additionally enforces AWS CloudWatch's
    OTLP request size limit: exported batches are split so that each call to
    the exporter carries at most MAX_LOG_REQUEST_BYTE_SIZE (1 MB) of
    estimated log data.
    """

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: Optional[float] = None,
        max_export_batch_size: Optional[int] = None,
        export_timeout_millis: Optional[float] = None,
        max_queue_size: Optional[int] = None,
    ):
        # NOTE: ``Optional[...]`` (not ``float | None``) — this module has no
        # ``from __future__ import annotations``, so PEP 604 unions would raise
        # a TypeError at import time on Python < 3.10.
        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

    # Code based off of:
    # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Overrides the batching behavior of upstream's export method. Preserves existing batching behavior but
        will intermediately export small log batches if the size of the data in the batch is at or above AWS CloudWatch's
        maximum request size limit of 1 MB.

        - Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
        - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
        """
        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):
                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):
                        log_data = self._queue.pop()
                        log_size = self._get_size_of_log(log_data)

                        # Flush the accumulated batch before it would exceed the
                        # CloudWatch request limit.
                        if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE):
                            # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                            if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
                                self._exporter.set_gen_ai_flag()

                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                        if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
                            self._exporter.set_gen_ai_flag()

                        self._exporter.export(batch)
                except Exception:  # pylint: disable=broad-exception-caught
                    _logger.exception("Exception while exporting logs.")
                finally:
                    # Always restore the suppression context, even if the export
                    # loop or the exception handler itself raises (matches the
                    # upstream processor's try/finally structure).
                    detach(token)

    def _get_size_of_log(self, log_data: LogData) -> int:
        """
        Estimates the size in bytes of a given LogData: the size of its body plus
        BASE_LOG_BUFFER_BYTE_SIZE as a rough allowance for the other data present
        in the log.
        """
        size = BASE_LOG_BUFFER_BYTE_SIZE
        body = log_data.log_record.body

        if body:
            size += self._get_size_of_any_value(body)

        return size

    def _get_size_of_any_value(self, val: AnyValue) -> int:
        """
        Recursively calculates the size of an AnyValue type in bytes.
        """
        # str/bytes are sized by their length directly.
        if isinstance(val, (str, bytes)):
            return len(val)

        # bool must be tested before int — bool is an int subclass.
        if isinstance(val, bool):
            return 4 if val else 5  # len("True") == 4, len("False") == 5

        if isinstance(val, (int, float)):
            return len(str(val))

        size = 0

        # Containers: sum the sizes of their values recursively. Mapping keys
        # are not counted, matching the original estimate.
        if isinstance(val, Sequence):
            for content in val:
                size += self._get_size_of_any_value(content)

        if isinstance(val, Mapping):
            for content in val.values():
                size += self._get_size_of_any_value(content)

        return size

0 commit comments

Comments
 (0)