Skip to content

Commit bf16f6e

Browse files
authored
Merge branch 'develop' into tracer
Signed-off-by: Roger Zhang <[email protected]>
2 parents 78051f6 + 2b916c7 commit bf16f6e

File tree

9 files changed

+317
-75
lines changed

9 files changed

+317
-75
lines changed

CHANGELOG.md

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,48 +21,54 @@
2121

2222
* **data_classes:** Add CloudWatchAlarmEvent data class ([#3868](https://github.com/aws-powertools/powertools-lambda-python/issues/3868))
2323
* **event-handler:** add compress option when serving Swagger HTML ([#3946](https://github.com/aws-powertools/powertools-lambda-python/issues/3946))
24+
* **event_handler:** define exception_handler directly from the router ([#3979](https://github.com/aws-powertools/powertools-lambda-python/issues/3979))
25+
* **tracer:** auto-disable tracer when for AWS SAM and Chalice environments ([#3949](https://github.com/aws-powertools/powertools-lambda-python/issues/3949))
2426

2527
## Maintenance
2628

27-
* **deps:** bump docker/setup-buildx-action from 3.1.0 to 3.2.0 ([#3955](https://github.com/aws-powertools/powertools-lambda-python/issues/3955))
29+
* **deps:** bump squidfunk/mkdocs-material from `3678304` to `6c81a89` in /docs ([#3973](https://github.com/aws-powertools/powertools-lambda-python/issues/3973))
2830
* **deps:** bump redis from 5.0.2 to 5.0.3 ([#3929](https://github.com/aws-powertools/powertools-lambda-python/issues/3929))
2931
* **deps:** bump actions/checkout from 4.1.1 to 4.1.2 ([#3939](https://github.com/aws-powertools/powertools-lambda-python/issues/3939))
3032
* **deps:** bump datadog-lambda from 5.89.0 to 5.90.0 ([#3941](https://github.com/aws-powertools/powertools-lambda-python/issues/3941))
3133
* **deps:** bump the layer-balancer group in /layer/scripts/layer-balancer with 3 updates ([#3972](https://github.com/aws-powertools/powertools-lambda-python/issues/3972))
32-
* **deps:** bump squidfunk/mkdocs-material from `3678304` to `6c81a89` in /docs ([#3973](https://github.com/aws-powertools/powertools-lambda-python/issues/3973))
3334
* **deps:** bump pypa/gh-action-pypi-publish from 1.8.12 to 1.8.14 ([#3918](https://github.com/aws-powertools/powertools-lambda-python/issues/3918))
35+
* **deps:** bump aws-encryption-sdk from 3.1.1 to 3.2.0 ([#3983](https://github.com/aws-powertools/powertools-lambda-python/issues/3983))
36+
* **deps:** bump docker/setup-buildx-action from 3.1.0 to 3.2.0 ([#3955](https://github.com/aws-powertools/powertools-lambda-python/issues/3955))
3437
* **deps:** bump datadog-lambda from 5.90.0 to 5.91.0 ([#3958](https://github.com/aws-powertools/powertools-lambda-python/issues/3958))
35-
* **deps-dev:** bump aws-cdk-aws-lambda-python-alpha from 2.132.1a0 to 2.133.0a0 ([#3976](https://github.com/aws-powertools/powertools-lambda-python/issues/3976))
36-
* **deps-dev:** bump the boto-typing group with 1 update ([#3956](https://github.com/aws-powertools/powertools-lambda-python/issues/3956))
38+
* **deps-dev:** bump mkdocs-material from 9.5.13 to 9.5.14 ([#3978](https://github.com/aws-powertools/powertools-lambda-python/issues/3978))
39+
* **deps-dev:** bump the boto-typing group with 1 update ([#3964](https://github.com/aws-powertools/powertools-lambda-python/issues/3964))
40+
* **deps-dev:** bump aws-cdk from 2.132.1 to 2.133.0 ([#3963](https://github.com/aws-powertools/powertools-lambda-python/issues/3963))
3741
* **deps-dev:** bump cdklabs-generative-ai-cdk-constructs from 0.1.89 to 0.1.90 ([#3957](https://github.com/aws-powertools/powertools-lambda-python/issues/3957))
42+
* **deps-dev:** bump the boto-typing group with 1 update ([#3956](https://github.com/aws-powertools/powertools-lambda-python/issues/3956))
43+
* **deps-dev:** bump types-python-dateutil from 2.8.19.20240311 to 2.9.0.20240315 ([#3966](https://github.com/aws-powertools/powertools-lambda-python/issues/3966))
3844
* **deps-dev:** bump coverage from 7.4.3 to 7.4.4 ([#3959](https://github.com/aws-powertools/powertools-lambda-python/issues/3959))
39-
* **deps-dev:** bump the boto-typing group with 1 update ([#3964](https://github.com/aws-powertools/powertools-lambda-python/issues/3964))
45+
* **deps-dev:** bump black from 24.2.0 to 24.3.0 ([#3968](https://github.com/aws-powertools/powertools-lambda-python/issues/3968))
4046
* **deps-dev:** bump cdklabs-generative-ai-cdk-constructs from 0.1.88 to 0.1.89 ([#3952](https://github.com/aws-powertools/powertools-lambda-python/issues/3952))
4147
* **deps-dev:** bump sentry-sdk from 1.41.0 to 1.42.0 ([#3951](https://github.com/aws-powertools/powertools-lambda-python/issues/3951))
4248
* **deps-dev:** bump the boto-typing group with 1 update ([#3950](https://github.com/aws-powertools/powertools-lambda-python/issues/3950))
43-
* **deps-dev:** bump aws-cdk from 2.132.1 to 2.133.0 ([#3963](https://github.com/aws-powertools/powertools-lambda-python/issues/3963))
44-
* **deps-dev:** bump types-python-dateutil from 2.8.19.20240311 to 2.9.0.20240315 ([#3966](https://github.com/aws-powertools/powertools-lambda-python/issues/3966))
49+
* **deps-dev:** bump aws-cdk-lib from 2.132.1 to 2.133.0 ([#3965](https://github.com/aws-powertools/powertools-lambda-python/issues/3965))
50+
* **deps-dev:** bump ruff from 0.3.2 to 0.3.3 ([#3967](https://github.com/aws-powertools/powertools-lambda-python/issues/3967))
4551
* **deps-dev:** bump the boto-typing group with 2 updates ([#3940](https://github.com/aws-powertools/powertools-lambda-python/issues/3940))
4652
* **deps-dev:** bump cdklabs-generative-ai-cdk-constructs from 0.1.87 to 0.1.88 ([#3942](https://github.com/aws-powertools/powertools-lambda-python/issues/3942))
4753
* **deps-dev:** bump pytest from 8.0.2 to 8.1.1 ([#3943](https://github.com/aws-powertools/powertools-lambda-python/issues/3943))
4854
* **deps-dev:** bump aws-cdk-aws-lambda-python-alpha from 2.131.0a0 to 2.132.1a0 ([#3944](https://github.com/aws-powertools/powertools-lambda-python/issues/3944))
49-
* **deps-dev:** bump black from 24.2.0 to 24.3.0 ([#3968](https://github.com/aws-powertools/powertools-lambda-python/issues/3968))
55+
* **deps-dev:** bump pytest-asyncio from 0.23.5.post1 to 0.23.6 ([#3984](https://github.com/aws-powertools/powertools-lambda-python/issues/3984))
5056
* **deps-dev:** bump aws-cdk from 2.132.0 to 2.132.1 ([#3938](https://github.com/aws-powertools/powertools-lambda-python/issues/3938))
5157
* **deps-dev:** bump aws-cdk-lib from 2.131.0 to 2.132.1 ([#3936](https://github.com/aws-powertools/powertools-lambda-python/issues/3936))
52-
* **deps-dev:** bump aws-cdk-lib from 2.132.1 to 2.133.0 ([#3965](https://github.com/aws-powertools/powertools-lambda-python/issues/3965))
58+
* **deps-dev:** bump the boto-typing group with 2 updates ([#3974](https://github.com/aws-powertools/powertools-lambda-python/issues/3974))
5359
* **deps-dev:** bump aws-cdk from 2.131.0 to 2.132.0 ([#3928](https://github.com/aws-powertools/powertools-lambda-python/issues/3928))
5460
* **deps-dev:** bump types-redis from 4.6.0.20240218 to 4.6.0.20240311 ([#3931](https://github.com/aws-powertools/powertools-lambda-python/issues/3931))
5561
* **deps-dev:** bump types-python-dateutil from 2.8.19.20240106 to 2.8.19.20240311 ([#3932](https://github.com/aws-powertools/powertools-lambda-python/issues/3932))
5662
* **deps-dev:** bump cdklabs-generative-ai-cdk-constructs from 0.1.83 to 0.1.87 ([#3930](https://github.com/aws-powertools/powertools-lambda-python/issues/3930))
57-
* **deps-dev:** bump ruff from 0.3.2 to 0.3.3 ([#3967](https://github.com/aws-powertools/powertools-lambda-python/issues/3967))
63+
* **deps-dev:** bump types-python-dateutil from 2.9.0.20240315 to 2.9.0.20240316 ([#3977](https://github.com/aws-powertools/powertools-lambda-python/issues/3977))
5864
* **deps-dev:** bump ruff from 0.3.0 to 0.3.2 ([#3925](https://github.com/aws-powertools/powertools-lambda-python/issues/3925))
5965
* **deps-dev:** bump mypy from 1.8.0 to 1.9.0 ([#3921](https://github.com/aws-powertools/powertools-lambda-python/issues/3921))
60-
* **deps-dev:** bump the boto-typing group with 2 updates ([#3974](https://github.com/aws-powertools/powertools-lambda-python/issues/3974))
61-
* **deps-dev:** bump types-python-dateutil from 2.9.0.20240315 to 2.9.0.20240316 ([#3977](https://github.com/aws-powertools/powertools-lambda-python/issues/3977))
62-
* **deps-dev:** bump pytest-asyncio from 0.23.5 to 0.23.5.post1 ([#3923](https://github.com/aws-powertools/powertools-lambda-python/issues/3923))
6366
* **deps-dev:** bump cdklabs-generative-ai-cdk-constructs from 0.1.90 to 0.1.91 ([#3975](https://github.com/aws-powertools/powertools-lambda-python/issues/3975))
67+
* **deps-dev:** bump aws-cdk-aws-lambda-python-alpha from 2.132.1a0 to 2.133.0a0 ([#3976](https://github.com/aws-powertools/powertools-lambda-python/issues/3976))
68+
* **deps-dev:** bump pytest-asyncio from 0.23.5 to 0.23.5.post1 ([#3923](https://github.com/aws-powertools/powertools-lambda-python/issues/3923))
69+
* **deps-dev:** bump the boto-typing group with 2 updates ([#3982](https://github.com/aws-powertools/powertools-lambda-python/issues/3982))
6470
* **deps-dev:** bump the boto-typing group with 2 updates ([#3919](https://github.com/aws-powertools/powertools-lambda-python/issues/3919))
65-
* **deps-dev:** bump mkdocs-material from 9.5.13 to 9.5.14 ([#3978](https://github.com/aws-powertools/powertools-lambda-python/issues/3978))
71+
* **deps-dev:** bump cdklabs-generative-ai-cdk-constructs from 0.1.91 to 0.1.94 ([#3985](https://github.com/aws-powertools/powertools-lambda-python/issues/3985))
6672
* **deps-dev:** bump bandit from 1.7.7 to 1.7.8 ([#3920](https://github.com/aws-powertools/powertools-lambda-python/issues/3920))
6773

6874

aws_lambda_powertools/logging/logger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ def debug(
580580
extra=extra,
581581
)
582582

583-
def append_keys(self, **additional_keys) -> None:
583+
def append_keys(self, **additional_keys: object) -> None:
584584
self.registered_formatter.append_keys(**additional_keys)
585585

586586
def remove_keys(self, keys: Iterable[str]) -> None:

aws_lambda_powertools/utilities/batch/exceptions.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,19 @@ def __init__(self, msg="", child_exceptions: List[ExceptionInfo] | None = None):
3636
def __str__(self):
3737
parent_exception_str = super(BatchProcessingError, self).__str__()
3838
return self.format_exceptions(parent_exception_str)
39+
40+
41+
class SQSFifoCircuitBreakerError(Exception):
42+
"""
43+
Signals a record not processed due to the SQS FIFO processing being interrupted
44+
"""
45+
46+
pass
47+
48+
49+
class SQSFifoMessageGroupCircuitBreakerError(Exception):
50+
"""
51+
Signals a record not processed due to the SQS FIFO message group processing being interrupted
52+
"""
53+
54+
pass

aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
from typing import List, Optional, Tuple
2-
3-
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
1+
import logging
2+
from typing import Optional, Set
3+
4+
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, ExceptionInfo, FailureResponse
5+
from aws_lambda_powertools.utilities.batch.exceptions import (
6+
SQSFifoCircuitBreakerError,
7+
SQSFifoMessageGroupCircuitBreakerError,
8+
)
49
from aws_lambda_powertools.utilities.batch.types import BatchSqsTypeModel
510

6-
7-
class SQSFifoCircuitBreakerError(Exception):
8-
"""
9-
Signals a record not processed due to the SQS FIFO processing being interrupted
10-
"""
11-
12-
pass
11+
logger = logging.getLogger(__name__)
1312

1413

1514
class SqsFifoPartialProcessor(BatchProcessor):
@@ -57,36 +56,59 @@ def lambda_handler(event, context: LambdaContext):
5756
None,
5857
)
5958

60-
def __init__(self, model: Optional["BatchSqsTypeModel"] = None):
61-
super().__init__(EventType.SQS, model)
59+
group_circuit_breaker_exc = (
60+
SQSFifoMessageGroupCircuitBreakerError,
61+
SQSFifoMessageGroupCircuitBreakerError("A previous record from this message group failed processing"),
62+
None,
63+
)
6264

63-
def process(self) -> List[Tuple]:
65+
def __init__(self, model: Optional["BatchSqsTypeModel"] = None, skip_group_on_error: bool = False):
6466
"""
65-
Call instance's handler for each record. When the first failed message is detected,
66-
the process is short-circuited, and the remaining messages are reported as failed items.
67+
Initialize the SqsFifoProcessor.
68+
69+
Parameters
70+
----------
71+
model: Optional["BatchSqsTypeModel"]
72+
An optional model for batch processing.
73+
skip_group_on_error: bool
74+
Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures
75+
Default is False.
76+
6777
"""
68-
result: List[Tuple] = []
78+
self._skip_group_on_error: bool = skip_group_on_error
79+
self._current_group_id = None
80+
self._failed_group_ids: Set[str] = set()
81+
super().__init__(EventType.SQS, model)
6982

70-
for i, record in enumerate(self.records):
71-
# If we have failed messages, it means that the last message failed.
72-
# We then short circuit the process, failing the remaining messages
73-
if self.fail_messages:
74-
return self._short_circuit_processing(i, result)
83+
def _process_record(self, record):
84+
self._current_group_id = record.get("attributes", {}).get("MessageGroupId")
7585

76-
# Otherwise, process the message normally
77-
result.append(self._process_record(record))
86+
# Short-circuits the process if:
87+
# - There are failed messages, OR
88+
# - The `skip_group_on_error` option is on, and the current message is part of a failed group.
89+
fail_entire_batch = bool(self.fail_messages) and not self._skip_group_on_error
90+
fail_group_id = self._skip_group_on_error and self._current_group_id in self._failed_group_ids
91+
if fail_entire_batch or fail_group_id:
92+
return self.failure_handler(
93+
record=self._to_batch_type(record, event_type=self.event_type, model=self.model),
94+
exception=self.group_circuit_breaker_exc if self._skip_group_on_error else self.circuit_breaker_exc,
95+
)
7896

79-
return result
97+
return super()._process_record(record)
8098

81-
def _short_circuit_processing(self, first_failure_index: int, result: List[Tuple]) -> List[Tuple]:
82-
"""
83-
Starting from the first failure index, fail all the remaining messages, and append them to the result list.
84-
"""
85-
remaining_records = self.records[first_failure_index:]
86-
for remaining_record in remaining_records:
87-
data = self._to_batch_type(record=remaining_record, event_type=self.event_type, model=self.model)
88-
result.append(self.failure_handler(record=data, exception=self.circuit_breaker_exc))
89-
return result
99+
def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
100+
# If we are failing a message and the `skip_group_on_error` is on, we store the failed group ID
101+
# This way, future messages with the same group ID will be failed automatically.
102+
if self._skip_group_on_error and self._current_group_id:
103+
self._failed_group_ids.add(self._current_group_id)
104+
105+
return super().failure_handler(record, exception)
106+
107+
def _clean(self):
108+
self._failed_group_ids.clear()
109+
self._current_group_id = None
110+
111+
super()._clean()
90112

91113
async def _async_process_record(self, record: dict):
92114
raise NotImplementedError()

docs/utilities/batch.md

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,11 @@ Processing batches from SQS works in three stages:
141141

142142
#### FIFO queues
143143

144-
When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank" rel="nofollow"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
145-
This helps preserve the ordering of messages in your queue.
144+
When working with [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, a batch may include messages from different group IDs.
145+
146+
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
147+
148+
Enable the `skip_group_on_error` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.
146149

147150
=== "Recommended"
148151

@@ -164,6 +167,12 @@ This helps preserve the ordering of messages in your queue.
164167
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py"
165168
```
166169

170+
=== "Enabling skip_group_on_error flag"
171+
172+
```python hl_lines="2-6 9 23"
173+
--8<-- "examples/batch_processing/src/getting_started_sqs_fifo_skip_on_error.py"
174+
```
175+
167176
### Processing messages from Kinesis
168177

169178
Processing batches from Kinesis works in three stages:
@@ -311,7 +320,7 @@ sequenceDiagram
311320

312321
> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
313322
314-
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
323+
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skip_group_on_error` flag.
315324

316325
<center>
317326
```mermaid
@@ -335,6 +344,31 @@ sequenceDiagram
335344
<i>SQS FIFO mechanism with Batch Item Failures</i>
336345
</center>
337346

347+
Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with `skip_group_on_error` flag.
348+
349+
<center>
350+
```mermaid
351+
sequenceDiagram
352+
autonumber
353+
participant SQS queue
354+
participant Lambda service
355+
participant Lambda function
356+
Lambda service->>SQS queue: Poll
357+
Lambda service->>Lambda function: Invoke (batch event)
358+
activate Lambda function
359+
Lambda function-->Lambda function: Process 2 out of 10 batch items
360+
Lambda function--xLambda function: Fail on 3rd batch item
361+
Lambda function-->Lambda function: Process messages from another MessageGroupID
362+
Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
363+
deactivate Lambda function
364+
activate SQS queue
365+
Lambda service->>SQS queue: Delete successful messages processed
366+
SQS queue-->>SQS queue: Failed messages return
367+
deactivate SQS queue
368+
```
369+
<i>SQS FIFO mechanism with Batch Item Failures</i>
370+
</center>
371+
338372
#### Kinesis and DynamoDB Streams
339373

340374
> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import (
3+
SqsFifoPartialProcessor,
4+
process_partial_response,
5+
)
6+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
processor = SqsFifoPartialProcessor(skip_group_on_error=True)
10+
tracer = Tracer()
11+
logger = Logger()
12+
13+
14+
@tracer.capture_method
15+
def record_handler(record: SQSRecord):
16+
payload: str = record.json_body # if json string data, otherwise record.body for str
17+
logger.info(payload)
18+
19+
20+
@logger.inject_lambda_context
21+
@tracer.capture_lambda_handler
22+
def lambda_handler(event, context: LambdaContext):
23+
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)

0 commit comments

Comments
 (0)