-
Notifications
You must be signed in to change notification settings - Fork 23
feat(scribe): Implement SQS to eliminate state machine polling. #4639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4922fa4
a668222
c13b0da
7ab7824
3da4386
a8d59ce
0c3aef2
a7d682f
1b927d5
33c5f04
ea21ded
1ddbb45
3dcde0c
4650796
06daeef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| import json | ||
| import logging | ||
| from typing import Any | ||
|
|
||
| import boto3 | ||
| from botocore.exceptions import ClientError | ||
|
|
||
| from ..settings import LOGGER | ||
|
|
||
|
|
||
| class SQSClient: | ||
| def __init__(self, queue_url: str): | ||
| self.queue_url = queue_url | ||
| self.sqs_client = boto3.client("sqs") | ||
eyw520 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._last_checked_count = 0 | ||
|
|
||
| def receive_messages(self, max_messages: int = 10) -> list[dict[str, Any]]: | ||
| try: | ||
| response = self.sqs_client.receive_message( | ||
| QueueUrl=self.queue_url, | ||
| MaxNumberOfMessages=min(max_messages, 10), | ||
| WaitTimeSeconds=0, # Short polling for Lambda | ||
| AttributeNames=["All"], | ||
| ) | ||
|
|
||
| messages = response.get("Messages", []) | ||
| parsed_messages = [] | ||
|
|
||
| for msg in messages: | ||
| try: | ||
| body = json.loads(msg["Body"]) | ||
| parsed_messages.append( | ||
| { | ||
| "body": body, | ||
| "receipt_handle": msg["ReceiptHandle"], | ||
| "message_id": msg["MessageId"], | ||
| } | ||
| ) | ||
| except json.JSONDecodeError: | ||
| LOGGER.warning(f"Failed to parse message body as JSON: {msg['Body']}") | ||
| continue | ||
|
|
||
| if parsed_messages: | ||
| self._last_checked_count += len(parsed_messages) | ||
| LOGGER.info(f"Received {len(parsed_messages)} messages from queue") | ||
|
|
||
| return parsed_messages | ||
|
|
||
| except ClientError as e: | ||
| LOGGER.error(f"Failed to receive messages from queue: {e.response['Error']['Message']}") | ||
| return [] | ||
| except Exception as e: | ||
| LOGGER.error(f"Unexpected error receiving messages: {str(e)}", exc_info=True) | ||
| return [] | ||
|
|
||
| def delete_message(self, receipt_handle: str) -> bool: | ||
| try: | ||
| self.sqs_client.delete_message(QueueUrl=self.queue_url, ReceiptHandle=receipt_handle) | ||
| LOGGER.debug("Deleted message from queue") | ||
| return True | ||
|
|
||
| except ClientError as e: | ||
| LOGGER.error(f"Failed to delete message: {e.response['Error']['Message']}") | ||
| return False | ||
| except Exception as e: | ||
| LOGGER.error(f"Unexpected error deleting message: {str(e)}", exc_info=True) | ||
| return False | ||
|
|
||
| def has_interrupt_message(self) -> tuple[bool, str | None]: | ||
| messages = self.receive_messages(max_messages=10) | ||
|
|
||
| for msg in messages: | ||
| body = msg["body"] | ||
| if body.get("type") == "INTERRUPT": | ||
| LOGGER.info(f"Found INTERRUPT message in queue") | ||
| return True, msg["receipt_handle"] | ||
|
|
||
| return False, None | ||
|
Comment on lines
69
to
80
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The View Details📝 Patch Detailsdiff --git a/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py b/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py
index 7df5e881d..f5703da45 100644
--- a/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py
+++ b/servers/fai-lambda/fai-scribe/src/utils/sqs_client.py
@@ -68,14 +68,21 @@ class SQSClient:
def has_interrupt_message(self) -> tuple[bool, str | None]:
messages = self.receive_messages(max_messages=10)
+ interrupt_receipt = None
for msg in messages:
body = msg["body"]
+ receipt_handle = msg["receipt_handle"]
+
if body.get("type") == "INTERRUPT":
LOGGER.info(f"Found INTERRUPT message in queue")
- return True, msg["receipt_handle"]
+ interrupt_receipt = receipt_handle
+ else:
+ # Delete non-INTERRUPT messages to prevent queue accumulation
+ self.delete_message(receipt_handle)
+ LOGGER.debug(f"Deleted processed {body.get('type', 'UNKNOWN')} message")
- return False, None
+ return interrupt_receipt is not None, interrupt_receipt
def get_resume_messages(self) -> list[dict[str, Any]]:
messages = self.receive_messages(max_messages=10)
@@ -83,8 +90,14 @@ class SQSClient:
for msg in messages:
body = msg["body"]
+ receipt_handle = msg["receipt_handle"]
+
if body.get("type") == "RESUME":
- resume_messages.append({"body": body, "receipt_handle": msg["receipt_handle"]})
+ resume_messages.append({"body": body, "receipt_handle": receipt_handle})
+ else:
+ # Delete non-RESUME messages to prevent queue accumulation
+ self.delete_message(receipt_handle)
+ LOGGER.debug(f"Deleted processed {body.get('type', 'UNKNOWN')} message")
if resume_messages:
LOGGER.info(f"Found {len(resume_messages)} RESUME messages in queue")
AnalysisSQS message accumulation in has_interrupt_message() and get_resume_messages()What fails: SQSClient.has_interrupt_message() and SQSClient.get_resume_messages() fetch messages but only delete specific message types (INTERRUPT/RESUME), leaving other messages in the queue to accumulate How to reproduce: # Simulate mixed message types in SQS queue
# Call has_interrupt_message() repeatedly
client = SQSClient(queue_url)
client.has_interrupt_message() # Fetches all messages, only deletes INTERRUPT
client.has_interrupt_message() # Re-fetches same non-INTERRUPT messagesResult: Non-INTERRUPT messages (RESUME, OTHER types) remain in queue after each poll and get re-processed on every subsequent call, causing message accumulation and duplicate processing Expected: All processed messages should be deleted from queue to prevent redelivery after SQS visibility timeout expires Root cause: Methods call receive_messages() but only delete messages of specific types, violating SQS best practices for message cleanup |
||
|
|
||
| def get_resume_messages(self) -> list[dict[str, Any]]: | ||
| messages = self.receive_messages(max_messages=10) | ||
| resume_messages = [] | ||
|
|
||
| for msg in messages: | ||
| body = msg["body"] | ||
| if body.get("type") == "RESUME": | ||
| resume_messages.append({"body": body, "receipt_handle": msg["receipt_handle"]}) | ||
|
|
||
| if resume_messages: | ||
| LOGGER.info(f"Found {len(resume_messages)} RESUME messages in queue") | ||
|
|
||
| return resume_messages | ||
Uh oh!
There was an error while loading. Please reload this page.