File tree Expand file tree Collapse file tree 3 files changed +9
-18
lines changed
Expand file tree Collapse file tree 3 files changed +9
-18
lines changed Original file line number Diff line number Diff line change @@ -53,12 +53,10 @@ def get_data(self, prefix):
5353 msg_retry_prefix = self ._get_message_attr (message , "retry_prefix" )
5454 msg_function_prefix = self ._get_message_attr (message , "function_prefix" )
5555
56- matches = (
57- msg_retry_prefix == str (prefix )
58- and msg_function_prefix == self .function_prefix
59- )
60-
61- if not matches :
56+ if (
57+ msg_retry_prefix != str (prefix )
58+ or msg_function_prefix != self .function_prefix
59+ ):
6260 self ._release_message (receipt_handle )
6361 continue
6462
@@ -123,9 +121,8 @@ def _release_message(self, receipt_handle):
123121 @staticmethod
124122 def _get_message_attr (message , attr_name ):
125123 """Extract a string attribute value from an SQS message."""
126- return (
127- message .get ("MessageAttributes" , {}).get (attr_name , {}).get ("StringValue" )
128- )
124+ attrs = message .get ("MessageAttributes" , {})
125+ return attrs .get (attr_name , {}).get ("StringValue" )
129126
130127 def _chunk_data (self , data ):
131128 """Split a list of items into chunks that each fit under SQS_MAX_CHUNK_BYTES."""
Original file line number Diff line number Diff line change @@ -82,7 +82,7 @@ def _get_key_prefix(self, retry_prefix):
8282 return f"{ DD_S3_RETRY_DIRNAME } /{ self .function_prefix } /{ str (retry_prefix )} /"
8383
8484 def _serialize (self , data ):
85- return bytes ( json .dumps (data ).encode ("UTF-8" ) )
85+ return json .dumps (data ).encode ("UTF-8" )
8686
8787 def _deserialize (self , data ):
8888 return json .loads (data .decode ("UTF-8" ))
Original file line number Diff line number Diff line change @@ -247,12 +247,6 @@ def normalize_events(events, metadata):
247247
248248
249249def collect_and_count (events ):
250- collected = []
251- counter = 0
252- for event in events :
253- counter += 1
254- collected .append (event )
255-
256- send_event_metric ("incoming_events" , counter )
257-
250+ collected = list (events )
251+ send_event_metric ("incoming_events" , len (collected ))
258252 return collected
You can’t perform that action at this time.
0 commit comments