Skip to content

Commit 6b9e190

Browse files
fuguiKzhjgraca
andauthored
docs: extract batch processing snippets (#1104)
Co-authored-by: Henrique Graca <[email protected]>
1 parent 68280d3 commit 6b9e190

14 files changed

+1010
-801
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// This file is referenced by docs/utilities/batch-processing.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing;
5+
6+
// --8<-- [start:sqs_record_handler_error_handling]
7+
public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
8+
{
9+
public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
10+
{
11+
/*
12+
* Your business logic.
13+
* If an exception is thrown, the item will be marked as a partial batch item failure.
14+
*/
15+
16+
var product = JsonSerializer.Deserialize<Product>(record.Body);
17+
18+
if (product.Id == 4) // (2)!
19+
{
20+
throw new ArgumentException("Error on id 4");
21+
}
22+
23+
return await Task.FromResult(RecordHandlerResult.None); // (3)!
24+
}
25+
26+
}
27+
// --8<-- [end:sqs_record_handler_error_handling]
28+
29+
// --8<-- [start:error_handling_policy_attribute]
30+
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
31+
ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
32+
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
33+
{
34+
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
35+
}
36+
// --8<-- [end:error_handling_policy_attribute]
37+
38+
// --8<-- [start:typed_custom_error_handling]
39+
[BatchProcessor(
40+
TypedRecordHandler = typeof(TypedSqsHandler),
41+
ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
42+
public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent)
43+
{
44+
return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
45+
}
46+
// --8<-- [end:typed_custom_error_handling]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// This file is referenced by docs/utilities/batch-processing.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing;
5+
6+
// --8<-- [start:json_serializer_context_configuration]
7+
[JsonSerializable(typeof(Product))]
8+
[JsonSerializable(typeof(Order))]
9+
[JsonSerializable(typeof(Customer))]
10+
[JsonSerializable(typeof(List<Product>))]
11+
[JsonSourceGenerationOptions(
12+
PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
13+
WriteIndented = false,
14+
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
15+
public partial class MyJsonSerializerContext : JsonSerializerContext
16+
{
17+
}
18+
// --8<-- [end:json_serializer_context_configuration]
19+
20+
// --8<-- [start:json_serializer_context_using_with_attribute]
21+
[BatchProcessor(
22+
TypedRecordHandler = typeof(TypedSqsRecordHandler),
23+
JsonSerializerContext = typeof(MyJsonSerializerContext))]
24+
public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent)
25+
{
26+
return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
27+
}
28+
// --8<-- [end:json_serializer_context_using_with_attribute]
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// This file is referenced by docs/utilities/batch-processing.md
2+
// via pymdownx.snippets (mkdocs).
3+
4+
namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing;
5+
6+
// --8<-- [start:kinesis_typed_handler_decorator]
7+
public class Order
8+
{
9+
public string? OrderId { get; set; }
10+
public DateTime OrderDate { get; set; }
11+
public List<Product> Items { get; set; } = new();
12+
public decimal TotalAmount { get; set; }
13+
}
14+
15+
internal class TypedKinesisRecordHandler : ITypedRecordHandler<Order> // (1)!
16+
{
17+
public async Task<RecordHandlerResult> HandleAsync(Order order, CancellationToken cancellationToken)
18+
{
19+
Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items");
20+
21+
if (order.TotalAmount <= 0) // (2)!
22+
{
23+
throw new ArgumentException("Invalid order total");
24+
}
25+
26+
return await Task.FromResult(RecordHandlerResult.None); // (3)!
27+
}
28+
}
29+
30+
[BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
31+
public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
32+
{
33+
return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
34+
}
35+
// --8<-- [end:kinesis_typed_handler_decorator]
36+
37+
// --8<-- [start:kinesis_handler_decorator_traditional]
38+
internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)!
39+
{
40+
public async Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken)
41+
{
42+
var product = JsonSerializer.Deserialize<Product>(record.Kinesis.Data);
43+
44+
if (product.Id == 4) // (2)!
45+
{
46+
throw new ArgumentException("Error on id 4");
47+
}
48+
49+
return await Task.FromResult(RecordHandlerResult.None); // (3)!
50+
}
51+
}
52+
53+
54+
[BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
55+
public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _)
56+
{
57+
return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
58+
}
59+
// --8<-- [end:kinesis_handler_decorator_traditional]
60+
61+
// --8<-- [start:dynamodb_typed_handler_decorator]
62+
public class Customer
63+
{
64+
public string? CustomerId { get; set; }
65+
public string? Name { get; set; }
66+
public string? Email { get; set; }
67+
public DateTime CreatedAt { get; set; }
68+
}
69+
70+
internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> // (1)!
71+
{
72+
public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
73+
{
74+
Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}");
75+
76+
if (string.IsNullOrEmpty(customer.Email)) // (2)!
77+
{
78+
throw new ArgumentException("Customer email is required");
79+
}
80+
81+
return await Task.FromResult(RecordHandlerResult.None); // (3)!
82+
}
83+
}
84+
85+
[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
86+
public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
87+
{
88+
return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
89+
}
90+
// --8<-- [end:dynamodb_typed_handler_decorator]
91+
92+
// --8<-- [start:dynamodb_handler_decorator_traditional]
93+
internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)!
94+
{
95+
public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
96+
{
97+
var product = JsonSerializer.Deserialize<Product>(record.Dynamodb.NewImage["Product"].S);
98+
99+
if (product.Id == 4) // (2)!
100+
{
101+
throw new ArgumentException("Error on id 4");
102+
}
103+
104+
return await Task.FromResult(RecordHandlerResult.None); // (3)!
105+
}
106+
}
107+
108+
109+
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
110+
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
111+
{
112+
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
113+
}
114+
// --8<-- [end:dynamodb_handler_decorator_traditional]
115+
116+
// --8<-- [start:using_utility_outside_decorator]
117+
public async Task<BatchItemFailuresResponse> HandlerUsingUtility(DynamoDBEvent dynamoDbEvent)
118+
{
119+
var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler<DynamoDBEvent.DynamodbStreamRecord>.From(record =>
120+
{
121+
var product = JsonSerializer.Deserialize<JsonElement>(record.Dynamodb.NewImage["Product"].S);
122+
123+
if (product.GetProperty("Id").GetInt16() == 4)
124+
{
125+
throw new ArgumentException("Error on 4");
126+
}
127+
}));
128+
return result.BatchItemFailuresResponse;
129+
}
130+
// --8<-- [end:using_utility_outside_decorator]
131+
132+
// --8<-- [start:using_utility_from_ioc_getrequiredservice]
133+
public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent)
134+
{
135+
var batchProcessor = Services.Provider.GetRequiredService<IDynamoDbStreamBatchProcessor>();
136+
var recordHandler = Services.Provider.GetRequiredService<IDynamoDbStreamRecordHandler>();
137+
var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
138+
return result.BatchItemFailuresResponse;
139+
}
140+
// --8<-- [end:using_utility_from_ioc_getrequiredservice]
141+
142+
// --8<-- [start:using_utility_from_ioc_injected_parameters]
143+
public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent,
144+
IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler)
145+
{
146+
var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
147+
return result.BatchItemFailuresResponse;
148+
}
149+
// --8<-- [end:using_utility_from_ioc_injected_parameters]
150+
151+
// --8<-- [start:example_implementation_of_iserviceprovider]
152+
internal class Services
153+
{
154+
private static readonly Lazy<IServiceProvider> LazyInstance = new(Build);
155+
156+
private static ServiceCollection _services;
157+
public static IServiceProvider Provider => LazyInstance.Value;
158+
159+
public static IServiceProvider Init()
160+
{
161+
return LazyInstance.Value;
162+
}
163+
164+
private static IServiceProvider Build()
165+
{
166+
_services = new ServiceCollection();
167+
_services.AddScoped<IDynamoDbStreamBatchProcessor, CustomDynamoDbStreamBatchProcessor>();
168+
_services.AddScoped<IDynamoDbStreamRecordHandler, CustomDynamoDbStreamRecordHandler>();
169+
return _services.BuildServiceProvider();
170+
}
171+
}
172+
// --8<-- [end:example_implementation_of_iserviceprovider]
173+
174+
// --8<-- [start:processing_messages_in_parallel]
175+
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true )]
176+
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
177+
{
178+
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
179+
}
180+
// --8<-- [end:processing_messages_in_parallel]

0 commit comments

Comments
 (0)