1212
1313from amazon .opentelemetry .distro .exporter .otlp .aws .common .aws_auth_session import AwsAuthSession
1414from amazon .opentelemetry .distro .exporter .otlp .aws .common .constants import BASE_LOG_BUFFER_BYTE_SIZE
15+ from opentelemetry .exporter .otlp .proto .common ._log_encoder import encode_logs
1516from opentelemetry .exporter .otlp .proto .http import Compression
1617from opentelemetry .exporter .otlp .proto .http ._log_exporter import OTLPLogExporter , _create_exp_backoff_generator
1718from opentelemetry .sdk ._logs import (
2627
2728class OTLPAwsLogExporter (OTLPLogExporter ):
2829 _LARGE_LOG_HEADER = {"x-aws-log-semantics" : "otel" }
29- _RETRY_AFTER_HEADER = "Retry-After"
30+ _RETRY_AFTER_HEADER = "Retry-After" # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
3031
3132 def __init__ (
3233 self ,
@@ -55,31 +56,22 @@ def __init__(
5556 session = AwsAuthSession (aws_region = self ._aws_region , service = "logs" ),
5657 )
5758
59+ # Code based off of:
60+ # https://github.com/open-telemetry/opentelemetry-python/blob/main/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
5861 def export (self , batch : Sequence [LogData ]) -> LogExportResult :
62+
5963 print (f"Exporting batch of { len (batch )} logs" )
6064 print ("TOTAL DATA SIZE " + str (sum (self ._get_size_of_log (logz ) for logz in batch )))
6165 print ("GEN_AI_FLAG " + str (self ._gen_ai_flag ))
6266
63- return super ().export (batch )
64-
65- def set_gen_ai_flag (self ):
66- self ._gen_ai_flag = True
67-
68- @staticmethod
69- def _retryable (resp : requests .Response ) -> bool :
70- if resp .status_code == 429 or resp .status_code == 503 :
71- return True
72-
73- return OTLPLogExporter ._retryable (resp )
74-
75- def _export (self , serialized_data : bytes ) -> requests .Response :
7667 """
77- Exports the given serialized OTLP log data. Behaviors of how this export will work.
68+ Exports the given batch of OTLP log data.
69+ Behaviors of how this export will work -
7870
7971 1. Always compresses the serialized data into gzip before sending.
8072
81- 2. If self._gen_ai_flag is enabled, the log data is > 1 MB and we assume that the log contains normalized gen.ai attributes .
82- - in this case we inject the 'x-aws-log-semantics' flag into the header.
73+ 2. If self._gen_ai_flag is enabled, the log data is > 1 MB and the assumption is that the log is a normalized gen.ai LogEvent .
74+ - inject the 'x-aws-log-semantics' flag into the header.
8375
8476 3. Retry behavior is now the following:
8577 - if the response contains a status code that is retryable and the response contains Retry-After in its headers,
@@ -88,69 +80,109 @@ def _export(self, serialized_data: bytes) -> requests.Response:
8880 - if the response does not contain that Retry-After header, default back to the current iteration of the
8981 exponential backoff delay
9082 """
83+
84+ if self ._shutdown :
85+ _logger .warning ("Exporter already shutdown, ignoring batch" )
86+ return LogExportResult .FAILURE
87+
88+ serialized_data = encode_logs (batch ).SerializeToString ()
89+
9190 gzip_data = BytesIO ()
9291 with gzip .GzipFile (fileobj = gzip_data , mode = "w" ) as gzip_stream :
9392 gzip_stream .write (serialized_data )
9493
9594 data = gzip_data .getvalue ()
9695
97- def send ():
98- try :
99- return self ._session .post (
100- url = self ._endpoint ,
101- headers = self ._LARGE_LOG_HEADER if self ._gen_ai_flag else None ,
102- data = data ,
103- verify = self ._certificate_file ,
104- timeout = self ._timeout ,
105- cert = self ._client_cert ,
106- )
107- except ConnectionError :
108- return self ._session .post (
109- url = self ._endpoint ,
110- headers = self ._LARGE_LOG_HEADER if self ._gen_ai_flag else None ,
111- data = data ,
112- verify = self ._certificate_file ,
113- timeout = self ._timeout ,
114- cert = self ._client_cert ,
115- )
116-
11796 backoff = _create_exp_backoff_generator (self ._MAX_RETRY_TIMEOUT )
11897
11998 while True :
120- resp = send ()
99+ resp = self ._send (data )
100+
101+ if resp .ok :
102+ return LogExportResult .SUCCESS
121103
122104 if not self ._retryable (resp ):
105+ _logger .error (
106+ "Failed to export logs batch code: %s, reason: %s" ,
107+ resp .status_code ,
108+ resp .text ,
109+ )
123110 self ._gen_ai_flag = False
124- return resp
125-
126- _logger .warning (
127- "Transient error %s encountered while exporting logs batch, retrying in %ss." ,
128- resp .reason ,
129- delay ,
130- )
111+ return LogExportResult .FAILURE
131112
113+ # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
132114 retry_after = resp .headers .get (self ._RETRY_AFTER_HEADER , None )
133115
134116 # Set the next retry delay to the value of the Retry-After response in the headers.
135117 # If Retry-After is not present in the headers, default to the next iteration of the exponential backoff strategy.
136118 delay = next (backoff , - 1 ) if retry_after == None else self ._parse_retryable_header (retry_after )
137119
138120 if delay == - 1 :
121+ _logger .error (
122+ "Transient error %s encountered while exporting logs batch. "
123+ "No Retry-After header found and all backoff retries exhausted. "
124+ "Logs will not be exported." ,
125+ resp .reason ,
126+ )
139127 self ._gen_ai_flag = False
140- return resp
128+ return LogExportResult .FAILURE
129+
130+ _logger .warning (
131+ "Transient error %s encountered while exporting logs batch, retrying in %ss." ,
132+ resp .reason ,
133+ delay ,
134+ )
141135
142136 sleep (delay )
143- continue
137+
138+ def set_gen_ai_flag (self ):
139+ """
140+ Sets the gen_ai flag to true to signal injecting the LLO flag to the headers of the export request.
141+ """
142+ self ._gen_ai_flag = True
143+
144+ def _send (self , serialized_data : bytes ):
145+ try :
146+ return self ._session .post (
147+ url = self ._endpoint ,
148+ headers = self ._LARGE_LOG_HEADER if self ._gen_ai_flag else None ,
149+ data = serialized_data ,
150+ verify = self ._certificate_file ,
151+ timeout = self ._timeout ,
152+ cert = self ._client_cert ,
153+ )
154+ except ConnectionError :
155+ return self ._session .post (
156+ url = self ._endpoint ,
157+ headers = self ._LARGE_LOG_HEADER if self ._gen_ai_flag else None ,
158+ data = serialized_data ,
159+ verify = self ._certificate_file ,
160+ timeout = self ._timeout ,
161+ cert = self ._client_cert ,
162+ )
163+
164+ @staticmethod
165+ def _retryable (resp : requests .Response ) -> bool :
166+ """
167+ Is it a retryable response?
168+ """
169+ if resp .status_code == 429 or resp .status_code == 503 :
170+ return True
171+
172+ return OTLPLogExporter ._retryable (resp )
144173
145174 def _parse_retryable_header (self , retry_header : str ) -> float :
146- "Converts the given retryable header into a delay in seconds, returns -1 if there's an error with the parsing"
175+ """
176+ Converts the given retryable header into a delay in seconds, returns -1 if there's an error with the parsing
177+ """
147178 try :
148179 return float (retry_header )
149180 except ValueError :
150181 return - 1
151182
152183 def _get_size_of_log (self , log_data : LogData ):
153- # Rough estimate of the size of the LogData based on size of the content body + a buffer to account for other information in logs.
184+ # Rough estimate of the size of the LogData based on size of
185+ # the content body + a buffer to account for other information in logs.
154186 size = BASE_LOG_BUFFER_BYTE_SIZE
155187 body = log_data .log_record .body
156188
0 commit comments