feat(scribe): Implement SQS to eliminate state machine polling. #4639
    def has_interrupt_message(self) -> tuple[bool, str | None]:
        messages = self.receive_messages(max_messages=10)

        for msg in messages:
            body = msg["body"]
            if body.get("type") == "INTERRUPT":
                LOGGER.info(f"Found INTERRUPT message in queue")
                return True, msg["receipt_handle"]

        return False, None
The has_interrupt_message() method fetches messages but doesn't acknowledge non-INTERRUPT messages, causing queue message accumulation and duplicates on retry.
📝 Patch Details
diff --git a/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py b/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py
index 7df5e881d..f5703da45 100644
--- a/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py
+++ b/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py
@@ -68,14 +68,21 @@ class SQSClient:
     def has_interrupt_message(self) -> tuple[bool, str | None]:
         messages = self.receive_messages(max_messages=10)
+        interrupt_receipt = None
         for msg in messages:
             body = msg["body"]
+            receipt_handle = msg["receipt_handle"]
+
             if body.get("type") == "INTERRUPT":
                 LOGGER.info(f"Found INTERRUPT message in queue")
-                return True, msg["receipt_handle"]
+                interrupt_receipt = receipt_handle
+            else:
+                # Delete non-INTERRUPT messages to prevent queue accumulation
+                self.delete_message(receipt_handle)
+                LOGGER.debug(f"Deleted processed {body.get('type', 'UNKNOWN')} message")
-        return False, None
+        return interrupt_receipt is not None, interrupt_receipt
     def get_resume_messages(self) -> list[dict[str, Any]]:
         messages = self.receive_messages(max_messages=10)
@@ -83,8 +90,14 @@ class SQSClient:
         for msg in messages:
             body = msg["body"]
+            receipt_handle = msg["receipt_handle"]
+
             if body.get("type") == "RESUME":
-                resume_messages.append({"body": body, "receipt_handle": msg["receipt_handle"]})
+                resume_messages.append({"body": body, "receipt_handle": receipt_handle})
+            else:
+                # Delete non-RESUME messages to prevent queue accumulation
+                self.delete_message(receipt_handle)
+                LOGGER.debug(f"Deleted processed {body.get('type', 'UNKNOWN')} message")
         if resume_messages:
             LOGGER.info(f"Found {len(resume_messages)} RESUME messages in queue")
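To make the effect of the patch easier to see, here is a minimal sketch of the patched `has_interrupt_message()` running against an in-memory stand-in. `FakeSQSClient`, its `pending()` helper, and the `rh-N` receipt handles are hypothetical names invented for this illustration; only `receive_messages`, `delete_message`, and the message shape (`body`, `receipt_handle`) are taken from the diff above.

```python
from typing import Any, Optional


class FakeSQSClient:
    """Hypothetical in-memory stand-in for SQSClient (illustration only)."""

    def __init__(self, bodies: list[dict[str, Any]]) -> None:
        # Receipt handle -> body; the "rh-N" handles are made up for this sketch.
        self._queue = {f"rh-{i}": body for i, body in enumerate(bodies)}

    def receive_messages(self, max_messages: int = 10) -> list[dict[str, Any]]:
        items = list(self._queue.items())[:max_messages]
        return [{"body": body, "receipt_handle": rh} for rh, body in items]

    def delete_message(self, receipt_handle: str) -> None:
        self._queue.pop(receipt_handle, None)

    def has_interrupt_message(self) -> tuple[bool, Optional[str]]:
        # Same control flow as the patched method, minus logging.
        interrupt_receipt = None
        for msg in self.receive_messages(max_messages=10):
            body = msg["body"]
            receipt_handle = msg["receipt_handle"]
            if body.get("type") == "INTERRUPT":
                interrupt_receipt = receipt_handle
            else:
                # Non-INTERRUPT messages are deleted rather than left in the queue.
                self.delete_message(receipt_handle)
        return interrupt_receipt is not None, interrupt_receipt

    def pending(self) -> int:
        # Hypothetical helper: number of messages still in the queue.
        return len(self._queue)


client = FakeSQSClient([{"type": "RESUME"}, {"type": "INTERRUPT"}, {"type": "OTHER"}])
found, receipt = client.has_interrupt_message()
remaining = client.pending()  # only the INTERRUPT message is left for the caller to delete
```

One consequence visible in this sketch is that a RESUME message received during an interrupt check is deleted along with everything else. Whether that is acceptable depends on how the caller interleaves the two methods, which the diff does not show.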
Analysis
SQS message accumulation in has_interrupt_message() and get_resume_messages()
What fails: SQSClient.has_interrupt_message() and SQSClient.get_resume_messages() fetch messages but only delete specific message types (INTERRUPT/RESUME), leaving other messages in the queue to accumulate
How to reproduce:
# Simulate mixed message types in SQS queue
# Call has_interrupt_message() repeatedly
client = SQSClient(queue_url)
client.has_interrupt_message()  # Fetches up to 10 messages; non-INTERRUPT messages are never deleted
client.has_interrupt_message()  # Receives the same non-INTERRUPT messages again once their visibility timeout expires
Result: Non-INTERRUPT messages (RESUME and other types) remain in the queue after each poll and are redelivered on later calls, causing message accumulation and duplicate processing
Expected: All processed messages should be deleted from queue to prevent redelivery after SQS visibility timeout expires
Root cause: Methods call receive_messages() but only delete messages of specific types, violating SQS best practices for message cleanup
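The redelivery described above can be reproduced without AWS using a small in-memory model. This is a sketch under stated assumptions: `FakeQueue` and `has_interrupt_message_unpatched` are hypothetical names, and the model treats the visibility timeout as already expired between polls, so every undeleted message is handed out again on the next receive.

```python
from collections import Counter
from typing import Any, Optional


class FakeQueue:
    """Hypothetical in-memory queue; visibility timeout is assumed to have
    expired between polls, so undeleted messages are always redelivered."""

    def __init__(self, bodies: list[dict[str, Any]]) -> None:
        self._messages = {f"rh-{i}": body for i, body in enumerate(bodies)}
        self.receive_counts: Counter = Counter()  # deliveries per receipt handle

    def receive_messages(self, max_messages: int = 10) -> list[dict[str, Any]]:
        batch = list(self._messages.items())[:max_messages]
        for rh, _ in batch:
            self.receive_counts[rh] += 1
        return [{"body": body, "receipt_handle": rh} for rh, body in batch]

    def delete_message(self, receipt_handle: str) -> None:
        self._messages.pop(receipt_handle, None)


def has_interrupt_message_unpatched(queue: FakeQueue) -> tuple[bool, Optional[str]]:
    # Pre-patch logic: returns on the first INTERRUPT and never deletes anything.
    for msg in queue.receive_messages(max_messages=10):
        if msg["body"].get("type") == "INTERRUPT":
            return True, msg["receipt_handle"]
    return False, None


queue = FakeQueue([{"type": "RESUME"}, {"type": "OTHER"}])
first = has_interrupt_message_unpatched(queue)
second = has_interrupt_message_unpatched(queue)
deliveries = dict(queue.receive_counts)  # each message was delivered on both polls
```

In this model each non-INTERRUPT message is delivered once per poll, which is the accumulation and duplicate processing the analysis describes. With a real queue the second delivery would only happen after the visibility timeout elapses.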
bc97f88 to 3dcde0c
No description provided.