219 changes: 111 additions & 108 deletions docs/features/batch.md
@@ -6,27 +6,7 @@ description: Utility
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

```mermaid
-stateDiagram-v2
-    direction LR
-    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
-    LambdaInit: Lambda invocation
-    BatchProcessor: Batch Processor
-    RecordHandler: Record Handler function
-    YourLogic: Your logic to process each batch item
-    LambdaResponse: Lambda response
-
-    BatchSource --> LambdaInit
-
-    LambdaInit --> BatchProcessor
-    BatchProcessor --> RecordHandler
-
-    state BatchProcessor {
-        [*] --> RecordHandler: Your function
-        RecordHandler --> YourLogic
-    }
-
-    RecordHandler --> BatchProcessor: Collect results
-    BatchProcessor --> LambdaResponse: Report items that failed processing
+--8<-- "examples/snippets/batch/diagrams/batch_processing.mermaid"
```

## Key features
@@ -42,17 +22,13 @@ When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event sour
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. The same batch is then retried until one of the following conditions happens first: **a)** your Lambda function returns a successful response, **b)** the record reaches its maximum retry attempts, or **c)** the records expire.

```mermaid
-journey
-  section Conditions
-    Successful response: 5: Success
-    Maximum retries: 3: Failure
-    Records expired: 1: Failure
+--8<-- "examples/snippets/batch/diagrams/journey.mermaid"
```

This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:

* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.
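
With this feature enabled, Lambda expects your function response to list the identifiers of failed records. A minimal sketch of that contract, with illustrative identifiers (`processPartialResponse` assembles this response for you):

```typescript
// Response contract for ReportBatchItemFailures - assembled automatically by
// processPartialResponse; sketched here only to illustrate what Lambda receives.
type PartialBatchResponse = {
  batchItemFailures: { itemIdentifier: string }[];
};

// Hypothetical example: two records failed; only these return to the queue
// (SQS) or determine the checkpoint (Kinesis and DynamoDB Streams)
const response: PartialBatchResponse = {
  batchItemFailures: [
    { itemIdentifier: '244fc6b4-87a3-44ab-83d2-361172410c3a' },
    { itemIdentifier: 'b4411a9c-ca65-4b46-a272-4d11b5772b33' },
  ],
};
```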

<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
<!-- markdownlint-disable MD013 -->
@@ -237,7 +213,7 @@ By default, we catch any exception raised by your record handler function. This

1. Any exception works here. See the [extending `BatchProcessor` section](#extending-batchprocessor) if you want to override this behavior.

2. Exceptions raised in `recordHandler` will propagate to `process_partial_response`. <br/><br/> We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).
2. Exceptions raised in `recordHandler` will propagate to `processPartialResponse`. <br/><br/> We catch them and include each failed batch item identifier in the response object (see `Sample response` tab).

=== "Sample response"

@@ -263,19 +239,7 @@ Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-fr

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    Lambda function->>Lambda service: Report some failed messages
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages
-    SQS queue-->>SQS queue: Failed messages return
-    Note over SQS queue,Lambda service: Process repeat
-    deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs.mermaid"
```
<i>SQS mechanism with Batch Item Failures</i>
</center>
@@ -288,22 +252,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
-    deactivate Lambda function
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages (1-2)
-    SQS queue-->>SQS queue: Failed messages return (3-10)
-    deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs_fifo.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
@@ -312,23 +261,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function-->Lambda function: Process messages from another MessageGroupID
-    Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
-    deactivate Lambda function
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages processed
-    SQS queue-->>SQS queue: Failed messages return
-    deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
@@ -343,23 +276,7 @@ For brevity, we will use `Streams` to refer to either services. For theory on st

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant Streams
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>Streams: Poll latest records
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function-->Lambda function: Continue processing batch items (4-10)
-    Lambda function->>Lambda service: Report batch item as failure (3)
-    deactivate Lambda function
-    activate Streams
-    Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
-    Lambda service->>Streams: Poll records starting from updated checkpoint
-    deactivate Streams
+--8<-- "examples/snippets/batch/diagrams/streams.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with single batch item failure</i>
</center>
@@ -370,29 +287,115 @@ The behavior changes slightly when there are multiple item failures. Stream chec

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant Streams
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>Streams: Poll latest records
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3-5 batch items
-    Lambda function-->Lambda function: Continue processing batch items (6-10)
-    Lambda function->>Lambda service: Report batch items as failure (3-5)
-    deactivate Lambda function
-    activate Streams
-    Lambda service->>Streams: Checkpoints to lowest sequence number
-    Lambda service->>Streams: Poll records starting from updated checkpoint
-    deactivate Streams
+--8<-- "examples/snippets/batch/diagrams/streams_multiple_failures.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
</center>

## Advanced

### Parser integration

The Batch Processing utility integrates with the [Parser utility](./parser.md) to automatically validate and parse each batch record before processing. This ensures your record handler receives properly typed and validated data, eliminating the need for manual parsing and validation.

To enable parser integration, import the `parser` function from `@aws-lambda-powertools/batch/parser` and pass it along with a schema when instantiating the `BatchProcessor`. This requires you to also [install the Parser utility](./parser.md#getting-started).

```typescript
import { parser } from '@aws-lambda-powertools/batch/parser';
```

You have two approaches for schema validation:

1. **Item schema only** (`innerSchema`) - Focus on your payload schema; we handle extending the base event structure
2. **Full event schema** (`schema`) - Validate the entire event record structure with complete control
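
As a quick comparison, here is a minimal sketch of both approaches for SQS. The payload schema is illustrative, and the option names (`parser`, `innerSchema`, `transformer`, `schema`) follow the examples shown later in this section:

```typescript
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';

const myItemSchema = z.object({ name: z.string(), age: z.number() });

// 1. Item schema only - we extend the base SQS record schema for you;
//    the `json` transformer parses the stringified JSON body first
const itemOnlyProcessor = new BatchProcessor(EventType.SQS, {
  parser,
  innerSchema: myItemSchema,
  transformer: 'json',
});

// 2. Full event schema - you extend the built-in record schema yourself
const fullSchemaProcessor = new BatchProcessor(EventType.SQS, {
  parser,
  schema: SqsRecordSchema.extend({
    body: JSONStringified(myItemSchema),
  }),
});
```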

#### Benefits of parser integration

Parser integration eliminates runtime errors from malformed data and provides compile-time type safety, making your code more reliable and easier to maintain. Invalid records are automatically marked as failed and won't reach your handler, reducing defensive coding.

=== "Without parser integration"

```typescript
import type { SQSRecord } from 'aws-lambda';

const recordHandler = async (record: SQSRecord) => {
  // Manual parsing with no type safety
  const payload = JSON.parse(record.body); // any type
  console.log(payload.name); // No autocomplete, runtime errors possible
};
```

=== "With parser integration"

```typescript
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import type { SQSRecord } from 'aws-lambda';
import { z } from 'zod';

const mySchema = z.object({ name: z.string(), age: z.number() });

const recordHandler = async (
  record: ParsedRecord<SQSRecord, z.infer<typeof mySchema>>
) => {
  // Automatic validation and strong typing
  console.log(record.body.name); // Full type safety and autocomplete
};
```

#### Using item schema only

When you want to focus on validating your payload without dealing with the full event structure, use `innerSchema`. We automatically extend the base event schema for you, reducing boilerplate while still validating the entire record.

Available transformers by event type:

| Event Type | Base Schema | Available Transformers | When to use transformer |
|------------|---------------------------|------------------------------|----------------------------------------------------------------|
| SQS | `SqsRecordSchema` | `json`, `base64` | `json` for stringified JSON, `base64` for encoded data |
| Kinesis | `KinesisDataStreamRecord` | `base64` | Required for Kinesis data (always base64 encoded) |
| DynamoDB | `DynamoDBStreamRecord` | `unmarshall` | Required to convert DynamoDB attribute values to plain objects |

=== "SQS with JSON payload"

```typescript hl_lines="6 12-15 19-21 27-28"
--8<-- "examples/snippets/batch/advanced_parser_item_sqs.ts"
```

=== "Sample Event"

```json hl_lines="6 22"
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
```

#### Using full event schema

For complete control over validation, extend the built-in schemas with your custom payload schema. This approach gives you full control over the entire event structure.

=== "SQS"

```typescript hl_lines="6 15-17 22-24 30-31"
--8<-- "examples/snippets/batch/advanced_parser_sqs.ts"
```

=== "Kinesis Data Streams"

```typescript hl_lines="6 18-23 28-32 39 41-44"
--8<-- "examples/snippets/batch/advanced_parser_Kinesis.ts"
```

=== "DynamoDB Streams"

```typescript hl_lines="6 17-19 24-28 35 37"
--8<-- "examples/snippets/batch/advanced_parser_DynamoDB.ts"
```

#### Typed record handlers with ParsedRecord

To get full type safety in your record handlers, use the `ParsedRecord` utility type:

```typescript
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import type { DynamoDBRecord, SQSRecord } from 'aws-lambda';
import type { z } from 'zod';

// For most cases - single schema
type MyRecord = ParsedRecord<SQSRecord, z.infer<typeof mySchema>>;

// For DynamoDB - separate schemas for NewImage and OldImage
type MyDynamoRecord = ParsedRecord<
  DynamoDBRecord,
  z.infer<typeof newSchema>,
  z.infer<typeof oldSchema>
>;
```

This eliminates verbose type annotations and provides clean autocompletion for your parsed data.

### Accessing processed messages

Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.
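
For instance, a minimal sketch assuming the processor's `register`, `process`, and `response` methods behave as in the utility's public API, where each processed message is returned as a `[status, result, record]` tuple:

```typescript
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord) => record.body;

export const handler: SQSHandler = async (event, context) => {
  // Register the batch and process it manually instead of
  // calling processPartialResponse
  processor.register(event.Records, recordHandler, { context });
  const processedMessages = await processor.process();

  for (const [status, result, record] of processedMessages) {
    // status is 'success' or 'fail'; result holds the handler's
    // return value or the error message
    console.log(status, result, record);
  }

  return processor.response();
};
```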
44 changes: 44 additions & 0 deletions examples/snippets/batch/advanced_parser_DynamoDB.ts
@@ -0,0 +1,44 @@
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
  DynamoDBStreamChangeRecordBase,
  DynamoDBStreamRecord,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import type { DynamoDBRecord, DynamoDBStreamHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = DynamoDBMarshalled(
  z.object({ name: z.string(), age: z.number() })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  parser,
  schema: DynamoDBStreamRecord.extend({
    dynamodb: DynamoDBStreamChangeRecordBase.extend({
      NewImage: myItemSchema,
    }),
  }),
  logger,
});

const recordHandler = async ({
  eventID,
  dynamodb: {
    NewImage: { name, age },
  },
}: ParsedRecord<DynamoDBRecord, z.infer<typeof myItemSchema>>) => {
  logger.info(`Processing record ${eventID}`, { name, age });
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
54 changes: 54 additions & 0 deletions examples/snippets/batch/advanced_parser_Kinesis.ts
@@ -0,0 +1,54 @@
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
import {
  KinesisDataStreamRecord,
  KinesisDataStreamRecordPayload,
} from '@aws-lambda-powertools/parser/schemas/kinesis';
import type { KinesisDataStreamRecordEvent } from '@aws-lambda-powertools/parser/types';
import type { KinesisStreamHandler } from 'aws-lambda';
import { z } from 'zod';

const myItemSchema = Base64Encoded(
  z.object({
    name: z.string(),
    age: z.number(),
  })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
  parser,
  schema: KinesisDataStreamRecord.extend({
    kinesis: KinesisDataStreamRecordPayload.extend({
      data: myItemSchema,
    }),
  }),
  logger,
});

const recordHandler = async ({
  kinesis: {
    sequenceNumber,
    data: { name, age },
  },
}: ParsedRecord<
  KinesisDataStreamRecordEvent,
  z.infer<typeof myItemSchema>
>) => {
  logger.info(`Processing record: ${sequenceNumber}`, {
    name,
    age,
  });
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });