README.md (91 additions, 0 deletions)
@@ -119,11 +119,102 @@ Notice the following:
In case you want your method to retry certain cases, you need to raise `RetryableTaskException`.
You can provide an optional `delay` time for the retry, set `count_retries=False` if you don't want to limit the number of retries, or use `max_retries_func` to specify a function that is invoked once the maximum number of retries is exhausted.

#### Cross-Account SQS Support

Django-eb-sqs supports accessing SQS queues from different AWS accounts. This is useful in multi-account architectures where you need to send messages to or process messages from queues in other AWS accounts.

##### Configuration

You can configure cross-account queues in two ways:

**Method 1: Cross-Account Queue Configuration**
```python
EB_SQS_CROSS_ACCOUNT_QUEUES = {
    'external-queue': {
        'account_id': '123456789012',
        'region': 'us-west-2',              # optional, defaults to EB_AWS_REGION
        'queue_name': 'actual-queue-name'   # optional, defaults to the key name
    },
    'prod-notifications': {
        'account_id': '987654321098',
        'queue_name': 'notification-queue'
    }
}
```

**Method 2: Direct Queue URLs**
```python
EB_SQS_QUEUE_URLS = {
    'external-queue': 'https://sqs.us-west-2.amazonaws.com/123456789012/actual-queue-name',
    'prod-notifications': 'https://sqs.us-east-1.amazonaws.com/987654321098/notification-queue'
}
```
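With Method 1, the library derives the full queue URL from the configuration entry. The construction can be sketched as follows (a simplified stand-in with an illustrative function name, not the library's internal API):

```python
# Sketch: how a cross-account config entry maps to a full SQS queue URL.
# DEFAULT_REGION stands in for the EB_AWS_REGION setting.
DEFAULT_REGION = 'us-east-1'

def build_queue_url(key: str, config: dict) -> str:
    account_id = config['account_id']
    region = config.get('region', DEFAULT_REGION)   # optional, falls back to the default region
    queue_name = config.get('queue_name', key)      # optional, falls back to the key name
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"

url = build_queue_url('external-queue', {
    'account_id': '123456789012',
    'region': 'us-west-2',
    'queue_name': 'actual-queue-name',
})
print(url)  # https://sqs.us-west-2.amazonaws.com/123456789012/actual-queue-name
```

Method 2 skips this construction entirely: the URL you supply is used verbatim.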

##### Usage

Once configured, you can use cross-account queues just like regular queues:

```python
from eb_sqs.decorators import task

@task(queue_name='external-queue')
def process_external_data(data):
    print(f"Processing data from external account: {data}")

# Send a task to the external queue
process_external_data.delay(data={'key': 'value'})
```

For processing messages from cross-account queues:

```bash
python manage.py process_queue --queues external-queue,prod-notifications
```

You can also use full queue URLs directly:

```bash
python manage.py process_queue --queues https://sqs.us-west-2.amazonaws.com/123456789012/actual-queue-name
```

##### IAM Permissions

For cross-account access to work, ensure your IAM role/user has the necessary permissions:

1. **Cross-account role assumption** (recommended): Set up cross-account IAM roles
2. **Resource-based policies**: Configure SQS queue policies to allow access from your account
3. **Direct permissions**: Grant SQS permissions for the specific cross-account queues

Example SQS queue policy for cross-account access:
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CrossAccountAccess",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::YOUR-ACCOUNT-ID:root"
            },
            "Action": [
                "sqs:SendMessage",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:us-west-2:123456789012:actual-queue-name"
        }
    ]
}
```
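If you provision queues programmatically, the same policy document can be built in code. A sketch (the account ID and queue ARN are placeholders):

```python
import json

def cross_account_policy(trusted_account_id: str, queue_arn: str) -> dict:
    """Build an SQS queue policy that lets a trusted account use the queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "CrossAccountAccess",
            "Effect": "Allow",
            "Principal": {"AWS": f"arn:aws:iam::{trusted_account_id}:root"},
            "Action": [
                "sqs:SendMessage",
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
            ],
            "Resource": queue_arn,
        }],
    }

policy = cross_account_policy("111122223333", "arn:aws:sqs:us-west-2:123456789012:actual-queue-name")
print(json.dumps(policy, indent=2))
```

The resulting document can then be attached with boto3, e.g. `queue.set_attributes(Attributes={'Policy': json.dumps(policy)})`.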

#### Settings

The following settings can be used to fine-tune django-eb-sqs. Copy them into your Django `settings.py` file.

- EB_AWS_REGION (`us-east-1`): The AWS region to use when working with SQS.
- EB_SQS_CROSS_ACCOUNT_QUEUES (`{}`): Dictionary mapping queue names to cross-account configurations.
- EB_SQS_QUEUE_URLS (`{}`): Dictionary mapping queue names to full SQS queue URLs.
- EB_SQS_MAX_NUMBER_OF_MESSAGES (`10`): The maximum number of messages to read in a single call from SQS (<= 10).
- EB_SQS_WAIT_TIME_S (`2`): The time to wait (seconds) when receiving messages from SQS.
- NO_QUEUES_WAIT_TIME_S (`5`): The time (seconds) a worker waits if there are no SQS queues available to process.
cross_account_example_settings.py (95 additions, 0 deletions)
@@ -0,0 +1,95 @@
"""
Django settings example for cross-account SQS support

This file demonstrates how to configure django-eb-sqs for cross-account SQS access.
Copy the relevant settings to your Django settings.py file.
"""

# Basic django-eb-sqs settings
EB_AWS_REGION = 'us-east-1'
EB_SQS_DEFAULT_QUEUE = 'eb-sqs-default'

# Method 1: Cross-Account Queue Configuration
# This method allows you to define cross-account queues by specifying the account ID,
# region, and queue name. The library will construct the full SQS URL automatically.
EB_SQS_CROSS_ACCOUNT_QUEUES = {
    'external-notifications': {
        'account_id': '123456789012',       # Target AWS account ID
        'region': 'us-west-2',              # Optional: defaults to EB_AWS_REGION
        'queue_name': 'notification-queue'  # Optional: defaults to the key name
    },
    'prod-analytics': {
        'account_id': '987654321098',
        'queue_name': 'analytics-data-queue'  # Will use us-east-1 region (default)
    },
    'partner-events': {
        'account_id': '555666777888',
        'region': 'eu-west-1',
        'queue_name': 'partner-integration-events'
    }
}

# Method 2: Direct Queue URLs
# This method allows you to specify full SQS queue URLs directly.
# Use this when you already know the exact URLs or when queue names have special characters.
EB_SQS_QUEUE_URLS = {
    'legacy-system': 'https://sqs.us-east-1.amazonaws.com/111222333444/legacy-processing-queue',
    'fifo-queue': 'https://sqs.us-west-2.amazonaws.com/123456789012/high-priority.fifo',
    'dev-testing': 'https://sqs.eu-central-1.amazonaws.com/999888777666/development-test-queue'
}

# You can mix both methods - the library will check QUEUE_URLS first, then CROSS_ACCOUNT_QUEUES
# For example:
# - 'external-notifications' will use cross-account config (method 1)
# - 'legacy-system' will use direct URL (method 2)
# - 'regular-queue' will use standard same-account behavior

# Other standard django-eb-sqs settings remain the same
EB_SQS_AUTO_ADD_QUEUE = False # Cannot auto-create cross-account queues
EB_SQS_MAX_NUMBER_OF_MESSAGES = 10
EB_SQS_WAIT_TIME_S = 2
EB_SQS_DEFAULT_DELAY = 0
EB_SQS_DEFAULT_MAX_RETRIES = 3

# Example usage in your Django application:

"""
from eb_sqs.decorators import task

@task(queue_name='external-notifications')
def send_notification_to_external_system(data):
    # This task will be sent to the cross-account queue
    # https://sqs.us-west-2.amazonaws.com/123456789012/notification-queue
    print(f"Processing notification: {data}")

@task(queue_name='legacy-system')
def process_legacy_data(payload):
    # This task will be sent to the queue specified in QUEUE_URLS
    print(f"Processing legacy data: {payload}")

@task(queue_name='regular-queue')
def normal_task(message):
    # This will use standard same-account behavior
    # Queue name will be prefixed if EB_SQS_QUEUE_PREFIX is set
    print(f"Processing regular task: {message}")

# Send tasks to cross-account queues
send_notification_to_external_system.delay(data={'event': 'user_registered'})
process_legacy_data.delay(payload={'id': 12345, 'type': 'import'})
normal_task.delay(message='Hello World')
"""

# Worker command examples:
"""
# Process cross-account queues:
python manage.py process_queue --queues external-notifications,prod-analytics

# Process mix of cross-account and regular queues:
python manage.py process_queue --queues external-notifications,regular-queue,legacy-system

# Process using full URLs directly:
python manage.py process_queue --queues https://sqs.us-west-2.amazonaws.com/123456789012/notification-queue

# Use prefixes (only works with same-account queues):
python manage.py process_queue --queues prefix:dev-,external-notifications
"""
eb_sqs/aws/sqs_queue_client.py (62 additions, 5 deletions)
@@ -1,4 +1,5 @@
from typing import Any, Optional
import re

import boto3
from botocore.config import Config
@@ -16,16 +17,70 @@ def __init__(self):
        )
        self.queue_cache = {}

    def _is_queue_url(self, queue_identifier: str) -> bool:
        """Check if the queue identifier is a full SQS URL"""
        return bool(re.match(settings.SQS_URL_PATTERN, queue_identifier))

    def _get_queue_url(self, queue_name: str) -> Optional[str]:
        """Return the configured URL for this queue, or None if it has no special configuration"""
        # Check if there's a direct URL mapping for this queue name
        if queue_name in settings.QUEUE_URLS:
            return settings.QUEUE_URLS[queue_name]

        # Check if there's a cross-account configuration for this queue
        if queue_name in settings.CROSS_ACCOUNT_QUEUES:
            cross_account_config = settings.CROSS_ACCOUNT_QUEUES[queue_name]
            account_id = cross_account_config.get('account_id')
            region = cross_account_config.get('region', settings.AWS_REGION)
            actual_queue_name = cross_account_config.get('queue_name', queue_name)
            return f"https://sqs.{region}.amazonaws.com/{account_id}/{actual_queue_name}"

        # No special configuration - caller falls back to the standard queue name
        return None

    def _get_queue(self, queue_name: str, use_cache: bool = True) -> Any:
        # Check if queue_name is already a full URL
        if self._is_queue_url(queue_name):
            return self._get_sqs_queue_by_url(queue_name, use_cache)

        # Get configured URL for this queue
        queue_url = self._get_queue_url(queue_name)
        if queue_url:
            return self._get_sqs_queue_by_url(queue_url, use_cache)

        # Use standard queue name logic for same-account queues
        full_queue_name = '{}{}'.format(settings.QUEUE_PREFIX, queue_name)

-       queue = self._get_sqs_queue(full_queue_name, use_cache)
        queue = self._get_sqs_queue_by_name(full_queue_name, use_cache)
        if not queue:
            queue = self._add_sqs_queue(full_queue_name)

        return queue

-   def _get_sqs_queue(self, queue_name: str, use_cache: bool) -> Any:
    def _get_sqs_queue_by_url(self, queue_url: str, use_cache: bool) -> Any:
        """Get queue using full URL (supports cross-account queues)"""
        cache_key = f"url:{queue_url}"

        if use_cache and self.queue_cache.get(cache_key):
            return self.queue_cache[cache_key]

        try:
            # Use Queue constructor with URL for cross-account support
            queue = self.sqs.Queue(queue_url)

            # Verify queue exists by accessing its attributes
            _ = queue.attributes

            self.queue_cache[cache_key] = queue
            return queue
        except ClientError as ex:
            error_code = ex.response.get('Error', {}).get('Code', None)
            if error_code in ['AWS.SimpleQueueService.NonExistentQueue', 'QueueDoesNotExist']:
                return None
            else:
                raise ex

    def _get_sqs_queue_by_name(self, queue_name: str, use_cache: bool) -> Any:
        """Get queue by name (same-account only)"""
        if use_cache and self.queue_cache.get(queue_name):
            return self.queue_cache[queue_name]
@@ -41,6 +96,7 @@ def _get_sqs_queue(self, queue_name: str, use_cache: bool) -> Any:
                raise ex

    def _add_sqs_queue(self, queue_name: str) -> Any:
        """Create a new queue (same-account only)"""
        if settings.AUTO_ADD_QUEUE:
            queue = self.sqs.create_queue(
                QueueName=queue_name,
@@ -63,7 +119,8 @@ def add_message(self, queue_name: str, msg: str, delay: int):
                DelaySeconds=delay
            )
        except ClientError as ex:
-           if ex.response.get('Error', {}).get('Code', None) == 'AWS.SimpleQueueService.NonExistentQueue':
            error_code = ex.response.get('Error', {}).get('Code', None)
            if error_code in ['AWS.SimpleQueueService.NonExistentQueue', 'QueueDoesNotExist']:
                queue = self._get_queue(queue_name, use_cache=False)
                queue.send_message(
                    MessageBody=msg,
eb_sqs/settings.py (7 additions, 0 deletions)
@@ -2,6 +2,13 @@

AWS_REGION = getattr(settings, 'EB_AWS_REGION', 'us-east-1') # type: str

# Cross-account SQS support
CROSS_ACCOUNT_QUEUES = getattr(settings, 'EB_SQS_CROSS_ACCOUNT_QUEUES', {}) # type: dict
QUEUE_URLS = getattr(settings, 'EB_SQS_QUEUE_URLS', {}) # type: dict

# SQS URL validation pattern - flexible to handle all AWS region formats and 12-digit account IDs
SQS_URL_PATTERN = r'^https://sqs\.[a-z0-9-]+\.amazonaws\.com/\d{12}/.+'

MAX_NUMBER_OF_MESSAGES = getattr(settings, 'EB_SQS_MAX_NUMBER_OF_MESSAGES', 10) # type: int
WAIT_TIME_S = getattr(settings, 'EB_SQS_WAIT_TIME_S', 2) # type: int
NO_QUEUES_WAIT_TIME_S = getattr(settings, 'NO_QUEUES_WAIT_TIME_S', 5) # type: int
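The `SQS_URL_PATTERN` above can be exercised on its own; a quick sketch of the check the client performs (the helper name here is illustrative):

```python
import re

SQS_URL_PATTERN = r'^https://sqs\.[a-z0-9-]+\.amazonaws\.com/\d{12}/.+'

def is_queue_url(identifier: str) -> bool:
    """True when the identifier is a full SQS queue URL rather than a bare name."""
    return bool(re.match(SQS_URL_PATTERN, identifier))

print(is_queue_url('https://sqs.us-west-2.amazonaws.com/123456789012/my-queue'))  # True
print(is_queue_url('my-queue'))                                                   # False
print(is_queue_url('https://sqs.us-west-2.amazonaws.com/1234/my-queue'))          # False: account ID must be 12 digits
```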
eb_sqs/worker/service.py (27 additions, 1 deletion)
@@ -1,4 +1,5 @@
import logging
import re
import signal
from datetime import timedelta
from time import sleep
@@ -168,7 +169,32 @@ def _execute_user_code(function: Any):
        logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=True)

    def get_queues_by_names(self, sqs: ServiceResource, queue_names: list) -> list:
-       return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
        queues = []
        for queue_name in queue_names:
            # Check if it's a full URL
            if self._is_queue_url(queue_name):
                queue = sqs.Queue(queue_name)
            # Check if there's a configured URL for this queue
            elif queue_name in settings.QUEUE_URLS:
                queue = sqs.Queue(settings.QUEUE_URLS[queue_name])
            # Check if there's a cross-account configuration
            elif queue_name in settings.CROSS_ACCOUNT_QUEUES:
                cross_account_config = settings.CROSS_ACCOUNT_QUEUES[queue_name]
                account_id = cross_account_config.get('account_id')
                region = cross_account_config.get('region', settings.AWS_REGION)
                actual_queue_name = cross_account_config.get('queue_name', queue_name)
                queue_url = f"https://sqs.{region}.amazonaws.com/{account_id}/{actual_queue_name}"
> **Copilot AI** commented on Jan 11, 2026: Duplicated URL construction logic exists in both `WorkerService.get_queues_by_names` and `SqsQueueClient._get_queue_url`. Consider extracting this into a shared helper function in the settings module or a utility module to ensure consistency and ease of maintenance.
                queue = sqs.Queue(queue_url)
            else:
                # Use standard queue name for same-account queues
                queue = sqs.get_queue_by_name(QueueName=queue_name)

            queues.append(queue)
        return queues

    def _is_queue_url(self, queue_identifier: str) -> bool:
        """Check if the queue identifier is a full SQS URL"""
        return bool(re.match(settings.SQS_URL_PATTERN, queue_identifier))

    def get_queues_by_prefixes(self, sqs: ServiceResource, prefixes: list) -> list:
        queues = []
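The duplication flagged in the review comment above could be addressed by extracting the resolution logic into one shared helper used by both `SqsQueueClient` and `WorkerService`. A sketch (function name, parameters, and module placement are assumptions, not part of this PR):

```python
import re
from typing import Optional

SQS_URL_PATTERN = r'^https://sqs\.[a-z0-9-]+\.amazonaws\.com/\d{12}/.+'

def resolve_queue_url(queue_name: str, queue_urls: dict, cross_account_queues: dict,
                      default_region: str) -> Optional[str]:
    """Resolve a queue identifier to a full SQS URL.

    Returns None for plain same-account queue names, signalling the caller
    to fall back to get_queue_by_name with the configured prefix.
    """
    if re.match(SQS_URL_PATTERN, queue_name):
        return queue_name  # already a full URL, use verbatim
    if queue_name in queue_urls:
        return queue_urls[queue_name]  # direct URL mapping wins
    if queue_name in cross_account_queues:
        cfg = cross_account_queues[queue_name]
        region = cfg.get('region', default_region)
        name = cfg.get('queue_name', queue_name)
        return f"https://sqs.{region}.amazonaws.com/{cfg['account_id']}/{name}"
    return None
```

Both call sites would then reduce to `url = resolve_queue_url(...)` followed by `sqs.Queue(url)` whenever a URL is returned.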