@@ -13,6 +13,7 @@ def __init__(self, queue_url: str):
1313 self .queue_url = queue_url
1414 self .sqs_client = boto3 .client ("sqs" )
1515 self ._last_checked_count = 0
16+ self ._cached_messages : list [dict [str , Any ]] = []
1617
1718 def receive_messages (self , max_messages : int = 10 ) -> list [dict [str , Any ]]:
1819 try :
@@ -74,19 +75,28 @@ def has_interrupt_message(self) -> tuple[bool, str | None]:
7475 if body .get ("type" ) == "INTERRUPT" :
7576 LOGGER .info (f"Found INTERRUPT message in queue" )
7677 return True , msg ["receipt_handle" ]
78+ else :
79+ self ._cached_messages .append (msg )
7780
7881 return False , None
7982
8083 def get_resume_messages (self ) -> list [dict [str , Any ]]:
81- messages = self .receive_messages (max_messages = 10 )
8284 resume_messages = []
8385
86+ for msg in self ._cached_messages :
87+ body = msg ["body" ]
88+ if body .get ("type" ) == "RESUME" :
89+ resume_messages .append ({"body" : body , "receipt_handle" : msg ["receipt_handle" ]})
90+
91+ messages = self .receive_messages (max_messages = 10 )
8492 for msg in messages :
8593 body = msg ["body" ]
8694 if body .get ("type" ) == "RESUME" :
8795 resume_messages .append ({"body" : body , "receipt_handle" : msg ["receipt_handle" ]})
8896
8997 if resume_messages :
90- LOGGER .info (f"Found { len (resume_messages )} RESUME messages in queue" )
98+ LOGGER .info (f"Found { len (resume_messages )} RESUME messages ({ len ([m for m in self ._cached_messages if m ['body' ].get ('type' ) == 'RESUME' ])} cached, { len ([m for m in messages if m ['body' ].get ('type' ) == 'RESUME' ])} new)" )
99+
100+ self ._cached_messages .clear ()
91101
92102 return resume_messages
0 commit comments