|
| 1 | +--- |
| 2 | +title: "Manage EventBridge Pipes with the ACK Pipes Controller" |
| 3 | +description: "Forward messages between two SQS queues with a pipe." |
| 4 | +lead: "Create and manage EventBridge Pipes directly from Kubernetes" |
| 5 | +draft: false |
| 6 | +menu: |
| 7 | + docs: |
| 8 | + parent: "tutorials" |
| 9 | +weight: 45 |
| 10 | +toc: true |
| 11 | +--- |
| 12 | + |
| 13 | +Amazon EventBridge Pipes connects sources to targets. It reduces the need for specialized knowledge and integration code |
| 14 | +when developing event driven architectures, fostering consistency across your company’s applications. To set up a pipe, |
| 15 | +you choose the source, add optional filtering, define optional enrichment, and choose the target for the event data. |
| 16 | + |
| 17 | +In this tutorial you will learn how to create and manage an [EventBridge |
| 18 | +Pipe](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html) to forward messages between two SQS queues |
| 19 | +from an Amazon Elastic Kubernetes (EKS) deployment. |
| 20 | + |
| 21 | +## Setup |
| 22 | + |
| 23 | +Although it is not necessary to use Amazon Elastic Kubernetes Service (Amazon EKS) with ACK, this guide assumes that you |
| 24 | +have access to an Amazon EKS cluster. If this is your first time creating an Amazon EKS cluster, see [Amazon EKS |
| 25 | +Setup](https://docs.aws.amazon.com/deep-learning-containers/latest/devguide/deep-learning-containers-eks-setup.html). |
| 26 | +For automated cluster creation using `eksctl`, see [Getting started with Amazon EKS - |
| 27 | +`eksctl`](https://docs.aws.amazon.com/eks/latest/userguide/getting-started-eksctl.html) and create your cluster with |
| 28 | +Amazon EC2 Linux managed nodes. |
| 29 | + |
| 30 | +### Prerequisites |
| 31 | + |
| 32 | +This guide assumes that you have: |
| 33 | + |
| 34 | +- Created an EKS cluster with Kubernetes version 1.24 or higher. |
| 35 | +- AWS IAM permissions to create roles and attach policies to roles. |
| 36 | +- AWS IAM permissions to manages queues and send messages to a queue. |
| 37 | +- Installed the following tools on the client machine used to access your Kubernetes cluster: |
| 38 | + - [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv1.html) - A command line tool for interacting |
| 39 | + with AWS services. |
| 40 | + - [kubectl](https://docs.aws.amazon.com/eks/latest/userguide/install-kubectl.html) - A command line tool for working |
| 41 | + with Kubernetes clusters. |
| 42 | + - [eksctl](https://docs.aws.amazon.com/eks/latest/userguide/eksctl.html) - A command line tool for working with EKS |
| 43 | + clusters. |
| 44 | + - [Helm 3.8+](https://helm.sh/docs/intro/install/) - A tool for installing and managing Kubernetes applications. |
| 45 | + - [jq](https://stedolan.github.io/jq/download/) to parse AWS CLI JSON output |
| 46 | + |
| 47 | +### Install the ACK service controller for Pipes |
| 48 | + |
| 49 | +Log into the Helm registry that stores the ACK charts: |
| 50 | +```bash |
| 51 | +aws ecr-public get-login-password --region us-east-1 | helm registry login --username AWS --password-stdin public.ecr.aws |
| 52 | +``` |
| 53 | + |
| 54 | +Deploy the ACK service controller for Amazon Pipes using the [pipes-chart Helm chart](https://gallery.ecr.aws/aws-controllers-k8s/pipes-chart). Resources should be created in the `us-east-1` region: |
| 55 | + |
| 56 | +```bash |
| 57 | +helm install --create-namespace -n ack-system oci://public.ecr.aws/aws-controllers-k8s/pipes-chart --version=v0.0.3 --generate-name --set=aws.region=us-east-1 |
| 58 | +``` |
| 59 | + |
| 60 | +For a full list of available values to the Helm chart, please [review the values.yaml file](https://github.com/aws-controllers-k8s/pipes-controller/blob/main/helm/values.yaml). |
| 61 | + |
| 62 | +### Configure IAM permissions |
| 63 | + |
| 64 | +Once the service controller is deployed, you will need to [configure the IAM permissions][irsa-permissions] for the |
| 65 | +controller to query the Pipes API. For full details, please review the AWS Controllers for Kubernetes documentation for |
| 66 | +[how to configure the IAM permissions][irsa-permissions]. If you follow the examples in the documentation, use the value |
| 67 | +of `pipes` for `SERVICE`. |
| 68 | + |
| 69 | +## Create an EventBridge Pipe |
| 70 | + |
| 71 | +### Create the source and target SQS queues |
| 72 | + |
| 73 | +To keep the scope of this tutorial simple, the SQS queues and IAM permissions will be created with the AWS CLI. |
| 74 | +Alternatively, the [ACK SQS |
| 75 | +Controller](https://aws-controllers-k8s.github.io/community/docs/community/services/#amazon-sqs) and [ACK IAM |
| 76 | +Controller](https://aws-controllers-k8s.github.io/community/docs/community/services/#amazon-iam) can be used to manage |
| 77 | +these resources with Kubernetes. |
| 78 | + |
| 79 | +Execute the following command to define the environment variables used throughout the example. |
| 80 | + |
| 81 | +{{% hint type="info" title="Make sure environment variables are set" %}} |
| 82 | +If you followed the steps in the IAM permissions section above, the required environment variables `${AWS_REGION}` and |
| 83 | +`${AWS_ACCOUNT_ID}` are already set. Otherwise please set these variables before executing the following steps. The value for `${AWS_REGION}` must also match the `--set=aws.region` value used in the `helm install` command above. |
| 84 | +{{% /hint %}} |
| 85 | + |
| 86 | +```bash |
| 87 | +export PIPE_NAME=pipes-sqs-to-sqs |
| 88 | +export PIPE_NAMESPACE=pipes-example |
| 89 | +export SOURCE_QUEUE=pipes-sqs-source |
| 90 | +export TARGET_QUEUE=pipes-sqs-target |
| 91 | +export PIPE_ROLE=pipes-sqs-to-sqs-role |
| 92 | +export PIPE_POLICY=pipes-sqs-to-sqs-policy |
| 93 | +``` |
| 94 | + |
| 95 | +Create the source and target queues. |
| 96 | + |
| 97 | +```bash |
| 98 | +aws sqs create-queue --queue-name ${SOURCE_QUEUE} |
| 99 | +aws sqs create-queue --queue-name ${TARGET_QUEUE} |
| 100 | +``` |
| 101 | + |
| 102 | +The output of above commands looks like |
| 103 | + |
| 104 | +```bash |
| 105 | +{ |
| 106 | + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/1234567890/pipes-sqs-source" |
| 107 | +} |
| 108 | +{ |
| 109 | + "QueueUrl": "https://sqs.us-east-1.amazonaws.com/1234567890/pipes-sqs-target" |
| 110 | +} |
| 111 | +``` |
| 112 | + |
| 113 | +### Create the Pipes IAM Role |
| 114 | + |
| 115 | +Create an IAM role for the pipe to consume messages from the source queue and send messages to the target queue. |
| 116 | + |
| 117 | +```bash |
| 118 | +cat <<EOF > trust.json |
| 119 | +{ |
| 120 | + "Version": "2012-10-17", |
| 121 | + "Statement": [ |
| 122 | + { |
| 123 | + "Effect": "Allow", |
| 124 | + "Principal": { |
| 125 | + "Service": "pipes.amazonaws.com" |
| 126 | + }, |
| 127 | + "Action": "sts:AssumeRole", |
| 128 | + "Condition": { |
| 129 | + "StringEquals": { |
| 130 | + "aws:SourceAccount": "${AWS_ACCOUNT_ID}" |
| 131 | + } |
| 132 | + } |
| 133 | + } |
| 134 | + ] |
| 135 | +} |
| 136 | +EOF |
| 137 | + |
| 138 | +aws iam create-role --role-name ${PIPE_ROLE} --assume-role-policy-document file://trust.json |
| 139 | +``` |
| 140 | + |
| 141 | +The output of above commands looks like |
| 142 | + |
| 143 | +```bash |
| 144 | +{ |
| 145 | + "Role": { |
| 146 | + "Path": "/", |
| 147 | + "RoleName": "pipes-sqs-to-sqs-role", |
| 148 | + "RoleId": "ABCDU3F4JDBEUCMGT3XBH", |
| 149 | + "Arn": "arn:aws:iam::1234567890:role/pipes-sqs-to-sqs-role", |
| 150 | + "CreateDate": "2023-03-21T13:11:59+00:00", |
| 151 | + "AssumeRolePolicyDocument": { |
| 152 | + "Version": "2012-10-17", |
| 153 | + "Statement": [ |
| 154 | + { |
| 155 | + "Effect": "Allow", |
| 156 | + "Principal": { |
| 157 | + "Service": "pipes.amazonaws.com" |
| 158 | + }, |
| 159 | + "Action": "sts:AssumeRole", |
| 160 | + "Condition": { |
| 161 | + "StringEquals": { |
| 162 | + "aws:SourceAccount": "1234567890" |
| 163 | + } |
| 164 | + } |
| 165 | + } |
| 166 | + ] |
| 167 | + } |
| 168 | + } |
| 169 | +} |
| 170 | +``` |
| 171 | + |
| 172 | +Attach a policy to the role to give the pipe permissions to read and send messages. |
| 173 | + |
| 174 | +```bash |
| 175 | +cat <<EOF > policy.json |
| 176 | +{ |
| 177 | + "Version": "2012-10-17", |
| 178 | + "Statement": [ |
| 179 | + { |
| 180 | + "Effect": "Allow", |
| 181 | + "Action": [ |
| 182 | + "sqs:ReceiveMessage", |
| 183 | + "sqs:DeleteMessage", |
| 184 | + "sqs:GetQueueAttributes" |
| 185 | + ], |
| 186 | + "Resource": [ |
| 187 | + "arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${SOURCE_QUEUE}" |
| 188 | + ] |
| 189 | + }, |
| 190 | + { |
| 191 | + "Effect": "Allow", |
| 192 | + "Action": [ |
| 193 | + "sqs:SendMessage" |
| 194 | + ], |
| 195 | + "Resource": [ |
| 196 | + "arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${TARGET_QUEUE}" |
| 197 | + ] |
| 198 | + } |
| 199 | + ] |
| 200 | +} |
| 201 | +EOF |
| 202 | + |
| 203 | +aws iam put-role-policy --role-name ${PIPE_ROLE} --policy-name ${PIPE_POLICY} --policy-document file://policy.json |
| 204 | +``` |
| 205 | + |
| 206 | +If the command executes successfully, no output is generated. |
| 207 | + |
| 208 | +### Create the Pipe |
| 209 | + |
| 210 | +Execute the following command to retrieve the ARNs for the resources created above needed for the Kubernetes manifest. |
| 211 | + |
| 212 | +```bash |
| 213 | +export SOURCE_QUEUE_ARN=$(aws --output json sqs get-queue-attributes --queue-url "https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SOURCE_QUEUE}" --attribute-names QueueArn | jq -r '.Attributes.QueueArn') |
| 214 | +export TARGET_QUEUE_ARN=$(aws --output json sqs get-queue-attributes --queue-url "https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${TARGET_QUEUE}" --attribute-names QueueArn | jq -r '.Attributes.QueueArn') |
| 215 | +export PIPE_ROLE_ARN=$(aws --output json iam get-role --role-name ${PIPE_ROLE} | jq -r '.Role.Arn') |
| 216 | +``` |
| 217 | + |
| 218 | +Execute the following command to create a Kubernetes manifest for a pipe consuming messages from the source queue and |
| 219 | +sending messages matching the filter criteria to the target queue using the above created IAM role. |
| 220 | + |
| 221 | +The EventBridge filter pattern will match any SQS message from the source queue with a JSON-stringified body |
| 222 | +`{\"from\":\"kubernetes\"}`. Alternatively, the filter pattern can be omitted to forward all messages from the source |
| 223 | +queue. |
| 224 | + |
| 225 | +```bash |
| 226 | +kubectl create ns ${PIPE_NAMESPACE} |
| 227 | + |
| 228 | +cat <<EOF > pipe-sqs-to-sqs.yaml |
| 229 | +apiVersion: pipes.services.k8s.aws/v1alpha1 |
| 230 | +kind: Pipe |
| 231 | +metadata: |
| 232 | + name: $PIPE_NAME |
| 233 | +spec: |
| 234 | + name: $PIPE_NAME |
| 235 | + source: $SOURCE_QUEUE_ARN |
| 236 | + description: "SQS to SQS Pipe with filtering" |
| 237 | + sourceParameters: |
| 238 | + filterCriteria: |
| 239 | + filters: |
| 240 | + - pattern: "{\"body\":{\"from\":[\"kubernetes\"]}}" |
| 241 | + sqsQueueParameters: |
| 242 | + batchSize: 1 |
| 243 | + maximumBatchingWindowInSeconds: 1 |
| 244 | + target: $TARGET_QUEUE_ARN |
| 245 | + roleARN: $PIPE_ROLE_ARN |
| 246 | +EOF |
| 247 | + |
| 248 | +kubectl -n ${PIPE_NAMESPACE} create -f pipe-sqs-to-sqs.yaml |
| 249 | +``` |
| 250 | + |
| 251 | +The output of above commands looks like |
| 252 | + |
| 253 | +```bash |
| 254 | +namespace/pipes-example created |
| 255 | +pipe.pipes.services.k8s.aws/pipes-sqs-to-sqs created |
| 256 | +``` |
| 257 | + |
| 258 | +### Describe Pipe Custom Resource |
| 259 | + |
| 260 | +View the Pipe custom resource to verify it is in a `RUNNING` state. |
| 261 | + |
| 262 | +```bash |
| 263 | +kubectl -n $PIPE_NAMESPACE get pipe $PIPE_NAME |
| 264 | +``` |
| 265 | + |
| 266 | +The output of above commands looks like |
| 267 | + |
| 268 | +```bash |
| 269 | +NAME STATE SYNCED AGE |
| 270 | +pipes-sqs-to-sqs RUNNING True 3m10s |
| 271 | +``` |
| 272 | + |
| 273 | +### Verify the Pipe filtering and forwarding is working |
| 274 | + |
| 275 | +Execute the following command to send a message to the source queue with a body matching the pipe filter pattern. |
| 276 | + |
| 277 | +```bash |
| 278 | +aws sqs send-message --queue-url https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SOURCE_QUEUE} --message-body "{\"from\":\"kubernetes\"}" |
| 279 | +``` |
| 280 | + |
| 281 | +The output of above commands looks like |
| 282 | + |
| 283 | +```bash |
| 284 | +{ |
| 285 | + "MD5OfMessageBody": "fde2da607356f1974691e48fa6a87157", |
| 286 | + "MessageId": "f4157187-0308-420c-b69b-aa439e6be7e3" |
| 287 | +} |
| 288 | +``` |
| 289 | + |
| 290 | +Verify the message was consumed by the pipe, the filter pattern matched and the message was received by the target queue |
| 291 | +with |
| 292 | + |
| 293 | +```bash |
| 294 | +aws sqs receive-message --queue-url https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${TARGET_QUEUE} |
| 295 | +``` |
| 296 | + |
| 297 | +{{% hint type="info" title="Receive Delays" %}} |
| 298 | +It might take some time for the Pipe to consume the message from the source and deliver it to the target queue. |
| 299 | +If the above command does not return a message, rerun the command a couple of times with some delay in between the requests. |
| 300 | +{{% /hint %}} |
| 301 | + |
| 302 | +The output of above commands looks like |
| 303 | + |
| 304 | +```bash |
| 305 | +{ |
| 306 | + "Messages": [ |
| 307 | + { |
| 308 | + <snip> |
| 309 | + "MD5OfBody": "d5255184c571cca2c78e76d6eea1745d", |
| 310 | + "Body": "{\"messageId\":\"f4157187-0308-420c-b69b-aa439e6be7e3\", |
| 311 | + <snip> |
| 312 | + \"body\":\"{\\\"from\\\":\\\"kubernetes\\\"}\",\"attributes\":{\"ApproximateReceiveCount\":\"1\", |
| 313 | + <snip> |
| 314 | + \"eventSourceARN\":\"arn:aws:sqs:us-east-1:1234567890:pipes-sqs-source\",\"awsRegion\":\"us-east-1\"}" |
| 315 | + } |
| 316 | + ] |
| 317 | +} |
| 318 | +``` |
| 319 | + |
| 320 | +## Next steps |
| 321 | + |
| 322 | +The ACK service controller for Amazon EventBridge Pipes is based on the [Amazon EventBridge Pipes |
| 323 | +API](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/Welcome.html). |
| 324 | + |
| 325 | +Refer to [API Reference](https://aws-controllers-k8s.github.io/community/reference/) for *Pipes* to find all the |
| 326 | +supported Kubernetes custom resources and fields. |
| 327 | + |
| 328 | +### Cleanup |
| 329 | + |
| 330 | +Remove all the resource created in this tutorial using `kubectl delete` command. |
| 331 | + |
| 332 | +```bash |
| 333 | +kubectl -n ${QUEUE_NAMESPACE} delete -f pipe-sqs-to-sqs.yaml |
| 334 | +``` |
| 335 | + |
| 336 | +The output of delete command should look like |
| 337 | + |
| 338 | +```bash |
| 339 | +pipe.pipes.services.k8s.aws "pipes-sqs-to-sqs" deleted |
| 340 | +``` |
| 341 | + |
| 342 | +{{% hint type="info" title="Deleting Delays" %}} |
| 343 | +It might take some time for the Pipe to be deleted as the operation is performed asynchronously in the API. |
| 344 | +{{% /hint %}} |
| 345 | + |
| 346 | +To remove the Pipes ACK service controller, related CRDs, and namespaces, see [ACK Cleanup][cleanup]. |
| 347 | + |
| 348 | +To delete your EKS clusters, see [Amazon EKS - Deleting a cluster][cleanup-eks]. |
| 349 | + |
| 350 | +[irsa-permissions]: ../../user-docs/irsa/ |
| 351 | +[cleanup]: ../../user-docs/cleanup/ |
| 352 | +[cleanup-eks]: https://docs.aws.amazon.com/eks/latest/userguide/delete-cluster.html |
0 commit comments