Skip to content
191 changes: 85 additions & 106 deletions docs/features/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,7 @@ description: Utility
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

```mermaid
stateDiagram-v2
direction LR
BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
LambdaInit: Lambda invocation
BatchProcessor: Batch Processor
RecordHandler: Record Handler function
YourLogic: Your logic to process each batch item
LambdaResponse: Lambda response

BatchSource --> LambdaInit

LambdaInit --> BatchProcessor
BatchProcessor --> RecordHandler

state BatchProcessor {
[*] --> RecordHandler: Your function
RecordHandler --> YourLogic
}

RecordHandler --> BatchProcessor: Collect results
BatchProcessor --> LambdaResponse: Report items that failed processing
--8<-- "examples/snippets/batch/diagrams/batch_processing.mermaid"
```

## Key features
Expand All @@ -42,11 +22,7 @@ When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event sour
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire.

```mermaid
journey
section Conditions
Successful response: 5: Success
Maximum retries: 3: Failure
Records expired: 1: Failure
--8<-- "examples/snippets/batch/diagrams/journey.mermaid"
```

This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
Expand Down Expand Up @@ -263,19 +239,7 @@ Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-fr

<center>
```mermaid
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
Lambda function->>Lambda service: Report some failed messages
activate SQS queue
Lambda service->>SQS queue: Delete successful messages
SQS queue-->>SQS queue: Failed messages return
Note over SQS queue,Lambda service: Process repeat
deactivate SQS queue
--8<-- "examples/snippets/batch/diagrams/sqs.mermaid"
```
<i>SQS mechanism with Batch Item Failures</i>
</center>
Expand All @@ -288,22 +252,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages (1-2)
SQS queue-->>SQS queue: Failed messages return (3-10)
deactivate SQS queue
--8<-- "examples/snippets/batch/diagrams/sqs_fifo.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
Expand All @@ -312,23 +261,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
sequenceDiagram
autonumber
participant SQS queue
participant Lambda service
participant Lambda function
Lambda service->>SQS queue: Poll
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Process messages from another MessageGroupID
Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
deactivate Lambda function
activate SQS queue
Lambda service->>SQS queue: Delete successful messages processed
SQS queue-->>SQS queue: Failed messages return
deactivate SQS queue
--8<-- "examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
Expand All @@ -343,23 +276,7 @@ For brevity, we will use `Streams` to refer to either services. For theory on st

<center>
```mermaid
sequenceDiagram
autonumber
participant Streams
participant Lambda service
participant Lambda function
Lambda service->>Streams: Poll latest records
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3rd batch item
Lambda function-->Lambda function: Continue processing batch items (4-10)
Lambda function->>Lambda service: Report batch item as failure (3)
deactivate Lambda function
activate Streams
Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
Lambda service->>Streams: Poll records starting from updated checkpoint
deactivate Streams
--8<-- "examples/snippets/batch/diagrams/streams.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with single batch item failure</i>
</center>
Expand All @@ -370,29 +287,91 @@ The behavior changes slightly when there are multiple item failures. Stream chec

<center>
```mermaid
sequenceDiagram
autonumber
participant Streams
participant Lambda service
participant Lambda function
Lambda service->>Streams: Poll latest records
Lambda service->>Lambda function: Invoke (batch event)
activate Lambda function
Lambda function-->Lambda function: Process 2 out of 10 batch items
Lambda function--xLambda function: Fail on 3-5 batch items
Lambda function-->Lambda function: Continue processing batch items (6-10)
Lambda function->>Lambda service: Report batch items as failure (3-5)
deactivate Lambda function
activate Streams
Lambda service->>Streams: Checkpoints to lowest sequence number
Lambda service->>Streams: Poll records starting from updated checkpoint
deactivate Streams
--8<-- "examples/snippets/batch/diagrams/streams_multiple_failures.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
</center>

## Advanced

### Parser integration

Thanks to the [Parser utility](./parser.md) integration, you can pass a [Standard Schema](https://standardschema.dev){target="_blank"}-compatible schema when instantiating the `BatchProcessor` and we will use it to validate each item in the batch before passing it to your record handler.

Since this is an opt-in feature, you will need to import the `parser` function from `@aws-lambda-powertools/batch/parser`, this allows us to keep the parsing logic separate from the main processing logic and avoid increasing the bundle size.

#### Using item schema only

When you only want to customize the schema of the item's payload you can pass an `innerSchema` objecta and we will use it to extend the base schema based on the `EventType` passed to the `BatchProcessor`.

When doing this, you can also specify a `transformer` to tell us how to transform the payload before validation.

=== "SQS - using inner payload"

```typescript hl_lines="6 11-14 19 26"
--8<-- "examples/snippets/batch/advanced_parser_item_sqs.ts"
```

=== "SQS - Sample Event"

```json hl_lines="6 22"
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
```

The example below shows how to use the `innerSchema` with `EventType.SQS`, but you can use it with other event types as well:

| Event Type | Base Schema | Transformer |
|------------|---------------------------|------------------------------|
| SQS | `SqsRecordSchema` | `json`, `base64` (optional) |
| Kinesis | `KinesisDataStreamRecord` | `base64` |
| DynamoDB | `DynamoDBStreamRecord` | `unmarshall` |

#### Extending built-in schemas

When you want more control over the schema, you can extend a [built-in schema](./parser.md#built-in-schemas) for SQS, Kinesis Data Streams, or DynamoDB Streams with your own custom schema for the payload and we'll parse each item before passing it to your record handler. If the payload does not match the schema, the item will be marked as failed.

=== "SQS"

=== "index.ts"

```typescript hl_lines="6 14-16 20-23 29"
--8<-- "examples/snippets/batch/advanced_parser_sqs.ts"
```

=== "Sample Event"

```json hl_lines="6 22"
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
```

=== "DynamoDB Streams"

=== "index.ts"

```typescript hl_lines="6 17-19 24-28 35"
--8<-- "examples/snippets/batch/advanced_parser_DynamoDB.ts"
```

=== "Sample Event"

```json hl_lines="13-18 39-44"
--8<-- "examples/snippets/batch/samples/parser_DynamoDB.json"
```

=== "Kinesis Data Streams"

=== "index.ts"

```typescript hl_lines="6 17-22 27-31 38"
--8<-- "examples/snippets/batch/advanced_parser_Kinesis.ts"
```

=== "Sample Event"

```json hl_lines="8 24"
--8<-- "examples/snippets/batch/samples/parser_Kinesis.json"
```

### Accessing processed messages

Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.
Expand Down
51 changes: 51 additions & 0 deletions examples/snippets/batch/advanced_parser_DynamoDB.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { Logger } from '@aws-lambda-powertools/logger';
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
DynamoDBStreamChangeRecordBase,
DynamoDBStreamRecord,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import type { DynamoDBStreamEvent } from '@aws-lambda-powertools/parser/types';
import type { DynamoDBStreamHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = DynamoDBMarshalled(
z.object({ name: z.string(), age: z.number() })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.SQS, {
parser,
schema: DynamoDBStreamRecord.extend({
dynamodb: DynamoDBStreamChangeRecordBase.extend({
NewImage: myItemSchema,
}),
}),
logger,
});

const recordHandler = async ({
eventID,
dynamodb: {
NewImage: { name, age },
},
}: Omit<DynamoDBStreamEvent['Records'][number], 'dynamodb'> & {
dynamodb: Omit<
DynamoDBStreamEvent['Records'][number]['dynamodb'],
'NewImage'
> & {
NewImage: z.infer<typeof myItemSchema>;
};
}) => {
logger.info(`Processing record ${eventID}`, { name, age });
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
});
54 changes: 54 additions & 0 deletions examples/snippets/batch/advanced_parser_Kinesis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { Logger } from '@aws-lambda-powertools/logger';
import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
import {
KinesisDataStreamRecord,
KinesisDataStreamRecordPayload,
} from '@aws-lambda-powertools/parser/schemas/kinesis';
import type { KinesisDataStreamRecordEvent } from '@aws-lambda-powertools/parser/types';
import type { KinesisStreamHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = Base64Encoded(
z.object({
name: z.string(),
age: z.number(),
})
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
parser,
schema: KinesisDataStreamRecord.extend({
kinesis: KinesisDataStreamRecordPayload.extend({
data: myItemSchema,
}),
}),
logger,
});

const recordHandler = async ({
kinesis: {
sequenceNumber,
data: { name, age },
},
}: Omit<KinesisDataStreamRecordEvent, 'kinesis'> & {
kinesis: Omit<KinesisDataStreamRecordEvent['kinesis'], 'data'> & {
data: z.infer<typeof myItemSchema>;
};
}) => {
logger.info(`Processing record: ${sequenceNumber}`, {
name,
age,
});
};

export const handler: KinesisStreamHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
});
34 changes: 34 additions & 0 deletions examples/snippets/batch/advanced_parser_item_sqs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = z.object({
name: z.string(),
age: z.number(),
});

const logger = new Logger();
const processor = new BatchProcessor(EventType.SQS, {
parser,
innerSchema: myItemSchema,
transformer: 'json',
logger,
});

const recordHandler = async ({
messageId,
body: { name, age },
}: SQSRecord & { body: z.infer<typeof myItemSchema> }) => {
logger.info(`Processing record ${messageId}`, { name, age });
};

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
});
Loading