11import logging
2- from typing import Mapping , Optional , Sequence , cast
2+ from typing import List , Mapping , Optional , Sequence , cast
33
44from amazon .opentelemetry .distro .exporter .otlp .aws .logs .otlp_aws_logs_exporter import OTLPAwsLogExporter
55from opentelemetry .context import (
1818
1919class AwsBatchLogRecordProcessor (BatchLogRecordProcessor ):
2020 _BASE_LOG_BUFFER_BYTE_SIZE = (
21- 2000 # Buffer size in bytes to account for log metadata not included in the body size calculation
21+ 1000 # Buffer size in bytes to account for log metadata not included in the body or attribute size calculation
2222 )
2323 _MAX_LOG_REQUEST_BYTE_SIZE = (
2424 1048576 # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
@@ -66,7 +66,7 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
6666
6767 for _ in range (batch_length ):
6868 log_data : LogData = self ._queue .pop ()
69- log_size = self ._BASE_LOG_BUFFER_BYTE_SIZE + self . _get_any_value_size (log_data . log_record . body )
69+ log_size = self ._estimate_log_size (log_data )
7070
7171 if batch and (batch_data_size + log_size > self ._MAX_LOG_REQUEST_BYTE_SIZE ):
7272 # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
@@ -92,60 +92,67 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
9292 _logger .exception ("Exception while exporting logs: " + str (exception ))
9393 detach (token )
9494
95- def _get_any_value_size (self , val : AnyValue , depth : int = 3 ) -> int :
95+ def _estimate_log_size (self , log : LogData , depth : int = 3 ) -> int :
9696 """
97- Only used to indicate whether we should export a batch log size of 1 or not.
98- Calculates the size in bytes of an AnyValue object .
99- Will process complex AnyValue structures up to the specified depth limit.
100- If the depth limit of the AnyValue structure is exceeded, returns 0.
97+ Estimates the size in bytes of a log by calculating the size of its body and its attributes
98+ and adding a buffer amount to account for other log metadata information .
99+ Will process complex log structures up to the specified depth limit.
100+ If the depth limit of the log structure is exceeded, returns 0.
101101
102102 Args:
103- val : The AnyValue object to calculate size for
103+ log : The Log object to calculate size for
104104 depth: Maximum depth to traverse in nested structures (default: 3)
105105
106106 Returns:
107- int: Total size of the AnyValue object in bytes
107+ int: The estimated size of the log object in bytes
108108 """
109- # Use a stack to prevent excessive recursive calls.
110- stack = [(val , 0 )]
111- size : int = 0
112-
113- while stack :
114- # small optimization. We can stop calculating the size once it reaches the 1 MB limit.
115- if size >= self ._MAX_LOG_REQUEST_BYTE_SIZE :
116- return size
117-
118- next_val , current_depth = stack .pop ()
119-
120- if isinstance (next_val , (str , bytes )):
121- size += len (next_val )
122- continue
123-
124- if isinstance (next_val , bool ):
125- size += 4 if next_val else 5
126- continue
127-
128- if isinstance (next_val , (float , int )):
129- size += len (str (next_val ))
130- continue
131-
132- if current_depth <= depth :
133- if isinstance (next_val , Sequence ):
134- for content in next_val :
135- stack .append ((cast (AnyValue , content ), current_depth + 1 ))
136-
137- if isinstance (next_val , Mapping ):
138- for key , content in next_val .items ():
139- size += len (key )
140- stack .append ((content , current_depth + 1 ))
141- else :
142- _logger .debug ("Max log depth exceeded. Log data size will not be accurately calculated." )
143- return 0
109+
110+ # Use a queue to prevent excessive recursive calls.
111+ queue : List [tuple [AnyValue , int ]] = [(log .log_record .body , 0 ), (log .log_record .attributes , - 1 )]
112+
113+ size : int = self ._BASE_LOG_BUFFER_BYTE_SIZE
114+
115+ while queue :
116+ new_queue : List [tuple [AnyValue , int ]] = []
117+
118+ for data in queue :
119+ # small optimization, can stop calculating the size once it reaches the 1 MB limit.
120+ if size >= self ._MAX_LOG_REQUEST_BYTE_SIZE :
121+ return size
122+
123+ next_val , current_depth = data
124+
125+ if isinstance (next_val , (str , bytes )):
126+ size += len (next_val )
127+ continue
128+
129+ if isinstance (next_val , bool ):
130+ size += 4 if next_val else 5
131+ continue
132+
133+ if isinstance (next_val , (float , int )):
134+ size += len (str (next_val ))
135+ continue
136+
137+ if current_depth <= depth :
138+ # Sequence has to be
139+ if isinstance (next_val , Sequence ):
140+ for content in next_val :
141+ new_queue .append ((cast (AnyValue , content ), current_depth + 1 ))
142+
143+ if isinstance (next_val , Mapping ):
144+ for key , content in next_val .items ():
145+ size += len (key )
146+ new_queue .append ((content , current_depth + 1 ))
147+ else :
148+ _logger .debug (f"Max log dept of { depth } exceeded. Log data size will not be accurately calculated." )
149+
150+ queue = new_queue
144151
145152 return size
146153
147154 @staticmethod
148- def _is_gen_ai_log (log_data : LogData ) -> bool :
155+ def _is_gen_ai_log (log : LogData ) -> bool :
149156 """
150157 Is the log a Gen AI log event?
151158 """
@@ -157,4 +164,4 @@ def _is_gen_ai_log(log_data: LogData) -> bool:
157164 "openlit.otel.tracing" ,
158165 }
159166
160- return log_data .instrumentation_scope .name in gen_ai_instrumentations
167+ return log .instrumentation_scope .name in gen_ai_instrumentations
0 commit comments