From c28849a50ad108c57a2297dec46d352913f6462e Mon Sep 17 00:00:00 2001 From: arr Date: Fri, 6 Dec 2024 13:35:23 -0700 Subject: [PATCH 1/7] allow kinesis targets in pipes by pulling in the partition_key required parameter --- main.tf | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/main.tf b/main.tf index 6f1ea6a..21a8509 100644 --- a/main.tf +++ b/main.tf @@ -717,6 +717,13 @@ resource "aws_pipes_pipe" "this" { message_group_id = try(sqs_queue_parameters.value.message_group_id, null) } } + dynamic "kinesis_stream_parameters" { + for_each = try([target_parameters.value.kinesis_stream_parameters], []) + + content { + partition_key = try(kinesis_stream_parameters.value.partition_key, null) + } + } dynamic "cloudwatch_logs_parameters" { for_each = try([target_parameters.value.cloudwatch_logs_parameters], []) From 972e6a1d1f7b175bac344293dcad38fe41a1cdf4 Mon Sep 17 00:00:00 2001 From: arr Date: Fri, 6 Dec 2024 13:41:41 -0700 Subject: [PATCH 2/7] add an example --- examples/with-pipes/main.tf | 39 +++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/examples/with-pipes/main.tf b/examples/with-pipes/main.tf index 598fb38..92879f0 100644 --- a/examples/with-pipes/main.tf +++ b/examples/with-pipes/main.tf @@ -187,6 +187,39 @@ module "eventbridge" { } } + # With Kinesis Stream source and Kinesis Straem target + kinesis_source_kinesis_target = { + source = aws_kinesis_stream.source.arn + target = aws_kinesis_stream.target.arn + + source_parameters = { + kinesis_stream_parameters = { + batch_size = 7 + maximum_batching_window_in_seconds = 90 + maximum_record_age_in_seconds = 100 + maximum_retry_attempts = 4 + on_partial_batch_item_failure = "AUTOMATIC_BISECT" + parallelization_factor = 5 + starting_position = "TRIM_HORIZON" + starting_position_timestamp = null + dead_letter_config = { + arn = aws_sqs_queue.dlq.arn + } + } + } + + target_parameters = { + kinesis_stream_parameters = { + # Must be a json path and start with $. + partition_key = "$.id" + } + } + + tags = { + Pipe = "kinesis_source_kinesis_target" + } + } + # With SQS Queue source and EventBridge target sqs_source_eventbridge_target = { source = aws_sqs_queue.source.arn @@ -429,6 +462,12 @@ resource "aws_kinesis_stream" "source" { shard_count = 1 } +resource "aws_kinesis_stream" "target" { + name = "${random_pet.this.id}-target" + + shard_count = 1 +} + ################################## # CloudWatch Log Group and Stream ################################## From 09443683f119e174772fd809eea556777634d0ce Mon Sep 17 00:00:00 2001 From: arr Date: Mon, 9 Dec 2024 17:03:38 -0700 Subject: [PATCH 3/7] allow use of enrichment lambdas and the creation of the iam policy for invoking the function --- iam_pipes.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iam_pipes.tf b/iam_pipes.tf index 8054ae6..ca1afb4 100644 --- a/iam_pipes.tf +++ b/iam_pipes.tf @@ -29,7 +29,7 @@ locals { # Enrichment / Target lambda = { - values = [v.target, try(aws_cloudwatch_event_api_destination.this[v.enrichment].arn, null)], + values = [v.target, try(v.enrichment, null), try(aws_cloudwatch_event_api_destination.this[v.enrichment].arn, null)], matching_services = ["lambda"] }, step_functions = { From 883cd6ac65cdc6d803784b5499288aa919f21e47 Mon Sep 17 00:00:00 2001 From: arr Date: Tue, 10 Dec 2024 11:48:08 -0700 Subject: [PATCH 4/7] add a kinesis trust relationship... even though it didn't fix the problem --- iam_pipes.tf | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/iam_pipes.tf b/iam_pipes.tf index ca1afb4..3e279cc 100644 --- a/iam_pipes.tf +++ b/iam_pipes.tf @@ -338,6 +338,16 @@ data "aws_iam_policy_document" "assume_role_pipe" { values = ["arn:aws:pipes:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:pipe/${each.value.Name}"] } } + + statement { + effect = "Allow" + actions = ["sts:AssumeRole"] + + principals { + type = "Service" + identifiers = ["kinesis.${data.aws_partition.current.dns_suffix}"] + } + } } resource "aws_iam_role" "eventbridge_pipe" { From 9e47a4b9fa5cde18c6919b177477543a04d023b5 Mon Sep 17 00:00:00 2001 From: arr Date: Tue, 10 Dec 2024 12:09:59 -0700 Subject: [PATCH 5/7] wow... AWS needs PutRecord and PutRecords because it sends as a batch of records... --- iam_pipes.tf | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iam_pipes.tf b/iam_pipes.tf index 3e279cc..56f90ff 100644 --- a/iam_pipes.tf +++ b/iam_pipes.tf @@ -182,7 +182,8 @@ locals { kinesis_target = { actions = [ - "kinesis:PutRecord" + "kinesis:PutRecord", + "kinesis:PutRecords" ] } From 3a21594d00b9ff06220c38a8bd29194b5fb899c2 Mon Sep 17 00:00:00 2001 From: arr Date: Tue, 10 Dec 2024 12:25:29 -0700 Subject: [PATCH 6/7] remove the extra service assume role. not necessary. --- iam_pipes.tf | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/iam_pipes.tf b/iam_pipes.tf index 56f90ff..7e23560 100644 --- a/iam_pipes.tf +++ b/iam_pipes.tf @@ -339,16 +339,6 @@ data "aws_iam_policy_document" "assume_role_pipe" { values = ["arn:aws:pipes:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:pipe/${each.value.Name}"] } } - - statement { - effect = "Allow" - actions = ["sts:AssumeRole"] - - principals { - type = "Service" - identifiers = ["kinesis.${data.aws_partition.current.dns_suffix}"] - } - } } resource "aws_iam_role" "eventbridge_pipe" { From fc6d3fef2c680e7a8823406873f6f017097a1783 Mon Sep 17 00:00:00 2001 From: Ryan Despain <166053905+soisyourface@users.noreply.github.com> Date: Thu, 9 Jan 2025 16:42:18 -0700 Subject: [PATCH 7/7] typo --- examples/with-pipes/main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/with-pipes/main.tf b/examples/with-pipes/main.tf index 92879f0..4c28857 100644 --- a/examples/with-pipes/main.tf +++ b/examples/with-pipes/main.tf @@ -187,7 +187,7 @@ module "eventbridge" { } } - # With Kinesis Stream source and Kinesis Straem target + # With Kinesis Stream source and Kinesis Stream target kinesis_source_kinesis_target = { source = aws_kinesis_stream.source.arn target = aws_kinesis_stream.target.arn