Integrate large payload support for SQS #2337
Conversation
This adds support for handling large payloads in SQS. The `sqs_extended_client` is imported and used to fetch a file from S3 as the payload when necessary. Because Kombu fetches new messages from the queue asynchronously, not through the standard boto3 APIs, we have to fetch the S3 file manually rather than rely on `sqs_extended_client` to perform that action. Relates to: celery#279
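The manual fetch described above can be sketched roughly as follows. This is an illustrative helper, not Kombu's actual API: the function name `resolve_payload` and the exact pointer shape are assumptions based on the extended client's convention of replacing a large body with a JSON list of `[pointer_class_name, {"s3BucketName": ..., "s3Key": ...}]`.

```python
import json


def resolve_payload(body, s3_client=None):
    """Return the real payload, fetching from S3 when body is an S3 pointer.

    Hypothetical sketch: the extended client replaces a large message body
    with a JSON list [pointer_class_name, {"s3BucketName": ..., "s3Key": ...}].
    """
    try:
        payload = json.loads(body)
    except ValueError:
        return body  # not JSON: an ordinary small message
    if isinstance(payload, list) and len(payload) == 2:
        _pointer_class, s3_info = payload
        if s3_client is None:
            # Fetched manually, since Kombu bypasses boto3's receive path.
            import boto3
            s3_client = boto3.client('s3')
        obj = s3_client.get_object(Bucket=s3_info['s3BucketName'],
                                   Key=s3_info['s3Key'])
        return obj['Body'].read().decode('utf-8')
    return body
```

Keeping the S3 client injectable makes the branch easy to unit-test with a fake client, which is roughly what the tests in this PR do with mocks.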
The try/except block was triggering when `sqs_extended_client` isn't installed, which resulted in boto being overwritten with `None`.
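A minimal sketch of the bug and the fix, using stand-in module names (`json` stands in for boto3, `missing_extended_client` for the absent `sqs_extended_client`) so the snippet is runnable anywhere:

```python
# Broken version: one try/except covers both imports, so when the optional
# dependency is missing, the except clause also nulls out the boto binding.
try:
    import json as boto_like            # stands in for boto3, always present
    import missing_extended_client      # stands in for sqs_extended_client
except ImportError:
    boto_like = missing_extended_client = None  # boto_like lost too!

assert boto_like is None  # the bug described above

# Fixed version: guard only the optional dependency.
import json as boto_like                # the required module, outside the guard
try:
    import missing_extended_client
except ImportError:
    missing_extended_client = None

assert boto_like is not None
```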
Introduce two tests to verify S3 client creation behavior: one for insecure connections and another for custom endpoint usage. This ensures proper configuration of boto3 client initialization in these scenarios.
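The two tests verify, roughly, that the S3 client is built with `use_ssl` and `endpoint_url` taken from the transport options. The sketch below is illustrative; the helper name `make_s3_client` and its signature are assumptions, not the code under review:

```python
from unittest.mock import MagicMock


def make_s3_client(session, is_secure=True, endpoint_url=None):
    """Hypothetical stand-in for the channel's S3 client factory."""
    kwargs = {'use_ssl': is_secure}
    if endpoint_url is not None:
        kwargs['endpoint_url'] = endpoint_url
    return session.client('s3', **kwargs)


def test_insecure_connection():
    session = MagicMock()
    make_s3_client(session, is_secure=False)
    session.client.assert_called_once_with('s3', use_ssl=False)


def test_custom_endpoint():
    session = MagicMock()
    make_s3_client(session, endpoint_url='http://localhost:9000')
    session.client.assert_called_once_with(
        's3', use_ssl=True, endpoint_url='http://localhost:9000')


test_insecure_connection()
test_custom_endpoint()
```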
Pull Request Overview
This PR integrates Amazon SQS Extended Client Library support to handle messages larger than SQS's 256KB limit by transparently storing large payloads in S3.
Key Changes
- Added transparent large payload support using S3 storage for messages exceeding SQS limits
- Implemented S3 client creation and management methods for retrieving large message payloads
- Enhanced message processing to automatically detect and handle S3-stored messages
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| t/unit/transport/test_SQS.py | Comprehensive test coverage for S3 client creation, large message handling, error scenarios, and edge cases |
| requirements/test-ci.txt | Updated to include SQS extras requirement for CI testing |
| requirements/extras/sqs.txt | Added amazon-sqs-extended-client dependency |
| kombu/transport/SQS.py | Core implementation with S3 client methods and large payload detection logic |
| kombu/asynchronous/aws/sqs/ext.py | Added sqs_extended_client import with fallback |
| kombu/asynchronous/aws/ext.py | Added sqs_extended_client import with fallback |
| docs/reference/kombu.transport.SQS.rst | Documentation for large message support feature |
Comments suppressed due to low confidence (3)
t/unit/transport/test_SQS.py:580
- Move the import statement to the top of the file with other imports instead of importing within the test method.
MaxNumberOfMessages=kombu_message_count)['Messages']:
t/unit/transport/test_SQS.py:654
- Move the import statement to the top of the file with other imports instead of importing within the test method.
message = 'my test message'
t/unit/transport/test_SQS.py:695
- Move the import statement to the top of the file with other imports instead of importing within the test method.
assert self.channel.connection._deliver.call_count == 5
```python
            secret_access_key='test_secret_key'
        )

        # assert isinstance(client, boto3.client('s3').__class__)
```
Remove commented-out assertion code. If this check is needed, implement it properly; otherwise, delete the comment.
Suggested change:
```diff
-        # assert isinstance(client, boto3.client('s3').__class__)
+        client = mock_session().client('s3')
+        assert isinstance(client, boto3.client('s3').__class__)
```
```python
        if (
            sqs_extended_client and
            isinstance(payload, list)
            and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS
```
[nitpick] Consider extracting the sqs_extended_client.client.MESSAGE_POINTER_CLASS into a constant for better readability and maintainability.
Suggested change:
```diff
-            and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS
+            and payload[0] == MESSAGE_POINTER_CLASS
```
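One way to realize the suggested constant is to resolve the attribute once at module import time, alongside the existing optional-import guard. This is a sketch of the refactor, not the PR's code; the `AttributeError` guard is an extra assumption in case the attribute path differs across versions:

```python
# Resolve the pointer class once, guarding against the optional dependency
# being absent (or exposing a different attribute path).
try:
    import sqs_extended_client
    MESSAGE_POINTER_CLASS = sqs_extended_client.client.MESSAGE_POINTER_CLASS
except (ImportError, AttributeError):
    sqs_extended_client = None
    MESSAGE_POINTER_CLASS = None
```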
```python
        # Check if this is a large payload stored in S3
        if (
            sqs_extended_client and
            isinstance(payload, list)
```
The condition does not guard against `payload` being an empty list, in which case the `payload[0]` access raises `IndexError`. Consider adding a bounds check.
Suggested change:
```diff
-            isinstance(payload, list)
+            isinstance(payload, list)
+            and len(payload) > 0
```
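To see why the bounds check matters: an empty JSON list is valid JSON, and indexing it raises `IndexError`; the `len()` guard short-circuits before the index is evaluated. A small standalone illustration (the pointer-class string here is a made-up placeholder):

```python
def looks_like_s3_pointer(payload, pointer_class="ExamplePointerClass"):
    # `and` short-circuits left to right, so payload[0] is only
    # evaluated once we know payload is a non-empty list.
    return (
        isinstance(payload, list)
        and len(payload) > 0
        and payload[0] == pointer_class
    )


assert looks_like_s3_pointer(["ExamplePointerClass", {"s3Key": "k"}])
assert not looks_like_s3_pointer([])   # no IndexError thanks to the len() guard
assert not looks_like_s3_pointer({"s3Key": "k"})
```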