@@ -6,27 +6,7 @@ description: Utility
The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

```mermaid
- stateDiagram-v2
- direction LR
- BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
- LambdaInit: Lambda invocation
- BatchProcessor: Batch Processor
- RecordHandler: Record Handler function
- YourLogic: Your logic to process each batch item
- LambdaResponse: Lambda response
-
- BatchSource --> LambdaInit
-
- LambdaInit --> BatchProcessor
- BatchProcessor --> RecordHandler
-
- state BatchProcessor {
- [*] --> RecordHandler: Your function
- RecordHandler --> YourLogic
- }
-
- RecordHandler --> BatchProcessor: Collect results
- BatchProcessor --> LambdaResponse: Report items that failed processing
+ --8<-- "examples/snippets/batch/diagrams/batch_processing.mermaid"
```
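For orientation, the sketch below shows the typical wiring of the utility for SQS, using the `BatchProcessor`, `EventType`, and `processPartialResponse` exports of `@aws-lambda-powertools/batch`. Treat it as a minimal sketch rather than the exact snippet referenced elsewhere on this page.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

// Processor bound to the SQS event type so it knows how to read and report records
const processor = new BatchProcessor(EventType.SQS);

// Your logic for a single record; throwing here marks only this item as failed
const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = JSON.parse(record.body);
  console.log(payload);
};

// Collects per-item results and builds the partial batch response for Lambda
export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });
```
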

## Key features
@@ -42,11 +22,7 @@ When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event sour
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of the following conditions happens first: **a)** your Lambda function returns a successful response, **b)** the records reach the maximum retry attempts, or **c)** the records expire.

```mermaid
- journey
- section Conditions
- Successful response: 5: Success
- Maximum retries: 3: Failure
- Records expired: 1: Failure
+ --8<-- "examples/snippets/batch/diagrams/journey.mermaid"
```

This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
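For reference, when this feature is enabled the Lambda service inspects a partial-failure response with the shape shown below; the processor builds it for you, and the type alias name here is purely illustrative.

```typescript
// Response shape Lambda expects when ReportBatchItemFailures is enabled;
// only the property names matter, the alias name is illustrative
type PartialBatchResponse = {
  batchItemFailures: Array<{
    // messageId for SQS, sequence number for Kinesis/DynamoDB streams
    itemIdentifier: string;
  }>;
};

// An empty list tells the service that every record succeeded
const allSucceeded: PartialBatchResponse = { batchItemFailures: [] };
```
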
@@ -263,19 +239,7 @@ Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-fr

<center>
```mermaid
- sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- Lambda function->>Lambda service: Report some failed messages
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages
- SQS queue-->>SQS queue: Failed messages return
- Note over SQS queue,Lambda service: Process repeat
- deactivate SQS queue
+ --8<-- "examples/snippets/batch/diagrams/sqs.mermaid"
```
<i>SQS mechanism with Batch Item Failures</i>
</center>
@@ -288,22 +252,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
- sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
- deactivate Lambda function
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages (1-2)
- SQS queue-->>SQS queue: Failed messages return (3-10)
- deactivate SQS queue
+ --8<-- "examples/snippets/batch/diagrams/sqs_fifo.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
@@ -312,23 +261,7 @@ Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues)

<center>
```mermaid
- sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function-->Lambda function: Process messages from another MessageGroupID
- Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure
- deactivate Lambda function
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages processed
- SQS queue-->>SQS queue: Failed messages return
- deactivate SQS queue
+ --8<-- "examples/snippets/batch/diagrams/sqs_fifo_skip_group.mermaid"
```
<i>SQS FIFO mechanism with Batch Item Failures</i>
</center>
@@ -343,23 +276,7 @@ For brevity, we will use `Streams` to refer to either services. For theory on st

<center>
```mermaid
- sequenceDiagram
- autonumber
- participant Streams
- participant Lambda service
- participant Lambda function
- Lambda service->>Streams: Poll latest records
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function-->Lambda function: Continue processing batch items (4-10)
- Lambda function->>Lambda service: Report batch item as failure (3)
- deactivate Lambda function
- activate Streams
- Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
- Lambda service->>Streams: Poll records starting from updated checkpoint
- deactivate Streams
+ --8<-- "examples/snippets/batch/diagrams/streams.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with single batch item failure</i>
</center>
@@ -370,23 +287,7 @@ The behavior changes slightly when there are multiple item failures. Stream chec

<center>
```mermaid
- sequenceDiagram
- autonumber
- participant Streams
- participant Lambda service
- participant Lambda function
- Lambda service->>Streams: Poll latest records
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3-5 batch items
- Lambda function-->Lambda function: Continue processing batch items (6-10)
- Lambda function->>Lambda service: Report batch items as failure (3-5)
- deactivate Lambda function
- activate Streams
- Lambda service->>Streams: Checkpoints to lowest sequence number
- Lambda service->>Streams: Poll records starting from updated checkpoint
- deactivate Streams
+ --8<-- "examples/snippets/batch/diagrams/streams_multiple_failures.mermaid"
```
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
</center>
@@ -395,89 +296,81 @@ sequenceDiagram

### Parser integration

- You can define your own schema to parse your batch record payload via the parser configuration options when initializing the **`BatchProcessor`**.
+ Thanks to the [Parser utility](./parser.md) integration, you can pass a [Standard Schema](https://standardschema.dev){target="_blank"}-compatible schema when instantiating the `BatchProcessor` and we will use it to validate each item in the batch before passing it to your record handler.
+
+ Since this is an opt-in feature, you will need to import the `parser` function from `@aws-lambda-powertools/batch/parser`; this allows us to keep the parsing logic separate from the main processing logic and avoid increasing the bundle size.

- #### Using inner payload schema
+ #### Using item schema only

- You can just define the schema of your internal payload and we will handle the parsing for you depending on the type of event that you are using.
+ When you only want to customize the schema of the item's payload, you can pass an `innerSchema` object and we will use it to extend the base schema based on the `EventType` passed to the `BatchProcessor`.

- You can specify any transformers needed to transform your payload before being used for parsing the inner payload.
+ When doing this, you can also specify a `transformer` to tell us how to transform the payload before validation.
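Putting the options described above together, a configuration could look roughly like the following sketch. The option names (`parser`, `innerSchema`, `transformer`) come from this page, but the exact options object shape is assumed here, so defer to the tabbed snippets below for the authoritative version.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
// Opt-in parser integration, kept in a separate subpath to limit bundle size
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { SQSHandler, SQSRecord } from 'aws-lambda';
// Zod 3.24+ implements Standard Schema; any compatible library works
import { z } from 'zod';

// Schema for the JSON payload carried in each SQS record body
const orderSchema = z.object({ orderId: z.string(), amount: z.number() });

// Assumed options shape: innerSchema extends the base SQS record schema and
// the 'json' transformer parses the stringified body before validation
const processor = new BatchProcessor(EventType.SQS, {
  parser,
  innerSchema: orderSchema,
  transformer: 'json',
});

const recordHandler = async (record: SQSRecord): Promise<void> => {
  // By this point the body has been parsed and validated against orderSchema
  const order = record.body as unknown as z.infer<typeof orderSchema>;
  console.log(order.orderId);
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });
```
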

=== "SQS - using inner payload"

- ```typescript hl_lines="9-12 14-17"
- --8<-- "examples/snippets/batch/parser-integration/sqsWithoutTransformer.ts"
+ ```typescript hl_lines="6 11-14 19 26"
+ --8<-- "examples/snippets/batch/advanced_parser_item_sqs.ts"
```

=== "SQS - Sample Event"

```json hl_lines="6 22"
- --8<-- "examples/snippets/batch/samples/sampleSQSEventForParser.json"
+ --8<-- "examples/snippets/batch/samples/parser_SQS.json"
```

- === "Kinesis Data Streams - using inner payload"
+ The example above shows how to use the `innerSchema` option with `EventType.SQS`, but you can use it with other event types as well:

- ```typescript hl_lines="9-12 14-17 21"
- --8<-- "examples/snippets/batch/parser-integration/kinesisWithoutTransformer.ts"
- ```
-
- === "Kinesis - Sample Event"
+ | Event Type | Base Schema                | Transformer                 |
+ | ---------- | -------------------------- | --------------------------- |
+ | SQS        | `SqsRecordSchema`          | `json`, `base64` (optional) |
+ | Kinesis    | `KinesisDataStreamRecord`  | `base64`                    |
+ | DynamoDB   | `DynamoDBStreamRecord`     | `unmarshall`                |

- ```json hl_lines="6 22"
- --8<-- "examples/snippets/batch/samples/sampleKinesisEventForParser.json"
- ```
-
- === "DynamoDB Streams - using inner payload"
-
- ```typescript hl_lines="9-12 14-17 21"
- --8<-- "examples/snippets/batch/parser-integration/dynamodbWithoutTransformer.ts"
- ```
+ #### Extending built-in schemas

- === "DynamoDB - Sample Event"
+ When you want more control over the schema, you can extend a [built-in schema](./parser.md#built-in-schemas) for SQS, Kinesis Data Streams, or DynamoDB Streams with your own custom schema for the payload and we'll parse each item before passing it to your record handler. If the payload does not match the schema, the item will be marked as failed.
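As an illustration of what such an extension can look like, the sketch below builds on the Parser utility's documented `SqsRecordSchema` and `JSONStringified` helper; how the resulting schema is handed to the processor is shown in the tabs that follow.

```typescript
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { z } from 'zod';

// Payload we expect inside each SQS record body
const orderSchema = z.object({ orderId: z.string(), amount: z.number() });

// Extend the built-in SQS record schema so that `body` is JSON-parsed
// and then validated against orderSchema
const extendedRecordSchema = SqsRecordSchema.extend({
  body: JSONStringified(orderSchema),
});

// Records whose body does not match orderSchema are reported as failed items
export type ParsedSqsRecord = z.infer<typeof extendedRecordSchema>;
```
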

- ```json hl_lines="13-18 39-44"
- --8<-- "examples/snippets/batch/samples/sampleDynamoDBStreamsEventForParser.json"
- ```
+ === "SQS"

- #### Extending the schema yourself
+ === "index.ts"

- If you want to have full control, you can pass an extended schema with any transformers you need.
+ ```typescript hl_lines="6 14-16 20-23 29"
+ --8<-- "examples/snippets/batch/advanced_parser_sqs.ts"
+ ```

- === "SQS - using extended schema"
+ === "Sample Event"

- ```typescript hl_lines="11-16 18 21"
- --8<-- "examples/snippets/batch/parser-integration/sqsExtended.ts"
- ```
+ ```json hl_lines="6 22"
+ --8<-- "examples/snippets/batch/samples/parser_SQS.json"
+ ```

- === "SQS - Sample Event"
+ === "DynamoDB Streams"

- ```json hl_lines="6 22"
- --8<-- "examples/snippets/batch/samples/sampleSQSEventForParser.json"
- ```
+ === "index.ts"

- === "Kinesis Data Streams - using extended schema"
+ ```typescript hl_lines="6 17-19 24-28 35"
+ --8<-- "examples/snippets/batch/advanced_parser_DynamoDB.ts"
+ ```

- ```typescript hl_lines="11-18 20 23"
- --8<-- "examples/snippets/batch/parser-integration/kinesisExtended.ts"
- ```
+ === "Sample Event"

- === "Kinesis - Sample Event"
+ ```json hl_lines="13-18 39-44"
+ --8<-- "examples/snippets/batch/samples/parser_DynamoDB.json"
+ ```

- ```json hl_lines="6 22"
- --8<-- "examples/snippets/batch/samples/sampleKinesisEventForParser.json"
- ```
+ === "Kinesis Data Streams"

- === "DynamoDB Streams - using extended schema"
+ === "index.ts"

- ```typescript hl_lines="11-20 22 25"
- --8<-- "examples/snippets/batch/parser-integration/dynamodbExtended.ts"
- ```
+ ```typescript hl_lines="6 17-22 27-31 38"
+ --8<-- "examples/snippets/batch/advanced_parser_Kinesis.ts"
+ ```

- === "DynamoDB - Sample Event"
+ === "Sample Event"

- ```json hl_lines="13-18 39-44"
- --8<-- "examples/snippets/batch/samples/sampleDynamoDBStreamsEventForParser.json"
- ```
+ ```json hl_lines="8 24"
+ --8<-- "examples/snippets/batch/samples/parser_Kinesis.json"
+ ```

### Accessing processed messages