diff --git a/docs/features/batch.md b/docs/features/batch.md
index ab0d81b568..7c5de6d3da 100644
--- a/docs/features/batch.md
+++ b/docs/features/batch.md
@@ -6,27 +6,7 @@ description: Utility
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
```mermaid
-stateDiagram-v2
- direction LR
- BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams
- LambdaInit: Lambda invocation
- BatchProcessor: Batch Processor
- RecordHandler: Record Handler function
- YourLogic: Your logic to process each batch item
- LambdaResponse: Lambda response
-
- BatchSource --> LambdaInit
-
- LambdaInit --> BatchProcessor
- BatchProcessor --> RecordHandler
-
- state BatchProcessor {
- [*] --> RecordHandler: Your function
- RecordHandler --> YourLogic
- }
-
- RecordHandler --> BatchProcessor: Collect results
- BatchProcessor --> LambdaResponse: Report items that failed processing
+--8<-- "examples/snippets/batch/diagrams/batch_processing.mermaid"
```
## Key features
@@ -42,17 +22,13 @@ When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event sour
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire.
```mermaid
-journey
- section Conditions
- Successful response: 5: Success
- Maximum retries: 3: Failure
- Records expired: 1: Failure
+--8<-- "examples/snippets/batch/diagrams/journey.mermaid"
```
This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
-* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
+* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.
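+
+With the feature enabled, your function's response lists the identifiers of the failed items; everything not listed is treated as successful. A minimal sketch of that response shape (the utility builds this for you via `processPartialResponse`; shown here only to illustrate the contract):
+
+```typescript
+// Built automatically by processPartialResponse - illustration only.
+const response = {
+  batchItemFailures: [
+    // for SQS this is the messageId; for streams, the record's sequence number
+    { itemIdentifier: '059f36b4-87a3-44ab-83d2-661975830a7d' },
+  ],
+};
+```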
@@ -237,7 +213,7 @@ By default, we catch any exception raised by your record handler function. This
1. Any exception works here. See [extending `BatchProcessor` section, if you want to override this behavior.](#extending-batchprocessor)
- 2. Exceptions raised in `recordHandler` will propagate to `process_partial_response`.
We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).
+ 2. Errors raised in `recordHandler` will propagate to `processPartialResponse`.
We catch them and include each failed batch item identifier in the response object (see `Sample response` tab).
=== "Sample response"
@@ -263,19 +239,7 @@ Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-fr
```mermaid
-sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- Lambda function->>Lambda service: Report some failed messages
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages
- SQS queue-->>SQS queue: Failed messages return
- Note over SQS queue,Lambda service: Process repeat
- deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs.mermaid"
```
SQS mechanism with Batch Item Failures
@@ -288,22 +252,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)
```mermaid
-sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
- deactivate Lambda function
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages (1-2)
- SQS queue-->>SQS queue: Failed messages return (3-10)
- deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs_fifo.mermaid"
```
SQS FIFO mechanism with Batch Item Failures
@@ -312,23 +261,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)
```mermaid
-sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function-->Lambda function: Process messages from another MessageGroupID
- Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
- deactivate Lambda function
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages processed
- SQS queue-->>SQS queue: Failed messages return
- deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid"
```
SQS FIFO mechanism with Batch Item Failures
@@ -343,23 +276,7 @@ For brevity, we will use `Streams` to refer to either services. For theory on st
```mermaid
-sequenceDiagram
- autonumber
- participant Streams
- participant Lambda service
- participant Lambda function
- Lambda service->>Streams: Poll latest records
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function-->Lambda function: Continue processing batch items (4-10)
- Lambda function->>Lambda service: Report batch item as failure (3)
- deactivate Lambda function
- activate Streams
- Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
- Lambda service->>Streams: Poll records starting from updated checkpoint
- deactivate Streams
+--8<-- "examples/snippets/batch/diagrams/streams.mermaid"
```
Kinesis and DynamoDB streams mechanism with single batch item failure
@@ -370,29 +287,115 @@ The behavior changes slightly when there are multiple item failures. Stream chec
```mermaid
-sequenceDiagram
- autonumber
- participant Streams
- participant Lambda service
- participant Lambda function
- Lambda service->>Streams: Poll latest records
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3-5 batch items
- Lambda function-->Lambda function: Continue processing batch items (6-10)
- Lambda function->>Lambda service: Report batch items as failure (3-5)
- deactivate Lambda function
- activate Streams
- Lambda service->>Streams: Checkpoints to lowest sequence number
- Lambda service->>Streams: Poll records starting from updated checkpoint
- deactivate Streams
+--8<-- "examples/snippets/batch/diagrams/streams_multiple_failures.mermaid"
```
Kinesis and DynamoDB streams mechanism with multiple batch item failures
## Advanced
+### Parser integration
+
+The Batch Processing utility integrates with the [Parser utility](./parser.md) to automatically validate and parse each batch record before processing, so your record handler receives properly typed and validated data without manual parsing or validation.
+
+To enable parser integration, import the `parser` function from `@aws-lambda-powertools/batch/parser` and pass it along with a schema when instantiating the `BatchProcessor`. This requires you to also [install the Parser utility](./parser.md#getting-started).
+
+```typescript
+import { parser } from '@aws-lambda-powertools/batch/parser';
+```
+
+You have two approaches for schema validation:
+
+1. **Item schema only** (`innerSchema`) - Focus on your payload schema; we extend the base event structure for you
+2. **Full event schema** (`schema`) - Validate the entire event record structure with complete control
+
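+In practice, the two approaches differ only in how you configure the processor. A condensed sketch of both, assuming Zod and an SQS event source (full examples follow below):
+
+```typescript
+import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
+import { parser } from '@aws-lambda-powertools/batch/parser';
+import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
+import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas';
+import { z } from 'zod';
+
+const myItemSchema = z.object({ name: z.string(), age: z.number() });
+
+// 1. Item schema only: we wrap your payload schema in the base SQS record schema
+const itemOnly = new BatchProcessor(EventType.SQS, {
+  parser,
+  innerSchema: myItemSchema,
+  transformer: 'json', // the SQS body is stringified JSON
+});
+
+// 2. Full event schema: you extend the built-in record schema yourself
+const fullEvent = new BatchProcessor(EventType.SQS, {
+  parser,
+  schema: SqsRecordSchema.extend({
+    body: JSONStringified(myItemSchema),
+  }),
+});
+```
+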
+#### Benefits of parser integration
+
+Parser integration prevents runtime errors caused by malformed data and provides compile-time type safety, making your code more reliable and easier to maintain. Invalid records are automatically marked as failed and never reach your handler, reducing the need for defensive coding.
+
+=== "Without parser integration"
+
+ ```typescript
+ const recordHandler = async (record: SQSRecord) => {
+ // Manual parsing with no type safety
+ const payload = JSON.parse(record.body); // any type
+ console.log(payload.name); // No autocomplete, runtime errors possible
+ };
+ ```
+
+=== "With parser integration"
+
+ ```typescript
+ const mySchema = z.object({ name: z.string(), age: z.number() });
+
+    const recordHandler = async (record: ParsedRecord<SQSRecord, z.infer<typeof mySchema>>) => {
+ // Automatic validation and strong typing
+ console.log(record.body.name); // Full type safety and autocomplete
+ };
+ ```
+
+#### Using item schema only
+
+When you want to focus on validating your payload without dealing with the full event structure, use `innerSchema`. We automatically extend the base event schema for you, reducing boilerplate code while still validating the entire record.
+
+Available transformers by event type:
+
+| Event Type | Base Schema | Available Transformers | When to use transformer |
+|------------|---------------------------|------------------------------|----------------------------------------------------------------|
+| SQS | `SqsRecordSchema` | `json`, `base64` | `json` for stringified JSON, `base64` for encoded data |
+| Kinesis | `KinesisDataStreamRecord` | `base64` | Required for Kinesis data (always base64 encoded) |
+| DynamoDB | `DynamoDBStreamRecord` | `unmarshall` | Required to convert DynamoDB attribute values to plain objects |
+
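+The same `innerSchema` approach works for streams. For example, a minimal Kinesis sketch (assuming the same constructor options shown in the SQS tab below, with the `base64` transformer the table calls for):
+
+```typescript
+import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
+import { parser } from '@aws-lambda-powertools/batch/parser';
+import { z } from 'zod';
+
+const processor = new BatchProcessor(EventType.KinesisDataStreams, {
+  parser,
+  // we wrap this payload schema in the base Kinesis record schema for you
+  innerSchema: z.object({ name: z.string(), age: z.number() }),
+  // Kinesis data arrives base64 encoded, so the table above calls for this transformer
+  transformer: 'base64',
+});
+```
+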
+=== "SQS with JSON payload"
+
+ ```typescript hl_lines="6 12-15 19-21 27-28"
+ --8<-- "examples/snippets/batch/advanced_parser_item_sqs.ts"
+ ```
+
+=== "Sample Event"
+
+ ```json hl_lines="6 22"
+ --8<-- "examples/snippets/batch/samples/parser_SQS.json"
+ ```
+
+#### Using full event schema
+
+For complete control over validation, extend the built-in schemas with your custom payload schema. This approach gives you full control over the entire event structure.
+
+=== "SQS"
+
+ ```typescript hl_lines="6 15-17 22-24 30-31"
+ --8<-- "examples/snippets/batch/advanced_parser_sqs.ts"
+ ```
+
+=== "Kinesis Data Streams"
+
+ ```typescript hl_lines="6 18-23 28-32 39 41-44"
+ --8<-- "examples/snippets/batch/advanced_parser_Kinesis.ts"
+ ```
+
+=== "DynamoDB Streams"
+
+ ```typescript hl_lines="6 17-19 24-28 35 37"
+ --8<-- "examples/snippets/batch/advanced_parser_DynamoDB.ts"
+ ```
+
+#### Typed record handlers with ParsedRecord
+
+To get full type safety in your record handlers, use the `ParsedRecord` utility type:
+
+```typescript
+import type { ParsedRecord } from '@aws-lambda-powertools/batch';
+import type { DynamoDBRecord, SQSRecord } from 'aws-lambda';
+import { z } from 'zod';
+
+const mySchema = z.object({ name: z.string(), age: z.number() });
+const newSchema = z.object({ id: z.string(), status: z.string() });
+const oldSchema = z.object({ id: z.string() });
+
+// For most cases - single schema
+type MyRecord = ParsedRecord<SQSRecord, z.infer<typeof mySchema>>;
+
+// For DynamoDB - separate schemas for NewImage and OldImage
+type MyDynamoRecord = ParsedRecord<DynamoDBRecord, z.infer<typeof newSchema>, z.infer<typeof oldSchema>>;
+```
+
+This eliminates verbose type annotations and provides clean autocompletion for your parsed data.
+
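+The aliases can then annotate your record handler parameter directly. For example, with the `MyRecord` alias defined above:
+
+```typescript
+const recordHandler = async (record: MyRecord) => {
+  // record.body is typed as { name: string; age: number }
+  console.log(record.body.name);
+};
+```
+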
### Accessing processed messages
Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.
diff --git a/examples/snippets/batch/advanced_parser_DynamoDB.ts b/examples/snippets/batch/advanced_parser_DynamoDB.ts
new file mode 100644
index 0000000000..738e2feaea
--- /dev/null
+++ b/examples/snippets/batch/advanced_parser_DynamoDB.ts
@@ -0,0 +1,44 @@
+import {
+ BatchProcessor,
+ EventType,
+ processPartialResponse,
+} from '@aws-lambda-powertools/batch';
+import { parser } from '@aws-lambda-powertools/batch/parser';
+import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
+import { Logger } from '@aws-lambda-powertools/logger';
+import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
+import {
+ DynamoDBStreamChangeRecordBase,
+ DynamoDBStreamRecord,
+} from '@aws-lambda-powertools/parser/schemas/dynamodb';
+import type { DynamoDBRecord, DynamoDBStreamHandler } from 'aws-lambda';
+import { z } from 'zod';
+
+const myItemSchema = DynamoDBMarshalled(
+ z.object({ name: z.string(), age: z.number() })
+);
+
+const logger = new Logger();
+const processor = new BatchProcessor(EventType.DynamoDBStreams, {
+ parser,
+ schema: DynamoDBStreamRecord.extend({
+ dynamodb: DynamoDBStreamChangeRecordBase.extend({
+ NewImage: myItemSchema,
+ }),
+ }),
+ logger,
+});
+
+const recordHandler = async ({
+ eventID,
+ dynamodb: {
+ NewImage: { name, age },
+ },
+}: ParsedRecord<DynamoDBRecord, z.infer<typeof myItemSchema>>) => {
+ logger.info(`Processing record ${eventID}`, { name, age });
+};
+
+export const handler: DynamoDBStreamHandler = async (event, context) =>
+ processPartialResponse(event, recordHandler, processor, {
+ context,
+ });
diff --git a/examples/snippets/batch/advanced_parser_Kinesis.ts b/examples/snippets/batch/advanced_parser_Kinesis.ts
new file mode 100644
index 0000000000..bd71918eb6
--- /dev/null
+++ b/examples/snippets/batch/advanced_parser_Kinesis.ts
@@ -0,0 +1,54 @@
+import {
+ BatchProcessor,
+ EventType,
+ processPartialResponse,
+} from '@aws-lambda-powertools/batch';
+import { parser } from '@aws-lambda-powertools/batch/parser';
+import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
+import { Logger } from '@aws-lambda-powertools/logger';
+import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
+import {
+ KinesisDataStreamRecord,
+ KinesisDataStreamRecordPayload,
+} from '@aws-lambda-powertools/parser/schemas/kinesis';
+import type { KinesisDataStreamRecordEvent } from '@aws-lambda-powertools/parser/types';
+import type { KinesisStreamHandler } from 'aws-lambda';
+import { z } from 'zod';
+
+const myItemSchema = Base64Encoded(
+ z.object({
+ name: z.string(),
+ age: z.number(),
+ })
+);
+
+const logger = new Logger();
+const processor = new BatchProcessor(EventType.KinesisDataStreams, {
+ parser,
+ schema: KinesisDataStreamRecord.extend({
+ kinesis: KinesisDataStreamRecordPayload.extend({
+ data: myItemSchema,
+ }),
+ }),
+ logger,
+});
+
+const recordHandler = async ({
+ kinesis: {
+ sequenceNumber,
+ data: { name, age },
+ },
+}: ParsedRecord<
+ KinesisDataStreamRecordEvent,
+  z.infer<typeof myItemSchema>
+>) => {
+ logger.info(`Processing record: ${sequenceNumber}`, {
+ name,
+ age,
+ });
+};
+
+export const handler: KinesisStreamHandler = async (event, context) =>
+ processPartialResponse(event, recordHandler, processor, {
+ context,
+ });
diff --git a/examples/snippets/batch/advanced_parser_item_sqs.ts b/examples/snippets/batch/advanced_parser_item_sqs.ts
new file mode 100644
index 0000000000..982280a43d
--- /dev/null
+++ b/examples/snippets/batch/advanced_parser_item_sqs.ts
@@ -0,0 +1,35 @@
+import {
+ BatchProcessor,
+ EventType,
+ processPartialResponse,
+} from '@aws-lambda-powertools/batch';
+import { parser } from '@aws-lambda-powertools/batch/parser';
+import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
+import { Logger } from '@aws-lambda-powertools/logger';
+import type { SQSHandler, SQSRecord } from 'aws-lambda';
+import { z } from 'zod';
+
+const myItemSchema = z.object({
+ name: z.string(),
+ age: z.number(),
+});
+
+const logger = new Logger();
+const processor = new BatchProcessor(EventType.SQS, {
+ parser,
+ innerSchema: myItemSchema,
+ transformer: 'json',
+ logger,
+});
+
+const recordHandler = async ({
+ messageId,
+ body: { name, age },
+}: ParsedRecord<SQSRecord, z.infer<typeof myItemSchema>>) => {
+ logger.info(`Processing record ${messageId}`, { name, age });
+};
+
+export const handler: SQSHandler = async (event, context) =>
+ processPartialResponse(event, recordHandler, processor, {
+ context,
+ });
diff --git a/examples/snippets/batch/advanced_parser_sqs.ts b/examples/snippets/batch/advanced_parser_sqs.ts
new file mode 100644
index 0000000000..8caee32588
--- /dev/null
+++ b/examples/snippets/batch/advanced_parser_sqs.ts
@@ -0,0 +1,38 @@
+import {
+ BatchProcessor,
+ EventType,
+ processPartialResponse,
+} from '@aws-lambda-powertools/batch';
+import { parser } from '@aws-lambda-powertools/batch/parser';
+import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
+import { Logger } from '@aws-lambda-powertools/logger';
+import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
+import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas';
+import type { SqsRecord } from '@aws-lambda-powertools/parser/types';
+import type { SQSHandler } from 'aws-lambda';
+import { z } from 'zod';
+
+const myItemSchema = JSONStringified(
+ z.object({ name: z.string(), age: z.number() })
+);
+
+const logger = new Logger();
+const processor = new BatchProcessor(EventType.SQS, {
+ parser,
+ schema: SqsRecordSchema.extend({
+ body: myItemSchema,
+ }),
+ logger,
+});
+
+const recordHandler = async ({
+ messageId,
+ body: { name, age },
+}: ParsedRecord<SqsRecord, z.infer<typeof myItemSchema>>) => {
+ logger.info(`Processing record ${messageId}`, { name, age });
+};
+
+export const handler: SQSHandler = async (event, context) =>
+ processPartialResponse(event, recordHandler, processor, {
+ context,
+ });
diff --git a/examples/snippets/batch/diagrams/batch_processing.mermaid b/examples/snippets/batch/diagrams/batch_processing.mermaid
new file mode 100644
index 0000000000..ad978a0649
--- /dev/null
+++ b/examples/snippets/batch/diagrams/batch_processing.mermaid
@@ -0,0 +1,21 @@
+stateDiagram-v2
+ direction LR
+ BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams
+ LambdaInit: Lambda invocation
+ BatchProcessor: Batch Processor
+ RecordHandler: Record Handler function
+ YourLogic: Your logic to process each batch item
+ LambdaResponse: Lambda response
+
+ BatchSource --> LambdaInit
+
+ LambdaInit --> BatchProcessor
+ BatchProcessor --> RecordHandler
+
+ state BatchProcessor {
+ [*] --> RecordHandler: Your function
+ RecordHandler --> YourLogic
+ }
+
+ RecordHandler --> BatchProcessor: Collect results
+ BatchProcessor --> LambdaResponse: Report items that failed processing
\ No newline at end of file
diff --git a/examples/snippets/batch/diagrams/journey.mermaid b/examples/snippets/batch/diagrams/journey.mermaid
new file mode 100644
index 0000000000..a6b3bc95e8
--- /dev/null
+++ b/examples/snippets/batch/diagrams/journey.mermaid
@@ -0,0 +1,5 @@
+journey
+ section Conditions
+ Successful response: 5: Success
+ Maximum retries: 3: Failure
+ Records expired: 1: Failure
\ No newline at end of file
diff --git a/examples/snippets/batch/diagrams/sqs.mermaid b/examples/snippets/batch/diagrams/sqs.mermaid
new file mode 100644
index 0000000000..a728bd37b2
--- /dev/null
+++ b/examples/snippets/batch/diagrams/sqs.mermaid
@@ -0,0 +1,13 @@
+sequenceDiagram
+ autonumber
+ participant SQS queue
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>SQS queue: Poll
+ Lambda service->>Lambda function: Invoke (batch event)
+ Lambda function->>Lambda service: Report some failed messages
+ activate SQS queue
+ Lambda service->>SQS queue: Delete successful messages
+ SQS queue-->>SQS queue: Failed messages return
+ Note over SQS queue,Lambda service: Process repeat
+ deactivate SQS queue
\ No newline at end of file
diff --git a/examples/snippets/batch/diagrams/sqs_fifo.mermaid b/examples/snippets/batch/diagrams/sqs_fifo.mermaid
new file mode 100644
index 0000000000..e5151ceb19
--- /dev/null
+++ b/examples/snippets/batch/diagrams/sqs_fifo.mermaid
@@ -0,0 +1,16 @@
+sequenceDiagram
+ autonumber
+ participant SQS queue
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>SQS queue: Poll
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3rd batch item
+ Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
+ deactivate Lambda function
+ activate SQS queue
+ Lambda service->>SQS queue: Delete successful messages (1-2)
+ SQS queue-->>SQS queue: Failed messages return (3-10)
+ deactivate SQS queue
\ No newline at end of file
diff --git a/examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid b/examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid
new file mode 100644
index 0000000000..729f47914e
--- /dev/null
+++ b/examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid
@@ -0,0 +1,17 @@
+sequenceDiagram
+ autonumber
+ participant SQS queue
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>SQS queue: Poll
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3rd batch item
+ Lambda function-->Lambda function: Process messages from another MessageGroupID
+ Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
+ deactivate Lambda function
+ activate SQS queue
+ Lambda service->>SQS queue: Delete successful messages processed
+ SQS queue-->>SQS queue: Failed messages return
+ deactivate SQS queue
\ No newline at end of file
diff --git a/examples/snippets/batch/diagrams/streams.mermaid b/examples/snippets/batch/diagrams/streams.mermaid
new file mode 100644
index 0000000000..bc634876e1
--- /dev/null
+++ b/examples/snippets/batch/diagrams/streams.mermaid
@@ -0,0 +1,17 @@
+sequenceDiagram
+ autonumber
+ participant Streams
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>Streams: Poll latest records
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3rd batch item
+ Lambda function-->Lambda function: Continue processing batch items (4-10)
+ Lambda function->>Lambda service: Report batch item as failure (3)
+ deactivate Lambda function
+ activate Streams
+ Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
+ Lambda service->>Streams: Poll records starting from updated checkpoint
+ deactivate Streams
\ No newline at end of file
diff --git a/examples/snippets/batch/diagrams/streams_multiple_failures.mermaid b/examples/snippets/batch/diagrams/streams_multiple_failures.mermaid
new file mode 100644
index 0000000000..817630b63e
--- /dev/null
+++ b/examples/snippets/batch/diagrams/streams_multiple_failures.mermaid
@@ -0,0 +1,17 @@
+sequenceDiagram
+ autonumber
+ participant Streams
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>Streams: Poll latest records
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3-5 batch items
+ Lambda function-->Lambda function: Continue processing batch items (6-10)
+ Lambda function->>Lambda service: Report batch items as failure (3-5)
+ deactivate Lambda function
+ activate Streams
+ Lambda service->>Streams: Checkpoints to lowest sequence number
+ Lambda service->>Streams: Poll records starting from updated checkpoint
+ deactivate Streams
\ No newline at end of file
diff --git a/examples/snippets/batch/samples/parser_SQS.json b/examples/snippets/batch/samples/parser_SQS.json
new file mode 100644
index 0000000000..025f5da87d
--- /dev/null
+++ b/examples/snippets/batch/samples/parser_SQS.json
@@ -0,0 +1,36 @@
+{
+ "Records": [
+ {
+ "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"name\": \"test-1\",\"age\": 20}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ },
+ {
+ "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"name\": \"test-2\",\"age\": 30}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ }
+ ]
+}
diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts
index 1a8ec918b3..0e1f2296df 100644
--- a/packages/batch/src/BatchProcessor.ts
+++ b/packages/batch/src/BatchProcessor.ts
@@ -79,6 +79,40 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* });
* ```
*
+ * **Process batch with schema validation**
+ *
+ * @example
+ * ```typescript
+ * import {
+ * BatchProcessor,
+ * EventType,
+ * processPartialResponse,
+ * } from '@aws-lambda-powertools/batch';
+ * import { parser } from '@aws-lambda-powertools/batch/parser';
+ * import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
+ * import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas';
+ * import type { SQSHandler } from 'aws-lambda';
+ * import { z } from 'zod';
+ *
+ * const myItemSchema = z.object({ name: z.string(), age: z.number() });
+ *
+ * const processor = new BatchProcessor(EventType.SQS, {
+ * parser,
+ * schema: SqsRecordSchema.extend({
+ *     body: JSONStringified(myItemSchema),
+ * }),
+ * });
+ *
+ * const recordHandler = async (record) => {
+ * // record is now fully typed and validated
+ * console.log(record.body.name, record.body.age);
+ * };
+ *
+ * export const handler: SQSHandler = async (event, context) =>
+ * processPartialResponse(event, recordHandler, processor, {
+ * context,
+ * });
+ * ```
+ *
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
*/
class BatchProcessor extends BasePartialBatchProcessor {
diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts
index 6cf3370b00..fb0a86a751 100644
--- a/packages/batch/src/index.ts
+++ b/packages/batch/src/index.ts
@@ -14,3 +14,4 @@ export { processPartialResponse } from './processPartialResponse.js';
export { processPartialResponseSync } from './processPartialResponseSync.js';
export { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
export { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';
+export type { ParsedRecord } from './types.js';
diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts
index 4ea03d920d..a160c53e7d 100644
--- a/packages/batch/src/types.ts
+++ b/packages/batch/src/types.ts
@@ -4,10 +4,13 @@ import type {
DynamoDBRecord,
KinesisStreamRecord,
SQSRecord,
+ StreamRecord,
} from 'aws-lambda';
import type { ZodType } from 'zod';
import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js';
import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
+import type { BatchProcessor } from './BatchProcessor.js';
+import type { parser } from './parser.js';
import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';
@@ -93,29 +96,149 @@ type PartialItemFailures = { itemIdentifier: string };
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };
/**
- * Type representing the parser configuration options passed to the BasePartialBatchProcessor class.
+ * Configuration options for {@link BatchProcessor | `BatchProcessor`} parser integration.
*
- * @property schema - The full event schema to be used for parsing
- * @property innerSchema - The inner payload schema
- * @property transformer - The transformer to be used for parsing the payload
- * @property logger - The logger to be used for logging debug and warning messages.
+ * When enabling parser integration, you must provide {@link parser} along with either `schema` or `innerSchema`.
+ * Import `parser` from `@aws-lambda-powertools/batch/parser`.
+ *
+ * When using `schema`, provide a complete schema for the record that matches the event type. For example,
+ * when using `EventType.SQS` you need a schema for an SQS Record. If using Zod, you can extend one of the
+ * built-in schemas to easily get the appropriate schema.
+ *
+ * When using `innerSchema`, we use the `EventType` to determine the base schema, so you only provide
+ * a schema for the payload. You can optionally pass a `transformer` to transform the payload before parsing.
+ * For example, use the `json` transformer for JSON stringified objects.
+ *
+ * You can optionally pass a `logger` for debug and warning messages.
+ *
+ * @note `innerSchema` supports only Zod schemas, while `schema` supports any Standard Schema-compatible parsing library.
+ *
+ * @property parser - Required when using schema parsing (import from `@aws-lambda-powertools/batch/parser`)
+ * @property schema - Complete event schema (mutually exclusive with innerSchema)
+ * @property innerSchema - Payload-only schema (mutually exclusive with schema)
+ * @property transformer - Payload transformer (only available with innerSchema)
+ * @property logger - Optional logger for debug/warning messages
*/
type BasePartialBatchProcessorParserConfig =
| {
- parser?: CallableFunction;
- schema?: StandardSchemaV1;
+ /**
+ * Required when using schema parsing - import from `@aws-lambda-powertools/batch/parser`
+ */
+ parser: typeof parser;
+ /**
+ * Complete event schema using Standard Schema specification, mutually exclusive with `innerSchema`
+ */
+ schema: StandardSchemaV1;
innerSchema?: never;
transformer?: never;
+ /**
+ * Optional logger for debug and warning messages
+ */
    logger?: Pick<GenericLogger, 'debug' | 'warn'>;
}
| {
- parser?: CallableFunction;
+ /**
+ * Required when using schema parsing - import from `@aws-lambda-powertools/batch/parser`
+ */
+ parser: typeof parser;
schema?: never;
- innerSchema?: ZodType;
+ /**
+ * Payload-only Zod schema, mutually exclusive with `schema`
+ */
+ innerSchema: ZodType;
+ /**
+ * Payload transformer, only available with `innerSchema`
+ */
transformer?: 'json' | 'base64' | 'unmarshall';
+ /**
+ * Optional logger for debug and warning messages
+ */
+    logger?: Pick<GenericLogger, 'debug' | 'warn'>;
+ }
+ | {
+ parser?: never;
+ schema?: never;
+ innerSchema?: never;
+ transformer?: never;
+ /**
+ * Optional logger for debug and warning messages
+ */
    logger?: Pick<GenericLogger, 'debug' | 'warn'>;
};
+/**
+ * Utility type for creating typed record handlers with custom payload schemas.
+ *
+ * @template TRecord - The base record type, one of: `SqsRecord`, `KinesisStreamRecord`, and `DynamoDBRecord`
+ * @template TPayload - The inferred type from your payload schema
+ * @template TOldPayload - Optional separate type for DynamoDB `OldImage`; when not specified, both `NewImage` and `OldImage` share the same type
+ *
+ * **SQS Records**
+ *
+ * @example
+ * ```typescript
+ * const mySchema = z.object({ name: z.string(), age: z.number() });
+ * type MySqsRecord = ParsedRecord<SqsRecord, z.infer<typeof mySchema>>;
+ *
+ * const sqsHandler = async (record: MySqsRecord) => {
+ * // record.body is now typed as { name: string, age: number }
+ * };
+ * ```
+ *
+ * **Kinesis Records**
+ *
+ * @example
+ * ```typescript
+ * const kinesisSchema = z.object({ userId: z.string(), action: z.string() });
+ * type MyKinesisRecord = ParsedRecord<KinesisStreamRecord, z.infer<typeof kinesisSchema>>;
+ *
+ * const kinesisHandler = async (record: MyKinesisRecord) => {
+ * // record.kinesis.data is now typed as { userId: string, action: string }
+ * };
+ * ```
+ *
+ * **DynamoDB Records - Single Schema**
+ *
+ * @example
+ * ```typescript
+ * const dynamoSchema = z.object({ id: z.string(), status: z.string() });
+ * type MyDynamoRecord = ParsedRecord<DynamoDBRecord, z.infer<typeof dynamoSchema>>;
+ *
+ * const dynamoHandler = async (record: MyDynamoRecord) => {
+ * // record.dynamodb.NewImage and record.dynamodb.OldImage are both typed as { id: string, status: string }
+ * };
+ * ```
+ *
+ * **DynamoDB Records - Separate Schemas**
+ *
+ * @example
+ * ```typescript
+ * const newSchema = z.object({ id: z.string(), status: z.string(), updatedAt: z.string() });
+ * const oldSchema = z.object({ id: z.string(), status: z.string() });
+ * type MyDynamoRecordSeparate = ParsedRecord<DynamoDBRecord, z.infer<typeof newSchema>, z.infer<typeof oldSchema>>;
+ *
+ * const dynamoHandlerSeparate = async (record: MyDynamoRecordSeparate) => {
+ * // record.dynamodb.NewImage is typed as { id: string, status: string, updatedAt: string }
+ * // record.dynamodb.OldImage is typed as { id: string, status: string }
+ * };
+ * ```
+ */
+type ParsedRecord<TRecord, TPayload, TOldPayload = TPayload> = TRecord extends {
+  body: string;
+}
+  ? Omit<TRecord, 'body'> & { body: TPayload }
+  : TRecord extends { kinesis: { data: string } }
+    ? Omit<TRecord, 'kinesis'> & {
+        kinesis: Omit<TRecord['kinesis'], 'data'> & { data: TPayload };
+      }
+    : TRecord extends { dynamodb?: StreamRecord }
+      ? Omit<TRecord, 'dynamodb'> & {
+          dynamodb: Omit<StreamRecord, 'NewImage' | 'OldImage'> & {
+            NewImage: TPayload;
+            OldImage: TOldPayload;
+          };
+        }
+      : TRecord;
+
export type {
BatchProcessingOptions,
BaseRecord,
@@ -125,4 +248,5 @@ export type {
PartialItemFailures,
PartialItemFailureResponse,
BasePartialBatchProcessorParserConfig,
+ ParsedRecord,
};
diff --git a/packages/batch/typedoc.json b/packages/batch/typedoc.json
index 2cc575bc18..5873dab1cd 100644
--- a/packages/batch/typedoc.json
+++ b/packages/batch/typedoc.json
@@ -1,5 +1,5 @@
{
"extends": ["../../typedoc.base.json"],
- "entryPoints": ["./src/index.ts", "./src/types.ts"],
+ "entryPoints": ["./src/index.ts", "./src/parser.ts", "./src/types.ts"],
"readme": "README.md"
}