Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions aws_lambda_powertools/utilities/batch/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
BatchProcessor,
EventType,
)
from aws_lambda_powertools.utilities.batch.exceptions import UnexpectedBatchTypeError
from aws_lambda_powertools.warnings import PowertoolsDeprecationWarning

if TYPE_CHECKING:
Expand Down Expand Up @@ -204,6 +205,9 @@ def handler(event, context):
"""
try:
records: list[dict] = event.get("Records", [])
if not records or not isinstance(records, list):
raise UnexpectedBatchTypeError

except AttributeError:
event_types = ", ".join(list(EventType.__members__))
docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
Expand Down Expand Up @@ -268,6 +272,9 @@ def handler(event, context):
"""
try:
records: list[dict] = event.get("Records", [])
if not records or not isinstance(records, list):
raise UnexpectedBatchTypeError

except AttributeError:
event_types = ", ".join(list(EventType.__members__))
docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line
Expand Down
9 changes: 9 additions & 0 deletions aws_lambda_powertools/utilities/batch/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ def __str__(self):
return self.format_exceptions(parent_exception_str)


class UnexpectedBatchTypeError(BatchProcessingError):
"""Error thrown by the Batch Processing utility when a partial processor receives an unexpected batch type"""

def __init__(self):
msg = "Unexpected batch type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams"
super().__init__(msg)
self.name = "UnexpectedBatchTypeError"


class SQSFifoCircuitBreakerError(Exception):
"""
Signals a record not processed due to the SQS FIFO processing being interrupted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
batch_processor,
process_partial_response,
)
from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError
from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError, UnexpectedBatchTypeError
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecord,
)
Expand Down Expand Up @@ -708,3 +708,69 @@ def test_async_process_partial_response_invalid_input(async_record_handler: Call
# WHEN/THEN
with pytest.raises(ValueError):
async_process_partial_response(batch, record_handler, processor)


@pytest.mark.parametrize(
"event",
[
{},
{"Records": None},
{"Records": "not a list"},
],
)
def test_process_partial_response_raises_unexpected_batch_type(event, record_handler):
# GIVEN a batch processor configured for SQS events
processor = BatchProcessor(event_type=EventType.SQS)

# WHEN processing an event with invalid Records
with pytest.raises(UnexpectedBatchTypeError) as exc_info:
process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
)

# THEN the correct error message is raised
assert "Unexpected batch type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams" in str(exc_info.value)


@pytest.mark.asyncio
@pytest.mark.parametrize(
"event",
[
{},
{"Records": None},
{"Records": "not a list"},
],
)
async def test_async_process_partial_response_raises_unexpected_batch_type(event, async_record_handler):
# GIVEN a batch processor configured for SQS events
processor = BatchProcessor(event_type=EventType.SQS)

# WHEN processing an event with invalid Records asynchronously
with pytest.raises(UnexpectedBatchTypeError) as exc_info:
await async_process_partial_response(
event=event,
record_handler=async_record_handler,
processor=processor,
)

# THEN the correct error message is raised
assert "Unexpected batch type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams" in str(exc_info.value)


def test_process_partial_response_should_not_raise_unexpected_batch_type(record_handler):
# GIVEN a valid SQS event structure
event = {"Records": [{"messageId": "1", "body": "test"}]}
processor = BatchProcessor(event_type=EventType.SQS)

# WHEN processing the event
try:
process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
)
except UnexpectedBatchTypeError:
# THEN no UnexpectedBatchTypeError should be raised
pytest.fail("UnexpectedBatchTypeError was raised with a valid event structure!")
Loading