@@ -49,22 +49,22 @@ def get_data(self, prefix):
4949 break
5050
5151 for message in messages :
52- attrs = message .get ("MessageAttributes" , {})
53- msg_retry_prefix = attrs .get ("retry_prefix" , {}).get ("StringValue" )
54- msg_function_prefix = attrs .get ("function_prefix" , {}).get (
55- "StringValue"
56- )
5752 receipt_handle = message ["ReceiptHandle" ]
53+ msg_retry_prefix = self ._get_message_attr (message , "retry_prefix" )
54+ msg_function_prefix = self ._get_message_attr (message , "function_prefix" )
5855
59- if (
56+ matches = (
6057 msg_retry_prefix == str (prefix )
6158 and msg_function_prefix == self .function_prefix
62- ):
63- data = self ._deserialize (message ["Body" ])
64- if data is not None :
65- key_data [receipt_handle ] = data
66- else :
59+ )
60+
61+ if not matches :
6762 self ._release_message (receipt_handle )
63+ continue
64+
65+ data = self ._deserialize (message ["Body" ])
66+ if data is not None :
67+ key_data [receipt_handle ] = data
6868
6969 if logger .isEnabledFor (logging .DEBUG ):
7070 logger .debug (
@@ -120,6 +120,13 @@ def _release_message(self, receipt_handle):
120120 except ClientError as e :
121121 logger .error (f"Failed to release SQS message: { e } " )
122122
123+ @staticmethod
124+ def _get_message_attr (message , attr_name ):
125+ """Extract a string attribute value from an SQS message."""
126+ return (
127+ message .get ("MessageAttributes" , {}).get (attr_name , {}).get ("StringValue" )
128+ )
129+
123130 def _chunk_data (self , data ):
124131 """Split a list of items into chunks that each fit under SQS_MAX_CHUNK_BYTES."""
125132 if not isinstance (data , list ):
@@ -153,7 +160,7 @@ def _chunk_data(self, data):
153160 if current_chunk :
154161 chunks .append (current_chunk )
155162
156- return chunks if chunks else [data ]
163+ return chunks or [data ]
157164
158165 def _serialize (self , data ):
159166 return json .dumps (data , ensure_ascii = False )
0 commit comments