diff --git a/examples/with-pipes/main.tf b/examples/with-pipes/main.tf index 598fb38..4c28857 100644 --- a/examples/with-pipes/main.tf +++ b/examples/with-pipes/main.tf @@ -187,6 +187,39 @@ module "eventbridge" { } } + # With Kinesis Stream source and Kinesis Stream target + kinesis_source_kinesis_target = { + source = aws_kinesis_stream.source.arn + target = aws_kinesis_stream.target.arn + + source_parameters = { + kinesis_stream_parameters = { + batch_size = 7 + maximum_batching_window_in_seconds = 90 + maximum_record_age_in_seconds = 100 + maximum_retry_attempts = 4 + on_partial_batch_item_failure = "AUTOMATIC_BISECT" + parallelization_factor = 5 + starting_position = "TRIM_HORIZON" + starting_position_timestamp = null + dead_letter_config = { + arn = aws_sqs_queue.dlq.arn + } + } + } + + target_parameters = { + kinesis_stream_parameters = { + # Must be a json path and start with $. + partition_key = "$.id" + } + } + + tags = { + Pipe = "kinesis_source_kinesis_target" + } + } + # With SQS Queue source and EventBridge target sqs_source_eventbridge_target = { source = aws_sqs_queue.source.arn @@ -429,6 +462,12 @@ resource "aws_kinesis_stream" "source" { shard_count = 1 } +resource "aws_kinesis_stream" "target" { + name = "${random_pet.this.id}-target" + + shard_count = 1 +} + ################################## # CloudWatch Log Group and Stream ################################## diff --git a/iam_pipes.tf b/iam_pipes.tf index 024982d..c44d347 100644 --- a/iam_pipes.tf +++ b/iam_pipes.tf @@ -29,7 +29,7 @@ locals { # Enrichment / Target lambda = { - values = [v.target, try(aws_cloudwatch_event_api_destination.this[v.enrichment].arn, null)], + values = [v.target, try(v.enrichment, null), try(aws_cloudwatch_event_api_destination.this[v.enrichment].arn, null)], matching_services = ["lambda"] }, step_functions = { @@ -191,7 +191,8 @@ locals { kinesis_target = { actions = [ - "kinesis:PutRecord" + "kinesis:PutRecord", + "kinesis:PutRecords" ] } diff --git a/main.tf b/main.tf index 32a8132..d63f3fe 100644 --- a/main.tf +++ b/main.tf @@ -738,6 +738,13 @@ resource "aws_pipes_pipe" "this" { message_group_id = try(sqs_queue_parameters.value.message_group_id, null) } } + dynamic "kinesis_stream_parameters" { + for_each = try([target_parameters.value.kinesis_stream_parameters], []) + + content { + partition_key = try(kinesis_stream_parameters.value.partition_key, null) + } + } dynamic "cloudwatch_logs_parameters" { for_each = try([target_parameters.value.cloudwatch_logs_parameters], [])