docs/features/batch.md
@@ -31,9 +31,9 @@ stateDiagram-v2

## Key features

* Reports batch item failures to reduce the number of retries for a record upon errors
* Simple interface to process each batch record
* Build your own batch processor by extending primitives

## Background
@@ -51,14 +51,13 @@ journey

This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:

* [**SQS queues**](#sqs-standard). Only messages reported as failures will return to the queue for a retry, while successful ones will be deleted.
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). A single reported failure will use its sequence number as the stream checkpoint; multiple reported failures will use the lowest sequence number as the checkpoint.

<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
<!-- markdownlint-disable MD013 -->
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
    We recommend implementing processing logic in an [idempotent manner](./idempotency.md){target="_blank"} whenever possible.

You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
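The reporting contract above can be sketched by hand: run each record through a handler, collect the identifiers of the ones that failed, and return them under `batchItemFailures`, the response shape Lambda expects. This is a simplified illustration of what the utility does for you, not the Powertools API; the `SQSRecordLike` type and `processBatchSketch` helper are hypothetical.

```typescript
// Hand-rolled sketch of partial batch failure reporting (hypothetical helper,
// not the Powertools API). Lambda expects failed item IDs to be returned
// under `batchItemFailures`; anything not listed is considered successful.
type SQSRecordLike = { messageId: string; body: string };
type PartialResponse = { batchItemFailures: { itemIdentifier: string }[] };

const processBatchSketch = async (
  records: SQSRecordLike[],
  recordHandler: (record: SQSRecordLike) => Promise<void>
): Promise<PartialResponse> => {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of records) {
    try {
      await recordHandler(record); // an exception marks this record as failed
    } catch {
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
};
```

An empty `batchItemFailures` list tells the event source every message can be deleted; a non-empty list returns only those messages to the queue for retry.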
@@ -109,7 +108,7 @@ Processing batches from SQS works in three stages:

3. Use **`processPartialResponse`** to kick off processing

!!! note
    By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues).

=== "index.ts"
@@ -123,7 +122,7 @@ By default, the batch processor will process messages in parallel, which does no

1. **Step 1**. Creates a partial failure batch processor for SQS queues. See [partial failure mechanics for details](#partial-failure-mechanics)
2. **Step 2**. Defines a function to receive one record at a time from the batch
3. **Step 3**. Kicks off processing

=== "Sample response"
@@ -141,7 +140,7 @@ By default, the batch processor will process messages in parallel, which does no

#### FIFO queues

When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html){target="_blank"}, a batch may include messages from different group IDs.

By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
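The short-circuit behavior described above can be sketched as follows. This is a hypothetical, simplified model (the real `SqsFifoPartialProcessor` handles this internally): without group skipping, the first failure fails everything that follows; with group skipping enabled, only the remainder of the failed message group is skipped.

```typescript
// Simplified sketch of FIFO short-circuiting (hypothetical helper, not the
// real SqsFifoPartialProcessor). Records carry a message group ID; once a
// group fails, its remaining records are marked failed without processing.
type FifoRecord = { messageId: string; groupId: string; body: string };

const processFifoSketch = async (
  records: FifoRecord[],
  recordHandler: (record: FifoRecord) => Promise<void>,
  skipGroupOnError: boolean
): Promise<string[]> => {
  const failedIds: string[] = [];
  const failedGroups = new Set<string>();
  let shortCircuited = false;
  for (const record of records) {
    // Without skipGroupOnError: the first failure fails the whole remainder.
    // With skipGroupOnError: only the failed group's remainder is skipped.
    if (shortCircuited || failedGroups.has(record.groupId)) {
      failedIds.push(record.messageId);
      continue;
    }
    try {
      await recordHandler(record);
    } catch {
      failedIds.push(record.messageId);
      if (skipGroupOnError) {
        failedGroups.add(record.groupId);
      } else {
        shortCircuited = true;
      }
    }
  }
  return failedIds;
};
```

Marking skipped records as failed returns them to the queue, which is what preserves per-group ordering across retries.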
@@ -198,7 +197,7 @@ Processing batches from DynamoDB Streams works in three stages:

3. Use **`processPartialResponse`** to kick off processing

???+ info
    This code example optionally uses Logger for completion.

=== "index.ts"
@@ -224,7 +223,7 @@ This code example optionally uses Logger for completion.

### Error handling

By default, we catch any exception raised by your record handler function. This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.

=== "Sample error handling with custom exception"
@@ -250,15 +249,15 @@ By default, we catch any exception raised by your record handler function. This

All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:

* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
* **All records failed to be processed**. We will throw a `FullBatchFailureError` error with a list of all the errors thrown while processing, unless `throwOnFullBatchFailure` is disabled.
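The three outcomes above can be sketched as a single end-of-batch decision. This is a hypothetical helper illustrating the logic, not the Powertools implementation; `FullBatchFailureErrorSketch` is a plain `Error` subclass standing in for the real error type.

```typescript
// Sketch of the end-of-batch decision (hypothetical helper, not the real
// implementation). `failures` holds one entry per failed record.
class FullBatchFailureErrorSketch extends Error {
  constructor(public readonly causes: Error[]) {
    super(`All ${causes.length} records failed processing`);
  }
}

const buildResponseSketch = (
  totalRecords: number,
  failures: { itemIdentifier: string; error: Error }[],
  throwOnFullBatchFailure = true
) => {
  // Every record failed: surface the failure in operational metrics by
  // throwing, unless the caller opted out.
  if (failures.length === totalRecords && totalRecords > 0 && throwOnFullBatchFailure) {
    throw new FullBatchFailureErrorSketch(failures.map((f) => f.error));
  }
  // Full or partial success: report only the failed item identifiers.
  return {
    batchItemFailures: failures.map((f) => ({ itemIdentifier: f.itemIdentifier })),
  };
};
```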
The following sequence diagrams explain how each Batch processor behaves under different scenarios.

#### SQS Standard

> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues.
@@ -283,7 +282,7 @@ sequenceDiagram

#### SQS FIFO

> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skipGroupOnError` flag.
@@ -336,11 +335,11 @@ sequenceDiagram

#### Kinesis and DynamoDB Streams

> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.

Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).

For brevity, we will use `Streams` to refer to either service. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}.
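The checkpointing rule for streams (resume from the lowest failed sequence number) can be sketched as a small helper. This is a hypothetical illustration, assuming sequence numbers are numeric strings that fit in a `BigInt`.

```typescript
// Sketch of stream checkpoint selection (hypothetical helper). When several
// records fail, Lambda resumes from the lowest failed sequence number, so
// everything at or after that point is retried.
const lowestSequenceNumber = (failedSequenceNumbers: string[]): string | undefined => {
  if (failedSequenceNumbers.length === 0) return undefined; // nothing to retry
  return failedSequenceNumbers.reduce((lowest, current) =>
    BigInt(current) < BigInt(lowest) ? current : lowest
  );
};
```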
<center>
```mermaid
@@ -522,11 +521,11 @@ If you want to have full control and optimize the flow, you can pass an extended

Use the `BatchProcessor` directly in your function to access a list of all returned values from your `recordHandler` function.

* **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record
* **When failed**. We will include a tuple with `fail`, the exception as a string, and the batch record

1. The processor requires the records array. This is typically handled by `processPartialResponse`.
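The tuple shapes above can be sketched with a discriminated union. These are hypothetical types modeling the return values, not the library's exact definitions.

```typescript
// Sketch of per-record results as tagged tuples (hypothetical types, not the
// library's exact definitions). Success carries the handler's return value;
// failure carries the error rendered as a string.
type RecordLike = { messageId: string; body: string };
type ProcessedRecord =
  | ['success', unknown, RecordLike]
  | ['fail', string, RecordLike];

const processAndCollect = async (
  records: RecordLike[],
  recordHandler: (record: RecordLike) => Promise<unknown>
): Promise<ProcessedRecord[]> => {
  const results: ProcessedRecord[] = [];
  for (const record of records) {
    try {
      results.push(['success', await recordHandler(record), record]);
    } catch (error) {
      results.push(['fail', String(error), record]);
    }
  }
  return results;
};
```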
@@ -536,22 +535,22 @@ Use the `BatchProcessor` directly in your function to access a list of all retur

Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.

We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as an optional second argument if you pass it to the `processPartialResponse` function.
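Passing the context through can be sketched like this, assuming a minimal `ContextLike` shape with only `getRemainingTimeInMillis` (the real Lambda context has more fields); the helper name is hypothetical.

```typescript
// Sketch of optional context injection (hypothetical helper). When a context
// is provided, it is forwarded to the record handler as a second argument so
// the handler can check remaining execution time.
type ContextLike = { getRemainingTimeInMillis: () => number };
type HandlerRecord = { messageId: string; body: string };

const processWithContextSketch = async (
  records: HandlerRecord[],
  recordHandler: (record: HandlerRecord, context?: ContextLike) => Promise<void>,
  options?: { context?: ContextLike }
): Promise<void> => {
  for (const record of records) {
    // The handler decides what to do with the context, e.g. bail out early
    // when little time remains.
    await recordHandler(record, options?.context);
  }
};
```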
By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.

When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.

For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling `processPartialResponse`.
@@ -578,7 +577,7 @@ By default, the `BatchProcessor` processes records in parallel using `Promise.al

When processing records from SQS FIFO queues, we recommend using the [`SqsFifoPartialProcessor`](#fifo-queues) class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID.
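The difference between parallel and sequential processing can be sketched as follows; `processInParallel` here is a flag on a hand-rolled helper mirroring the documented option, not the library code itself.

```typescript
// Sketch of parallel vs sequential record processing (hypothetical helper).
// Parallel mode starts all handlers at once via Promise.all, so completion
// order is not guaranteed; sequential mode awaits each record in turn.
const runHandlers = async <T>(
  records: T[],
  recordHandler: (record: T) => Promise<void>,
  processInParallel: boolean
): Promise<void> => {
  if (processInParallel) {
    await Promise.all(records.map((record) => recordHandler(record)));
  } else {
    for (const record of records) {
      await recordHandler(record); // preserves record order
    }
  }
};
```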
<i>Visual representation to bring your own processor</i>
</center>
* **`prepare()`** – called once as part of the processor initialization
* **`clean()`** – teardown logic called once after `processRecord` completes
* **`processRecord()`** – if you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
* **`processRecordSync()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)
You can then pass this class to `processPartialResponse` to process the records in your Lambda handler function.
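A minimal sketch of these lifecycle hooks, assuming a hand-rolled base class (the real `BatchProcessor` primitives have a richer interface; class and method shapes below are illustrative):

```typescript
// Sketch of the lifecycle hooks for a custom processor (hypothetical base
// class, not the real Powertools primitives).
abstract class ProcessorSketch<T> {
  constructor(protected handler: (record: T) => Promise<unknown>) {}
  // Called once before any record is processed.
  prepare(): void {}
  // Called once after all records have been processed.
  clean(): void {}
  // Per-record processing; must call the record handler.
  abstract processRecord(record: T): Promise<unknown>;

  async process(records: T[]): Promise<unknown[]> {
    this.prepare();
    const results: unknown[] = [];
    for (const record of records) {
      results.push(await this.processRecord(record));
    }
    this.clean();
    return results;
  }
}

class MyProcessor extends ProcessorSketch<{ body: string }> {
  async processRecord(record: { body: string }): Promise<unknown> {
    // Custom pre/post logic per record would go here.
    return this.handler(record);
  }
}
```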
You can use Tracer to create subsegments for each batch record processed: open a new subsegment for each record, and close it when you're done processing it. When adding annotations and metadata, you can work with the subsegment directly without calling `tracer.setSegment(subsegment)`, which avoids having to pass the parent subsegment around or restore it at the end of record processing.
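The open/annotate/close pattern can be sketched against a minimal tracer interface. The `SubsegmentLike` and `TracerLike` shapes below are hypothetical stand-ins for illustration; the real Powertools Tracer exposes a richer API.

```typescript
// Sketch of per-record subsegment handling (hypothetical tracer interface,
// not the real Powertools Tracer API).
type SubsegmentLike = {
  addAnnotation: (key: string, value: string) => void;
  close: () => void;
};
type TracerLike = { addNewSubsegment: (name: string) => SubsegmentLike };

const handleRecordWithTracing = async (
  tracer: TracerLike,
  record: { messageId: string },
  work: () => Promise<void>
): Promise<void> => {
  // One subsegment per record; annotate directly on it, then close.
  const subsegment = tracer.addNewSubsegment(`### record ${record.messageId}`);
  try {
    subsegment.addAnnotation('messageId', record.messageId);
    await work();
  } finally {
    subsegment.close(); // always close, even when processing throws
  }
};
```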