Commit 4fed581

sdangol, svozza, and dreamorosi authored
docs(batch): added documentation for the parser integration feature in batch processor (#4435)
Co-authored-by: Stefano Vozza <[email protected]>
Co-authored-by: Andrea Amorosi <[email protected]>
1 parent dc7c654 commit 4fed581

17 files changed (+593 -118 lines)

docs/features/batch.md

Lines changed: 111 additions & 108 deletions
@@ -6,27 +6,7 @@ description: Utility
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

```mermaid
-stateDiagram-v2
-    direction LR
-    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
-    LambdaInit: Lambda invocation
-    BatchProcessor: Batch Processor
-    RecordHandler: Record Handler function
-    YourLogic: Your logic to process each batch item
-    LambdaResponse: Lambda response
-
-    BatchSource --> LambdaInit
-
-    LambdaInit --> BatchProcessor
-    BatchProcessor --> RecordHandler
-
-    state BatchProcessor {
-        [*] --> RecordHandler: Your function
-        RecordHandler --> YourLogic
-    }
-
-    RecordHandler --> BatchProcessor: Collect results
-    BatchProcessor --> LambdaResponse: Report items that failed processing
+--8<-- "examples/snippets/batch/diagrams/batch_processing.mermaid"
```

## Key features
@@ -42,17 +22,13 @@ When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event sour
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of the following happens first: **a)** your Lambda function returns a successful response, **b)** the record reaches maximum retry attempts, or **c)** the records expire.

```mermaid
-journey
-    section Conditions
-        Successful response: 5: Success
-        Maximum retries: 3: Failure
-        Records expired: 1: Failure
+--8<-- "examples/snippets/batch/diagrams/journey.mermaid"
```

This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:

* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
-* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
+* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.

<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
<!-- markdownlint-disable MD013 -->
@@ -237,7 +213,7 @@ By default, we catch any exception raised by your record handler function. This

1. Any exception works here. See the [extending `BatchProcessor`](#extending-batchprocessor) section if you want to override this behavior.

-2. Exceptions raised in `recordHandler` will propagate to `process_partial_response`. <br/><br/> We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).
+2. Errors raised in `recordHandler` will propagate to `processPartialResponse`. <br/><br/> We catch them and include each failed batch item identifier in the response object (see `Sample response` tab).

=== "Sample response"
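For reference, a partial-failure response follows Lambda's `ReportBatchItemFailures` contract, along these lines:

```json
{
  "batchItemFailures": [
    { "itemIdentifier": "<messageId or sequence number of the failed record>" }
  ]
}
```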

@@ -263,19 +239,7 @@ Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-fr

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    Lambda function->>Lambda service: Report some failed messages
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages
-    SQS queue-->>SQS queue: Failed messages return
-    Note over SQS queue,Lambda service: Process repeat
-    deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs.mermaid"
```
<i>SQS mechanism with Batch Item Failures</i>
</center>
@@ -288,22 +252,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
-    deactivate Lambda function
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages (1-2)
-    SQS queue-->>SQS queue: Failed messages return (3-10)
-    deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs_fifo.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
@@ -312,23 +261,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function-->Lambda function: Process messages from another MessageGroupID
-    Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
-    deactivate Lambda function
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages processed
-    SQS queue-->>SQS queue: Failed messages return
-    deactivate SQS queue
+--8<-- "examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
@@ -343,23 +276,7 @@ For brevity, we will use `Streams` to refer to either services. For theory on st

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant Streams
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>Streams: Poll latest records
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function-->Lambda function: Continue processing batch items (4-10)
-    Lambda function->>Lambda service: Report batch item as failure (3)
-    deactivate Lambda function
-    activate Streams
-    Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
-    Lambda service->>Streams: Poll records starting from updated checkpoint
-    deactivate Streams
+--8<-- "examples/snippets/batch/diagrams/streams.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with single batch item failure</i>
</center>
@@ -370,29 +287,115 @@ The behavior changes slightly when there are multiple item failures. Stream chec

<center>
```mermaid
-sequenceDiagram
-    autonumber
-    participant Streams
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>Streams: Poll latest records
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3-5 batch items
-    Lambda function-->Lambda function: Continue processing batch items (6-10)
-    Lambda function->>Lambda service: Report batch items as failure (3-5)
-    deactivate Lambda function
-    activate Streams
-    Lambda service->>Streams: Checkpoints to lowest sequence number
-    Lambda service->>Streams: Poll records starting from updated checkpoint
-    deactivate Streams
+--8<-- "examples/snippets/batch/diagrams/streams_multiple_failures.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
</center>

## Advanced

### Parser integration

The Batch Processing utility integrates with the [Parser utility](./parser.md) to automatically validate and parse each batch record before processing. This ensures your record handler receives properly typed and validated data, eliminating the need for manual parsing and validation.

To enable parser integration, import the `parser` function from `@aws-lambda-powertools/batch/parser` and pass it, along with a schema, when instantiating the `BatchProcessor`. This requires you to also [install the Parser utility](./parser.md#getting-started).

```typescript
import { parser } from '@aws-lambda-powertools/batch/parser';
```

You have two approaches for schema validation:

1. **Item schema only** (`innerSchema`) - Focus on your payload schema; we handle extending the base event structure
2. **Full event schema** (`schema`) - Validate the entire event record structure with complete control (both are sketched below)

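Putting the two together, a minimal sketch for an SQS event source; the `transformer` option name is an assumption inferred from the transformer table below, while the rest follows the snippets in this commit:

```typescript
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';

const mySchema = z.object({ name: z.string(), age: z.number() });

// 1. Item schema only: the base SQS record schema is extended for you
const itemOnlyProcessor = new BatchProcessor(EventType.SQS, {
  parser,
  innerSchema: mySchema,
  transformer: 'json', // assumption: body arrives as stringified JSON
});

// 2. Full event schema: extend the base record schema yourself
const fullSchemaProcessor = new BatchProcessor(EventType.SQS, {
  parser,
  schema: SqsRecordSchema.extend({
    body: JSONStringified(mySchema),
  }),
});
```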
#### Benefits of parser integration

Parser integration eliminates runtime errors from malformed data and provides compile-time type safety, making your code more reliable and easier to maintain. Invalid records are automatically marked as failed and won't reach your handler, reducing defensive coding.

=== "Without parser integration"

    ```typescript
    const recordHandler = async (record: SQSRecord) => {
      // Manual parsing with no type safety
      const payload = JSON.parse(record.body); // any type
      console.log(payload.name); // No autocomplete, runtime errors possible
    };
    ```

=== "With parser integration"

    ```typescript
    const mySchema = z.object({ name: z.string(), age: z.number() });

    const recordHandler = async (
      record: ParsedRecord<SQSRecord, z.infer<typeof mySchema>>
    ) => {
      // Automatic validation and strong typing
      console.log(record.body.name); // Full type safety and autocomplete
    };
    ```

#### Using item schema only

When you want to focus on validating your payload without dealing with the full event structure, use `innerSchema`. We automatically extend the base event schema for you, reducing boilerplate code while still validating the entire record.

Available transformers by event type:

| Event Type | Base Schema               | Available Transformers | When to use transformer                                        |
| ---------- | ------------------------- | ---------------------- | -------------------------------------------------------------- |
| SQS        | `SqsRecordSchema`         | `json`, `base64`       | `json` for stringified JSON, `base64` for encoded data          |
| Kinesis    | `KinesisDataStreamRecord` | `base64`               | Required for Kinesis data (always base64 encoded)               |
| DynamoDB   | `DynamoDBStreamRecord`    | `unmarshall`           | Required to convert DynamoDB attribute values to plain objects  |

=== "SQS with JSON payload"
350+
351+
```typescript hl_lines="6 12-15 19-21 27-28"
352+
--8<-- "examples/snippets/batch/advanced_parser_item_sqs.ts"
353+
```
354+
355+
=== "Sample Event"
356+
357+
```json hl_lines="6 22"
358+
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
359+
```
360+
361+
#### Using full event schema

For complete control over validation, extend the built-in schemas with your custom payload schema. With this approach, you validate the entire event record structure yourself.

=== "SQS"

    ```typescript hl_lines="6 15-17 22-24 30-31"
    --8<-- "examples/snippets/batch/advanced_parser_sqs.ts"
    ```

=== "Kinesis Data Streams"

    ```typescript hl_lines="6 18-23 28-32 39 41-44"
    --8<-- "examples/snippets/batch/advanced_parser_Kinesis.ts"
    ```

=== "DynamoDB Streams"

    ```typescript hl_lines="6 17-19 24-28 35 37"
    --8<-- "examples/snippets/batch/advanced_parser_DynamoDB.ts"
    ```

#### Typed record handlers with ParsedRecord

To get full type safety in your record handlers, use the `ParsedRecord` utility type:

```typescript
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

// For most cases - single schema
type MyRecord = ParsedRecord<SQSRecord, z.infer<typeof mySchema>>;

// For DynamoDB - separate schemas for NewImage and OldImage
type MyDynamoRecord = ParsedRecord<
  DynamoDBRecord,
  z.infer<typeof newSchema>,
  z.infer<typeof oldSchema>
>;
```

This eliminates verbose type annotations and provides clean autocompletion for your parsed data.
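For example, a handler sketch using the `MyRecord` alias above (assuming `mySchema` from the earlier example):

```typescript
const recordHandler = async (record: MyRecord) => {
  // record.body is typed as z.infer<typeof mySchema>
  console.log(record.body.name, record.body.age);
};
```
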
### Accessing processed messages

Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
New file (likely `examples/snippets/batch/advanced_parser_DynamoDB.ts`, the DynamoDB Streams snippet referenced above):

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
  DynamoDBStreamChangeRecordBase,
  DynamoDBStreamRecord,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import type { DynamoDBRecord, DynamoDBStreamHandler } from 'aws-lambda';
import { z } from 'zod';

// Unmarshall the DynamoDB attribute map before validating the payload
const myItemSchema = DynamoDBMarshalled(
  z.object({ name: z.string(), age: z.number() })
);

const logger = new Logger();
// Event type must match the event source (DynamoDB Streams here)
const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  parser,
  // Full event schema: extend the base stream record so NewImage is validated
  schema: DynamoDBStreamRecord.extend({
    dynamodb: DynamoDBStreamChangeRecordBase.extend({
      NewImage: myItemSchema,
    }),
  }),
  logger,
});

const recordHandler = async ({
  eventID,
  dynamodb: {
    NewImage: { name, age },
  },
}: ParsedRecord<DynamoDBRecord, z.infer<typeof myItemSchema>>) => {
  logger.info(`Processing record ${eventID}`, { name, age });
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
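Note: `ParsedRecord<DynamoDBRecord, z.infer<typeof myItemSchema>>` types the parsed `NewImage` here; per the `ParsedRecord` section above, pass a second schema type argument when you also validate `OldImage`.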
Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
New file (likely `examples/snippets/batch/advanced_parser_Kinesis.ts`, the Kinesis Data Streams snippet referenced above):

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
import {
  KinesisDataStreamRecord,
  KinesisDataStreamRecordPayload,
} from '@aws-lambda-powertools/parser/schemas/kinesis';
import type { KinesisDataStreamRecordEvent } from '@aws-lambda-powertools/parser/types';
import type { KinesisStreamHandler } from 'aws-lambda';
import { z } from 'zod';

// Kinesis data is always base64 encoded; decode it before validating
const myItemSchema = Base64Encoded(
  z.object({
    name: z.string(),
    age: z.number(),
  })
);

const logger = new Logger();
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
  parser,
  // Full event schema: extend the base record so `data` is decoded and validated
  schema: KinesisDataStreamRecord.extend({
    kinesis: KinesisDataStreamRecordPayload.extend({
      data: myItemSchema,
    }),
  }),
  logger,
});

const recordHandler = async ({
  kinesis: {
    sequenceNumber,
    data: { name, age },
  },
}: ParsedRecord<
  KinesisDataStreamRecordEvent,
  z.infer<typeof myItemSchema>
>) => {
  logger.info(`Processing record: ${sequenceNumber}`, {
    name,
    age,
  });
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
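For a quick local check, a payload accepted by the `Base64Encoded` schema above can be produced like this (a sketch; where you place it in a test event is up to you):

```typescript
// Base64-encode a JSON payload matching myItemSchema, e.g. for
// Records[0].kinesis.data in a hand-written test event
const data = Buffer.from(JSON.stringify({ name: 'Ada', age: 30 })).toString(
  'base64'
);
```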
