Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/snippets/batch/AdvancedErrorHandling.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This file is referenced by docs/utilities/batch-processing.md
// via pymdownx.snippets (mkdocs).

namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing;

// --8<-- [start:sqs_record_handler_error_handling]
public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
{
public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
{
/*
* Your business logic.
* If an exception is thrown, the item will be marked as a partial batch item failure.
*/

var product = JsonSerializer.Deserialize<Product>(record.Body);

if (product.Id == 4) // (2)!
{
throw new ArgumentException("Error on id 4");
}

return await Task.FromResult(RecordHandlerResult.None); // (3)!
}

}
// --8<-- [end:sqs_record_handler_error_handling]

// --8<-- [start:error_handling_policy_attribute]
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
{
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
// --8<-- [end:error_handling_policy_attribute]

// --8<-- [start:typed_custom_error_handling]
[BatchProcessor(
TypedRecordHandler = typeof(TypedSqsHandler),
ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent)
{
return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
}
// --8<-- [end:typed_custom_error_handling]
28 changes: 28 additions & 0 deletions docs/snippets/batch/CustomSerialization.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// This file is referenced by docs/utilities/batch-processing.md
// via pymdownx.snippets (mkdocs).

namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing;

// --8<-- [start:json_serializer_context_configuration]
[JsonSerializable(typeof(Product))]
[JsonSerializable(typeof(Order))]
[JsonSerializable(typeof(Customer))]
[JsonSerializable(typeof(List<Product>))]
[JsonSourceGenerationOptions(
PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
public partial class MyJsonSerializerContext : JsonSerializerContext
{
}
// --8<-- [end:json_serializer_context_configuration]

// --8<-- [start:json_serializer_context_using_with_attribute]
[BatchProcessor(
TypedRecordHandler = typeof(TypedSqsRecordHandler),
JsonSerializerContext = typeof(MyJsonSerializerContext))]
public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent)
{
return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
}
// --8<-- [end:json_serializer_context_using_with_attribute]
180 changes: 180 additions & 0 deletions docs/snippets/batch/GettingStartedBasic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// This file is referenced by docs/utilities/batch-processing.md
// via pymdownx.snippets (mkdocs).

namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing;

// --8<-- [start:kinesis_typed_handler_decorator]
public class Order
{
public string? OrderId { get; set; }
public DateTime OrderDate { get; set; }
public List<Product> Items { get; set; } = new();
public decimal TotalAmount { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<Order> // (1)!
{
public async Task<RecordHandlerResult> HandleAsync(Order order, CancellationToken cancellationToken)
{
Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items");

if (order.TotalAmount <= 0) // (2)!
{
throw new ArgumentException("Invalid order total");
}

return await Task.FromResult(RecordHandlerResult.None); // (3)!
}
}

[BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
{
return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
// --8<-- [end:kinesis_typed_handler_decorator]

// --8<-- [start:kinesis_handler_decorator_traditional]
internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)!
{
public async Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken)
{
var product = JsonSerializer.Deserialize<Product>(record.Kinesis.Data);

if (product.Id == 4) // (2)!
{
throw new ArgumentException("Error on id 4");
}

return await Task.FromResult(RecordHandlerResult.None); // (3)!
}
}


[BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _)
{
return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
// --8<-- [end:kinesis_handler_decorator_traditional]

// --8<-- [start:dynamodb_typed_handler_decorator]
public class Customer
{
public string? CustomerId { get; set; }
public string? Name { get; set; }
public string? Email { get; set; }
public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> // (1)!
{
public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
{
Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}");

if (string.IsNullOrEmpty(customer.Email)) // (2)!
{
throw new ArgumentException("Customer email is required");
}

return await Task.FromResult(RecordHandlerResult.None); // (3)!
}
}

[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
{
return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
// --8<-- [end:dynamodb_typed_handler_decorator]

// --8<-- [start:dynamodb_handler_decorator_traditional]
internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)!
{
public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
{
var product = JsonSerializer.Deserialize<Product>(record.Dynamodb.NewImage["Product"].S);

if (product.Id == 4) // (2)!
{
throw new ArgumentException("Error on id 4");
}

return await Task.FromResult(RecordHandlerResult.None); // (3)!
}
}


[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
{
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
// --8<-- [end:dynamodb_handler_decorator_traditional]

// --8<-- [start:using_utility_outside_decorator]
public async Task<BatchItemFailuresResponse> HandlerUsingUtility(DynamoDBEvent dynamoDbEvent)
{
var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler<DynamoDBEvent.DynamodbStreamRecord>.From(record =>
{
var product = JsonSerializer.Deserialize<JsonElement>(record.Dynamodb.NewImage["Product"].S);

if (product.GetProperty("Id").GetInt16() == 4)
{
throw new ArgumentException("Error on 4");
}
}));
return result.BatchItemFailuresResponse;
}
// --8<-- [end:using_utility_outside_decorator]

// --8<-- [start:using_utility_from_ioc_getrequiredservice]
public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent)
{
var batchProcessor = Services.Provider.GetRequiredService<IDynamoDbStreamBatchProcessor>();
var recordHandler = Services.Provider.GetRequiredService<IDynamoDbStreamRecordHandler>();
var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
return result.BatchItemFailuresResponse;
}
// --8<-- [end:using_utility_from_ioc_getrequiredservice]

// --8<-- [start:using_utility_from_ioc_injected_parameters]
public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent,
IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler)
{
var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
return result.BatchItemFailuresResponse;
}
// --8<-- [end:using_utility_from_ioc_injected_parameters]

// --8<-- [start:example_implementation_of_iserviceprovider]
internal class Services
{
private static readonly Lazy<IServiceProvider> LazyInstance = new(Build);

private static ServiceCollection _services;
public static IServiceProvider Provider => LazyInstance.Value;

public static IServiceProvider Init()
{
return LazyInstance.Value;
}

private static IServiceProvider Build()
{
_services = new ServiceCollection();
_services.AddScoped<IDynamoDbStreamBatchProcessor, CustomDynamoDbStreamBatchProcessor>();
_services.AddScoped<IDynamoDbStreamRecordHandler, CustomDynamoDbStreamRecordHandler>();
return _services.BuildServiceProvider();
}
}
// --8<-- [end:example_implementation_of_iserviceprovider]

// --8<-- [start:processing_messages_in_parallel]
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true )]
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
{
return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
// --8<-- [end:processing_messages_in_parallel]
Loading